🎉 v2 out! AMQP module for NestJS with decorator support. Battle-tested for RabbitMQ.
EnriqCG/nestjs-amqp
AMQP module for NestJS with decorator support.
This project is still a work in progress and is being actively developed. Issues and PRs are welcome!
This module injects a channel from amqp-connection-manager. Please check the Channel documentation for extra insight on how to publish messages.
Connections are recovered when the connection with the RabbitMQ broker is lost.
    $ npm i --save @enriqcg/nestjs-amqp
    $ npm i --save-dev @types/amqplib
This library was built for the use case of load balancing messages published to a 'topic' across multiple replicas of the same service. It does so through a service definition. We consider a service to be a collection of replicas that run copies of the same codebase.
Using a service definition in @enriqcg/nestjs-amqp is entirely optional if you don't need to balance messages across replicas.
This library leverages RabbitMQ's exchanges, routing keys and queue bindings to achieve this goal. Start by defining a service when importing AMQPModule, providing a service name and an exchange name.
    @Module({
      imports: [
        AMQPModule.forRoot({
          hostname: 'rabbitmq',
          assertQueuesByDefault: true,
          assertExchanges: [
            // we are making sure our exchange is ready
            // this is optional
            {
              name: 'example_exchange',
              type: 'topic',
            },
          ],
          service: {
            name: 'example_service',
            exchange: 'example_exchange',
          },
        }),
      ],
    })
    export class AppModule {}
The service name is used to register and identify replicas of the same service. You can run multiple services using this library on the same exchange. This is powerful, because a single message can then be delivered to several services.
Then we can set up our consumer:
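    @Consumer()
    @Controller()
    export class AppController {
      // AppService is injected so that the handler below can call it
      constructor(private readonly appService: AppService) {}

      @Consume('test.event')
      async testHandler(body: unknown) {
        console.log(this.appService.getHello())
        return true
      }
    }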
Defining the service and using the @Consume decorator in this setup results in the creation of a queue named test.event-example_service. If other replicas of this same code were created, they would join as consumers of the same queue, thus balancing the load of test.event messages across multiple instances.
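The naming and balancing behaviour described above can be sketched in plain TypeScript. This is only an illustration, not the library's internal code: `serviceQueueName` and `distribute` are hypothetical helpers, and the round-robin model is a simplification of how RabbitMQ shares messages among the consumers of one queue.

```typescript
// Hypothetical helper (not part of @enriqcg/nestjs-amqp): mirrors the naming
// shown above, 'test.event' + 'example_service' -> 'test.event-example_service'.
function serviceQueueName(event: string, serviceName: string): string {
  return `${event}-${serviceName}`
}

// Toy model of round-robin delivery among the consumers of a single queue.
// Real brokers also take prefetch and consumer availability into account.
function distribute(messages: string[], replicas: string[]): Map<string, string[]> {
  const delivered = new Map<string, string[]>()
  for (const replica of replicas) {
    delivered.set(replica, [])
  }
  messages.forEach((message, index) => {
    const replica = replicas[index % replicas.length]
    delivered.get(replica)!.push(message)
  })
  return delivered
}

const queueName = serviceQueueName('test.event', 'example_service')
const load = distribute(['m1', 'm2', 'm3', 'm4'], ['replica-a', 'replica-b'])
console.log(queueName, load)
```

Because every replica derives the same queue name from the same event and service name, adding a replica only adds a consumer to the existing queue.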
Register the AMQPModule in app.module.ts and pass a configuration object:
    import { Module } from '@nestjs/common'
    import { AMQPModule } from '@enriqcg/nestjs-amqp'

    @Module({
      imports: [
        AMQPModule.forRoot({
          hostname: 'rabbitmq',
          username: 'guest',
          password: 'guest',
          assertQueuesByDefault: true,
          assertExchanges: [
            // these exchanges will be asserted on startup
            {
              name: 'example_exchange',
              type: 'topic',
            },
            {
              name: 'fanout_exchange',
              type: 'fanout',
            },
          ],
        }),
      ],
    })
    export class AppModule {}
You can also check the amqplib documentation on Exchange and Queue assertion.
You can now inject an AMQP Channel in your services and use it to push messages into an exchange or a queue.
    import { Injectable } from '@nestjs/common'
    import { InjectAMQPChannel } from '@enriqcg/nestjs-amqp'
    import { Channel } from 'amqplib'

    @Injectable()
    export class ExampleService {
      constructor(
        @InjectAMQPChannel()
        private readonly amqpChannel: Channel,
      ) {}

      async sendToExchange() {
        // the message body must be a Buffer, so the payload is serialized first
        this.amqpChannel.publish(
          'exchange_name',
          'routing_key',
          Buffer.from(JSON.stringify({ test: true })),
        )
      }
    }
Check amqplib's reference on channel.publish().
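Since every publish call needs the payload as a Buffer, it can help to wrap the serialization in a small helper. The sketch below is an assumption-laden illustration: `PublishChannel`, `publishJson` and `RecordingChannel` are hypothetical names that are not part of this library, and the return type is left as `unknown` because it may differ between a raw amqplib channel and a channel wrapper.

```typescript
// Structural stand-in for the single channel method used here (assumption:
// the injected channel exposes publish(exchange, routingKey, content)).
interface PublishChannel {
  publish(exchange: string, routingKey: string, content: Buffer): unknown
}

// Hypothetical helper: serializes a payload to JSON and publishes it.
function publishJson(
  channel: PublishChannel,
  exchange: string,
  routingKey: string,
  payload: unknown,
): unknown {
  const content = Buffer.from(JSON.stringify(payload))
  return channel.publish(exchange, routingKey, content)
}

// In-memory fake, so the helper can be exercised without a broker.
class RecordingChannel implements PublishChannel {
  readonly sent: { exchange: string; routingKey: string; content: Buffer }[] = []

  publish(exchange: string, routingKey: string, content: Buffer): boolean {
    this.sent.push({ exchange, routingKey, content })
    return true
  }
}

const fakeChannel = new RecordingChannel()
publishJson(fakeChannel, 'exchange_name', 'routing_key', { test: true })
console.log(fakeChannel.sent[0].content.toString())
```

In a service, the injected channel would take the place of `fakeChannel`.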
@enriqcg/nestjs-amqp allows you to define consumer functions using decorators in your controllers.
    import { Controller } from '@nestjs/common'
    import { Consume, Consumer } from '@enriqcg/nestjs-amqp'

    @Consumer('user') // event prefix
    @Controller()
    export class ExampleController {
      constructor(private readonly exampleService: ExampleService) {}

      @Consume('created') // handler for user.created
      handleCreatedEvent(content: string) {
        console.log(JSON.parse(content))
        // return false -> message will not be acked
        // return true  -> message will be acked
        // no return    -> message will be acked
        return true
      }

      // handler for user.updated.address
      @Consume({
        queueName: 'updated.address',
        noAck: false,
        // queue will be deleted after all consumers are dropped
        assertQueue: true,
        autoDelete: true,
      })
      handleUpdatedAddressEvent(content: string) {
        const payload = JSON.parse(content)
        try {
          // pass data to your services
          this.exampleService.update(payload)
        } catch (e) {
          console.error(e)
          return false // message will not be acked
        }
        // message will be automatically acked
      }
    }
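The way the @Consumer prefix and the @Consume name combine in the example above can be written as a one-line rule. `resolveEventName` is a hypothetical helper used only to illustrate the examples in this README; the library's actual implementation may differ.

```typescript
// Hypothetical helper: joins the optional @Consumer prefix and the @Consume
// name with a dot, matching the handlers documented above.
function resolveEventName(prefix: string | undefined, name: string): string {
  return prefix ? `${prefix}.${name}` : name
}

console.log(resolveEventName('user', 'created'))
console.log(resolveEventName('user', 'updated.address'))
console.log(resolveEventName(undefined, 'test.event'))
```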
The message content is decoded to a string and provided to decorated methods. Depending on what content you published, further deserialization might be needed. (Building decorators to help decode JSON payloads is on the TODO list.)
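Until such decorators exist, a handler can do the deserialization itself. The following is a minimal sketch of a tolerant JSON decoder; `parseJsonContent` is a hypothetical helper, not something the library provides.

```typescript
// Hypothetical helper: parses the string content handed to a decorated
// method and returns undefined instead of throwing on malformed JSON.
function parseJsonContent<T>(content: string): T | undefined {
  try {
    return JSON.parse(content) as T
  } catch {
    return undefined
  }
}

const parsed = parseJsonContent<{ test: boolean }>('{"test":true}')
const broken = parseJsonContent<{ test: boolean }>('not json')
console.log(parsed, broken)
```

Note that the type parameter is only a compile-time cast. The payload shape is not validated at runtime.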
If automatic acknowledgment is disabled for a queue (noAck = false), the return value of the decorated method decides whether a message is acknowledged: returning false leaves the message unacknowledged, and anything other than false (even void) acknowledges it.
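The return-value rule above can be expressed as a small predicate. This sketch assumes that only the literal value false suppresses the acknowledgment, which is how the examples in this README read; `shouldAck` is a hypothetical name and not part of the library's API.

```typescript
// Hypothetical predicate: a message is acknowledged unless the handler
// returned exactly false. Assumption: other falsy values still acknowledge.
function shouldAck(handlerResult: unknown): boolean {
  return handlerResult !== false
}

console.log(shouldAck(true))
console.log(shouldAck(undefined))
console.log(shouldAck(false))
```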
    interface AMQPModuleOptions {
      /**
       * The host URL for the connection
       *
       * Default value: 'localhost'
       */
      hostname?: string
      /**
       * The port of the AMQP host
       *
       * Default value: 5672
       */
      port?: number
      /**
       * The name of the connection. Only really relevant in multiple
       * connection contexts
       *
       * Default value: 'default'
       */
      name?: string
      /**
       * Service definition. Please see README.md to learn about how services
       * work in @enriqcg/nestjs-amqp
       *
       * Default value: {}
       */
      service?: {
        name: string
        exchange: string
      }
      /**
       * Makes sure that the exchanges are created and are of the same
       * type on application startup.
       *
       * Default value: []
       */
      assertExchanges?: [
        {
          /**
           * Name of the exchange to bind queues to
           *
           * A value is required
           */
          name: string
          /**
           * Type of the exchange
           *
           * A value is only required if the exchange is asserted
           */
          type?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match'
        },
      ]
      /**
       * Assert queues by default using the @Consume decorator
       * Consumer options defined in @Consume decorator take priority
       *
       * Default value: 'default'
       */
      assertQueuesByDefault?: boolean
      /**
       * Username used for authenticating against the server
       *
       * Default value: 'guest'
       */
      username?: string
      /**
       * Password used for authenticating against the server
       *
       * Default value: 'guest'
       */
      password?: string
      /**
       * The period of the connection heartbeat in seconds
       *
       * Default value: 0
       */
      heartbeat?: number
      /**
       * What VHost shall be used
       *
       * Default value: '/'
       */
      vhost?: string
      /**
       * Wait for a full connection to the AMQP server before continuing
       * with the rest of the NestJS app initialization.
       *
       * This prevents HTTP requests and other entry-points from reaching
       * the server until there is a valid AMQP connection.
       *
       * Default value: false
       */
      wait?: boolean
    }
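To make the documented connection defaults concrete, the sketch below applies them to a partial options object and renders the result as an AMQP URL. It is an illustration only: `ConnectionOptions`, `withDefaults` and `toAmqpUrl` are hypothetical names, and the module may well hand these options to the client as an object rather than as a URL.

```typescript
// Subset of the connection-related options listed above.
interface ConnectionOptions {
  hostname?: string
  port?: number
  username?: string
  password?: string
  heartbeat?: number
  vhost?: string
}

// Hypothetical helper: fills in the default values documented above.
function withDefaults(options: ConnectionOptions = {}): Required<ConnectionOptions> {
  return {
    hostname: options.hostname ?? 'localhost',
    port: options.port ?? 5672,
    username: options.username ?? 'guest',
    password: options.password ?? 'guest',
    heartbeat: options.heartbeat ?? 0,
    vhost: options.vhost ?? '/',
  }
}

// Hypothetical helper: renders the resolved options as an AMQP URL.
// The vhost is percent-encoded, so the default '/' becomes '%2F'.
function toAmqpUrl(options: ConnectionOptions = {}): string {
  const o = withDefaults(options)
  const credentials = `${encodeURIComponent(o.username)}:${encodeURIComponent(o.password)}`
  const vhost = encodeURIComponent(o.vhost)
  return `amqp://${credentials}@${o.hostname}:${o.port}/${vhost}?heartbeat=${o.heartbeat}`
}

console.log(toAmqpUrl({ hostname: 'rabbitmq' }))
// prints "amqp://guest:guest@rabbitmq:5672/%2F?heartbeat=0"
```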
Copyright (c) 2021-present, Enrique Carpintero