Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

基于RocketMQ5.0版本的Grpc 客户端API封装,node.js client by grpc.

License

NotificationsYou must be signed in to change notification settings

JanYork/rocketmq-client-node

Repository files navigation

这是一个自封装的 Apache RocketMQ Node.js 客户端,基于 RocketMQ 5.x版本的gRPC协议实现。

  • 不依赖egg.js、rocketmq-nodejs-sdk等第三方库,它是纯粹的Node.js客户端。
  • 实现了分布式下顺序保障机制。
  • 自定义实现了类似的PushConsumer,通过长轮询的方式来获取消息。
  • 统一的日志记录器注入,方便调试。

注意

对于Macos or Linux,build操作失败,可以尝试使用sudo npm run build

对于Windows,build操作失败,可以尝试使用“以管理员身份运行”命令行。

概述

在开始客户端的部分之前,所需的一些前期工作(或者参照这里):

  1. 准备Node.js 环境。Node.js 16.19.0 是确保客户端运行的最小版本,Node.js >= 18.17.0 是推荐版本;
  2. 部署 namesrv,broker 以及proxy 组件。

快速开始

我们使用 npm 作为依赖管理和发布的工具。你可以在 npm 的官方网站了解到关于它的更多信息。这里是一些在开发阶段你会使用到的 npm 命令:

# 自动安装工程相关的依赖npm install# 初始化 grpc 代码npm run build# 安装rocketmq nodejs 客户端npm i rocketmq-grpc

开启 grpc-js 的调试日志:

GRPC_TRACE=compression GRPC_VERBOSITY=debug GRPC_TRACE=all npm run xxx or node xxx

示例

普通消息

发送消息

import{Producer}from"rocketmq-grpc";constsimpleProducer=newProducer({endpoints:'localhost:8081'});console.log('checkout:simpleProducer init success!');(async()=>{// 启动生产者simpleProducer.startup().then(()=>{console.log('checkout:simpleProducer startup success!');// 发送消息simpleProducer.send({topic:'checkout-topic',tag:'checkout',keys:['checkout-key'],body:Buffer.from('Hello, Checkout OK!')}).then(()=>{console.log('checkout:send message success!');process.exit(0);});});})();

消费消息

import{SimpleConsumer}from"rocketmq-grpc";constconsumer=newSimpleConsumer({consumerGroup:'checkout-group',endpoints:'192.168.1.162:8081',subscriptions:newMap().set('checkout-topic','*'),requestTimeout:3000,awaitDuration:30000// long polling});console.log('checkout:consumer init success!');constisShutdown=false;asyncfunctionstartAndConsumeMessages(){try{// 启动消费者awaitconsumer.startup();console.log('checkout:consumer startup success!');asyncfunctionconsumeMessages(){try{constmessages=awaitconsumer.receive(20);if(messages.length>0){console.log('got %d messages',messages.length);for(constmessageofmessages){console.log('body=%o',message.body.toString());awaitconsumer.ack(message);console.log('checkout:ack message success!');}}else{console.log('No messages received, waiting...');}}catch(error){console.error('An error occurred:',error);}finally{// // 等待一段时间后递归调用consumeMessages// await new Promise(resolve => setTimeout(resolve, 5000));console.log('checkout:waiting for messages...');if(!isShutdown){awaitconsumeMessages();}}}// 开始消费消息awaitconsumeMessages();}catch(error){console.error('An error occurred:',error);}finally{// 如果发生错误或者接收消息出现问题,可以选择重新启动消费者// 在这里你可以添加相应的逻辑}}startAndConsumeMessages().catch(console.error);

更多的示例可以参考这里

消息类型

  • NORMAL
  • FIFO
  • DELAY
  • TRANSACTION

消费类型

  • PRODUCER
  • SIMPLE_CONSUMER
  • PULL_CONSUMER
  • PUSH_CONSUMER

PushConsumer并非RocketMQ官方SDK的实现方式,官方采用本地维持一个内存队列组来不断同步拉取消息并分发给消费者,这样的实现是复杂的,只能等待官方SDK的实现。

此处的实现是基于5.x版本独有的SimpleConsumer,通过长轮询的方式来获取消息,SimpleConsumer是RocketMQ在此版本提出用于供消费端开发者做更加自定义的消费者实现的一种方式。

About

基于RocketMQ5.0版本的Grpc 客户端API封装,node.js client by grpc.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

[8]ページ先頭

©2009-2025 Movatter.jp