- Notifications
You must be signed in to change notification settings - Fork36
The message queue in java.(java 简易版本 mq 实现)
License
houbb/mq
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
mq 是基于 netty 实现的 java mq 框架,类似于 rocket mq。
主要用于个人学习,由渐入深,理解 mq 的底层实现原理。
基于 netty4 的客户端调用服务端
timeout 超时处理
broker 启动的 check 检测服务可用性
load balance 负载均衡
基于 TAG 的消息过滤,broker 端实现
生产者的消息同步发送,ONE WAY 发送
生产消息的批量发送
消息状态的批量确认
fail 支持 failOver failFast 等失败处理策略
heartbeat 服务端心跳
AT LEAST ONCE 最少一次原则
依赖 maven 包:
<dependency> <groupId>com.github.houbb</groupId> <artifactId>mq-broker</artifactId> <version>0.1.3</version></dependency>
代码实现:
MqBrokerbroker =newMqBroker();broker.start();
依赖 maven 包:
<dependency> <groupId>com.github.houbb</groupId> <artifactId>mq-consumer</artifactId> <version>0.1.3</version></dependency>
代码实现:
finalMqConsumerPushmqConsumerPush =newMqConsumerPush();mqConsumerPush.start();mqConsumerPush.subscribe("TOPIC","TAGA");mqConsumerPush.registerListener(newIMqConsumerListener() {@OverridepublicConsumerStatusconsumer(MqMessagemqMessage,IMqConsumerListenerContextcontext) {System.out.println("---------- 自定义 " +JSON.toJSONString(mqMessage));returnConsumerStatus.SUCCESS; }});
依赖 maven 包:
<dependency> <groupId>com.github.houbb</groupId> <artifactId>mq-producer</artifactId> <version>0.1.3</version></dependency>
代码实现:
MqProducermqProducer =newMqProducer();mqProducer.start();Stringmessage ="HELLO MQ!";MqMessagemqMessage =newMqMessage();mqMessage.setTopic("TOPIC");mqMessage.setTags(Arrays.asList("TAGA","TAGB"));mqMessage.setPayload(message);SendResultsendResult =mqProducer.send(mqMessage);System.out.println(JSON.toJSON(sendResult));
工作至今,接触 mq 框架已经有很长时间。
但是对于其原理一直只是知道个大概,从来没有深入学习过。
以前一直想写,但由于各种原因被耽搁。
这些技术的准备阶段,花费了比较长的时间。
也建议想写 mq 框架的有相关的知识储备。
其他 mq 框架使用的经验此处不再赘述。
原来一直想写 mq,却不行动的原因就是想的太多,做的太少。
想一下把全部写完,结果就是啥都没写。
所以本次的开发,每个代码分支做的事情实际很少,只做一个功能点。
陆陆续续经过近一个月的完善,对 mq 框架有了自己的体会和进一步的认知。
代码实现功能,主要参考Apache Dubbo
文档将使用 markdown 文本的形式,补充 code 层面没有的东西。
【mq】从零开始实现 mq-02-如何实现生产者调用消费者?
【mq】从零开始实现 mq-03-引入 broker 中间人
【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat
【mq】从零开始实现 mq-07-负载均衡 load balance
【mq】从零开始实现 mq-09-消费者拉取消息 pull message
【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack
【mq】从零开始实现 mq-11-消费者消息回执添加分组信息 pull message ack groupName
代码有详细的注释,便于阅读和后期维护。
目前测试代码算不上完善。后续将陆续补全。
| 模块 | 说明 |
|---|---|
| mq-common | 公共代码 |
| mq-broker | 注册中心 |
| mq-producer | 服务端 |
| mq-consumer | 客户端 |
| mq-test | 测试模块 |
这部分测试代码可以关注公众号【老马啸西风】,后台回复【mq】领取。
all 模块
check broker 启动检测
关闭时通知 register center
优雅关闭添加超时设置
heartbeat 心跳检测机制
完善 load-balance 实现 + shardingkey 粘性消费、请求
失败重试的拓展
消费者 pull 策略实现
pull 消息消费的 ACK 处理
broker springboot 实现
消息的 ack 处理,要基于 groupName 进行处理
消息的回溯消费 offset
消息的批量发送,批量 ACK
添加注册鉴权,保证安全性
顺序消息
事务消息
定时消息
流量控制 back-press 反压
消息可靠性
offline message 离线消息
dead message 死信队列
断线重连
About
The message queue in java.(java 简易版本 mq 实现)
Resources
License
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Releases
Packages0
Uh oh!
There was an error while loading.Please reload this page.
