- Notifications
You must be signed in to change notification settings - Fork0
基于RocketMQ5.0版本的Grpc 客户端API封装,node.js client by grpc.
License
NotificationsYou must be signed in to change notification settings
JanYork/rocketmq-client-node
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
这是一个自封装的 Apache RocketMQ Node.js 客户端,基于 RocketMQ 5.x版本的gRPC协议实现。
- 不依赖egg.js、rocketmq-nodejs-sdk等第三方库,它是纯粹的Node.js客户端。
- 实现了分布式下顺序保障机制。
- 自定义实现了类似的PushConsumer,通过长轮询的方式来获取消息。
- 统一的日志记录器注入,方便调试。
对于Macos or Linux,build操作失败,可以尝试使用sudo npm run build。
对于Windows,build操作失败,可以尝试使用“以管理员身份运行”命令行。
在开始客户端的部分之前,所需的一些前期工作(或者参照这里):
我们使用 npm 作为依赖管理和发布的工具。你可以在 npm 的官方网站了解到关于它的更多信息。这里是一些在开发阶段你会使用到的 npm 命令:
# 自动安装工程相关的依赖npm install# 初始化 grpc 代码npm run build# 安装rocketmq nodejs 客户端npm i rocketmq-grpc
开启 grpc-js 的调试日志:
GRPC_TRACE=compression GRPC_VERBOSITY=debug GRPC_TRACE=all npm run xxx or node xxx
发送消息
import{Producer}from"rocketmq-grpc";constsimpleProducer=newProducer({endpoints:'localhost:8081'});console.log('checkout:simpleProducer init success!');(async()=>{// 启动生产者simpleProducer.startup().then(()=>{console.log('checkout:simpleProducer startup success!');// 发送消息simpleProducer.send({topic:'checkout-topic',tag:'checkout',keys:['checkout-key'],body:Buffer.from('Hello, Checkout OK!')}).then(()=>{console.log('checkout:send message success!');process.exit(0);});});})();
消费消息
import{SimpleConsumer}from"rocketmq-grpc";constconsumer=newSimpleConsumer({consumerGroup:'checkout-group',endpoints:'192.168.1.162:8081',subscriptions:newMap().set('checkout-topic','*'),requestTimeout:3000,awaitDuration:30000// long polling});console.log('checkout:consumer init success!');constisShutdown=false;asyncfunctionstartAndConsumeMessages(){try{// 启动消费者awaitconsumer.startup();console.log('checkout:consumer startup success!');asyncfunctionconsumeMessages(){try{constmessages=awaitconsumer.receive(20);if(messages.length>0){console.log('got %d messages',messages.length);for(constmessageofmessages){console.log('body=%o',message.body.toString());awaitconsumer.ack(message);console.log('checkout:ack message success!');}}else{console.log('No messages received, waiting...');}}catch(error){console.error('An error occurred:',error);}finally{// // 等待一段时间后递归调用consumeMessages// await new Promise(resolve => setTimeout(resolve, 5000));console.log('checkout:waiting for messages...');if(!isShutdown){awaitconsumeMessages();}}}// 开始消费消息awaitconsumeMessages();}catch(error){console.error('An error occurred:',error);}finally{// 如果发生错误或者接收消息出现问题,可以选择重新启动消费者// 在这里你可以添加相应的逻辑}}startAndConsumeMessages().catch(console.error);
更多的示例可以参考这里。
- NORMAL
- FIFO
- DELAY
- TRANSACTION
- PRODUCER
- SIMPLE_CONSUMER
- PULL_CONSUMER
- PUSH_CONSUMER
PushConsumer并非RocketMQ官方SDK的实现方式,官方采用本地维持一个内存队列组来不断同步拉取消息并分发给消费者,这样的实现是复杂的,只能等待官方SDK的实现。
此处的实现是基于5.x版本独有的SimpleConsumer,通过长轮询的方式来获取消息,SimpleConsumer是RocketMQ在此版本提出用于供消费端开发者做更加自定义的消费者实现的一种方式。
About
基于RocketMQ5.0版本的Grpc 客户端API封装,node.js client by grpc.
Topics
Resources
License
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Packages0
No packages published
Uh oh!
There was an error while loading.Please reload this page.