🎉 v2 out! AMQP module for NestJS with decorator support. Battle-tested for RabbitMQ.


EnriqCG/nestjs-amqp


AMQP module for NestJS with decorator support.

Note

This project is still a work-in-progress and is being actively developed. Issues and PRs are welcome!


This module injects a channel from amqp-connection-manager. Please check the Channel documentation for extra insight on how to publish messages.

Connections are recovered when the connection with the RabbitMQ broker is lost.

Installation

    $ npm i --save @enriqcg/nestjs-amqp
    $ npm i --save-dev @types/amqplib

The concept of a Service in @enriqcg/nestjs-amqp

This library was built for the use case of load balancing messages published to a 'topic' across multiple replicas of the same service. It makes that possible through a service definition. We consider a service to be a collection of replicas that run copies of the same codebase.

Using a service definition in @enriqcg/nestjs-amqp is entirely optional if you don't need to balance messages across replicas.

This library leverages RabbitMQ's exchanges, routing keys and queue bindings to achieve this goal. Start by defining a service when importing AMQPModule, providing a service name and an exchange name.

    @Module({
      imports: [
        AMQPModule.forRoot({
          hostname: 'rabbitmq',
          assertQueuesByDefault: true,
          assertExchanges: [
            // we are making sure our exchange is ready
            // this is optional
            {
              name: 'example_exchange',
              type: 'topic',
            },
          ],
          service: {
            name: 'example_service',
            exchange: 'example_exchange',
          },
        }),
      ],
    })
    export class AppModule {}

The service name is used to register and identify replicas of the same service. You can run multiple services using this library on the same exchange. This is a powerful setup, because a single message can be delivered to several services.

Then we can set up our consumer:

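    import { Controller } from '@nestjs/common'
    import { Consume, Consumer } from '@enriqcg/nestjs-amqp'
    import { AppService } from './app.service' // assumed local service exposing getHello()

    @Consumer()
    @Controller()
    export class AppController {
      constructor(private readonly appService: AppService) {}

      @Consume('test.event')
      async testHandler(body: unknown) {
        console.log(this.appService.getHello())
        return true
      }
    }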

Defining the service and using the @Consume decorator in this setup creates a queue named test.event-example_service. If other replicas of this same code are started, they join as consumers of the same queue, which balances the load of test.event messages across the instances.
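The naming and load-balancing behaviour described above can be sketched without a broker. This is a minimal illustration, not the library's code: `queueNameFor` and `distribute` are hypothetical helpers, the `event-service` naming simply mirrors the example queue name, and the round-robin loop only approximates how RabbitMQ dispatches messages to competing consumers of one queue.

```typescript
// Hypothetical helper: mirrors the queue name shown above
// ("test.event-example_service"); the library's real derivation may differ.
function queueNameFor(event: string, serviceName: string): string {
  return `${event}-${serviceName}`
}

// In-memory stand-in for one queue with several competing consumers.
// Each message goes to exactly one replica, in round-robin order.
function distribute(messages: string[], replicas: string[]): Map<string, string[]> {
  const deliveries = new Map<string, string[]>()
  for (const replica of replicas) deliveries.set(replica, [])
  messages.forEach((message, index) => {
    const replica = replicas[index % replicas.length]
    if (replica === undefined) return // no consumers attached to the queue
    deliveries.get(replica)?.push(message)
  })
  return deliveries
}

const sketchQueueName = queueNameFor('test.event', 'example_service')
const sketchDeliveries = distribute(['m1', 'm2', 'm3', 'm4'], ['replica-1', 'replica-2'])
console.log(sketchQueueName, sketchDeliveries)
```

A second service on the same exchange would get its own queue (for example `queueNameFor('test.event', 'other_service')`, where `other_service` is a made-up name) and therefore its own copy of every message.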

Getting Started

Register the AMQPModule in app.module.ts and pass a configuration object:

    import { Module } from '@nestjs/common'
    import { AMQPModule } from '@enriqcg/nestjs-amqp'

    @Module({
      imports: [
        AMQPModule.forRoot({
          hostname: 'rabbitmq',
          username: 'guest',
          password: 'guest',
          assertQueuesByDefault: true,
          assertExchanges: [
            // these exchanges will be asserted on startup
            {
              name: 'example_exchange',
              type: 'topic',
            },
            {
              name: 'fanout_exchange',
              type: 'fanout',
            },
          ],
        }),
      ],
    })
    export class AppModule {}

You can also check the amqplib documentation on Exchange and Queue assertion.

Publisher

You can now inject an AMQP Channel in your services and use it to push messages into an exchange or a queue.

    import { Injectable } from '@nestjs/common'
    import { InjectAMQPChannel } from '@enriqcg/nestjs-amqp'
    import { Channel } from 'amqplib'

    @Injectable()
    export class ExampleService {
      constructor(
        @InjectAMQPChannel()
        private readonly amqpChannel: Channel,
      ) {}

      async sendToExchange() {
        this.amqpChannel.publish(
          'exchange_name',
          'routing_key',
          Buffer.from(JSON.stringify({ test: true })),
        )
      }
    }

Check amqplib's reference on channel.publish().

Consumer

@enriqcg/nestjs-amqp allows you to define consumer functions using decorators in your controllers.

    import { Controller } from '@nestjs/common'
    import { Consume, Consumer } from '@enriqcg/nestjs-amqp'
    import { ExampleService } from './example.service' // assumed local service

    @Consumer('user') // event prefix
    @Controller()
    export class ExampleController {
      constructor(private readonly exampleService: ExampleService) {}

      @Consume('created') // handler for user.created
      handleCreatedEvent(content: string) {
        console.log(JSON.parse(content))
        return false // message will not be acked
        // return true -> message will be acked
        // no return? -> message will be acked
      }

      // handler for user.updated.address
      @Consume({
        queueName: 'updated.address',
        noAck: false,
        assertQueue: true,
        // queue will be deleted after all consumers are dropped
        autoDelete: true,
      })
      handleUpdatedAddressEvent(content: string) {
        const payload = JSON.parse(content)
        try {
          // pass data to your services
          this.exampleService.update(payload)
        } catch (e) {
          console.error(e)
          return false // message will not be acked
        }
        // message will be automatically acked
      }
    }
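The comments in the example above suggest that the @Consumer prefix and the @Consume name are joined with a dot (`user` + `created` gives `user.created`). A minimal sketch of that composition follows; `eventNameFor` is a hypothetical helper written for illustration and is not exported by @enriqcg/nestjs-amqp.

```typescript
// Hypothetical helper: joins an optional @Consumer prefix with a @Consume name.
// The dot separator is inferred from the "user.created" comment in the example.
function eventNameFor(prefix: string | undefined, name: string): string {
  return prefix ? `${prefix}.${name}` : name
}

console.log(eventNameFor('user', 'created')) // user.created
console.log(eventNameFor('user', 'updated.address')) // user.updated.address
console.log(eventNameFor(undefined, 'test.event')) // test.event
```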

The message content is decoded to a string and provided to decorated methods. Depending on what content you published, further deserialization might be needed. (Building decorators to help decode JSON payloads is on the TODO list.)
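Until such decorators exist, a handler can deserialize the string itself. The sketch below is one possible approach, assuming the publisher sent JSON; `parseJsonContent` and `ParseResult` are hypothetical names and not part of the library. Returning a result object instead of throwing lets the handler decide whether to return false (leave the message unacked) on malformed input.

```typescript
// Hypothetical result type: either the parsed value or an error message.
type ParseResult<T> = { ok: true; value: T } | { ok: false; error: string }

// Parse the string content handed to a decorated method.
// The cast to T is unchecked; validate the shape if the publisher is untrusted.
function parseJsonContent<T>(content: string): ParseResult<T> {
  try {
    return { ok: true, value: JSON.parse(content) as T }
  } catch (e) {
    return { ok: false, error: e instanceof Error ? e.message : String(e) }
  }
}

const parsedSketch = parseJsonContent<{ test: boolean }>('{"test":true}')
if (parsedSketch.ok) {
  console.log(parsedSketch.value.test)
}
```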

Message Acknowledgment

If automatic acknowledgment is disabled for a queue (noAck = false), the decorated method acknowledges a message by returning any value other than false. Returning nothing (void) also acknowledges the message; only an explicit false leaves it unacknowledged.
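The rule can be written down as a small decision function. This is a sketch of the behaviour as described here, not the library's implementation: `ackDecision` is a hypothetical name, and the strict comparison with `false` is an assumption based on the statement that even a void return acknowledges the message.

```typescript
type AckDecision = 'ack' | 'leave-unacked' | 'broker-handles-it'

// Hypothetical helper: decide what happens to a message after its handler ran.
function ackDecision(noAck: boolean, returned: unknown): AckDecision {
  // With noAck = true the broker does not expect an acknowledgment at all.
  if (noAck) return 'broker-handles-it'
  // Assumption: only an explicit `false` withholds the ack.
  return returned === false ? 'leave-unacked' : 'ack'
}

console.log(ackDecision(false, true)) // ack
console.log(ackDecision(false, undefined)) // ack
console.log(ackDecision(false, false)) // leave-unacked
```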

Connection options

    interface AMQPModuleOptions {
      /**
       * The host URL for the connection
       *
       * Default value: 'localhost'
       */
      hostname?: string
      /**
       * The port of the AMQP host
       *
       * Default value: 5672
       */
      port?: number
      /**
       * The name of the connection. Only really relevant in multiple
       * connection contexts
       *
       * Default value: 'default'
       */
      name?: string
      /**
       * Service definition. Please see README.md to learn about how services
       * work in @enriqcg/nestjs-amqp
       *
       * Default value: {}
       */
      service?: {
        name: string
        exchange: string
      }
      /**
       * Makes sure that the exchanges are created and are of the same
       * type on application startup.
       *
       * Default value: []
       */
      assertExchanges?: [
        {
          /**
           * Name of the exchange to bind queues to
           *
           * A value is required
           */
          name: string
          /**
           * Type of the exchange
           *
           * A value is only required if the exchange is asserted
           */
          type?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match'
        },
      ]
      /**
       * Assert queues by default using the @Consume decorator
       * Consumer options defined in @Consume decorator take priority
       *
       * Default value: 'default'
       */
      assertQueuesByDefault?: boolean
      /**
       * Username used for authenticating against the server
       *
       * Default value: 'guest'
       */
      username?: string
      /**
       * Password used for authenticating against the server
       *
       * Default value: 'guest'
       */
      password?: string
      /**
       * The period of the connection heartbeat in seconds
       *
       * Default value: 0
       */
      heartbeat?: number
      /**
       * What VHost shall be used
       *
       * Default value: '/'
       */
      vhost?: string
      /**
       * Wait for a full connection to the AMQP server before continuing
       * with the rest of the NestJS app initialization.
       *
       * This prevents HTTP requests and other entry-points from reaching
       * the server until there is a valid AMQP connection.
       *
       * Default value: false
       */
      wait?: boolean
    }
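To see how the documented defaults combine, the sketch below turns a subset of these options into an AMQP URI. It is illustrative only: `toAmqpUrl` and `ConnectionSketchOptions` are hypothetical names, and the library may pass a connection object to amqp-connection-manager instead of a URL. The default values are the ones listed in the interface above.

```typescript
// Subset of the options above that affect where and how we connect.
interface ConnectionSketchOptions {
  hostname?: string
  port?: number
  username?: string
  password?: string
  vhost?: string
  heartbeat?: number
}

// Hypothetical helper: apply the documented defaults and build an AMQP URI.
function toAmqpUrl(options: ConnectionSketchOptions = {}): string {
  const {
    hostname = 'localhost',
    port = 5672,
    username = 'guest',
    password = 'guest',
    vhost = '/',
    heartbeat = 0,
  } = options
  // Credentials and vhost are percent-encoded, so the default vhost '/' becomes %2F.
  const credentials = `${encodeURIComponent(username)}:${encodeURIComponent(password)}`
  const base = `amqp://${credentials}@${hostname}:${port}/${encodeURIComponent(vhost)}`
  // A heartbeat of 0 is left out of the query string in this sketch.
  return heartbeat > 0 ? `${base}?heartbeat=${heartbeat}` : base
}

console.log(toAmqpUrl()) // amqp://guest:guest@localhost:5672/%2F
console.log(toAmqpUrl({ hostname: 'rabbitmq', heartbeat: 30 }))
```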

License

MIT License

Copyright (c) 2021-present, Enrique Carpintero

