A promisified layer over rhea AMQP client
amqp/rhea-promise
A promisified layer over the rhea AMQP client.
- Node.js version: 6.x or higher.
- We would still encourage you to install the latest available LTS version at any given time from https://nodejs.org. It is a good practice to always install the latest available LTS version of node.js.
- Installing node.js on Windows or macOS is very simple with the installers available on the node.js website. If you are using a Linux based OS, then you can find easy to follow, one step installation instructions over here.
npm install rhea-promise
You can set the following environment variable to get the debug logs.
- Getting debug logs from this library
export DEBUG=rhea-promise*
- Getting debug logs from this and the rhea library
export DEBUG=rhea*
- If you are not interested in viewing the message transformation (which consumes a lot of console/disk space), then you can set the `DEBUG` environment variable as follows:
export DEBUG=rhea*,-rhea:raw,-rhea:message,-rhea-promise:eventhandler,-rhea-promise:translate
- Set the `DEBUG` environment variable as shown above and then run your test script as follows:
  - Logging statements from your test script go to `out.log` and logging statements from the SDK go to `debug.log`.
    node your-test-script.js > out.log 2> debug.log
  - Logging statements from your test script and the SDK go to the same file `out.log` by redirecting stderr to stdout (`&1`) and then redirecting stdout to a file:
    node your-test-script.js > out.log 2>&1
  - Logging statements from your test script and the SDK go to the same file `out.log`.
    node your-test-script.js &> out.log
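
To make the effect of the `DEBUG` values above concrete, here is a minimal sketch of comma-separated namespace matching with `*` wildcards and `-` exclusions. It assumes the conventions of the `debug` npm package and only approximates them; it is not that package's implementation. `isNamespaceEnabled` is a hypothetical helper, and `rhea:frames` is used purely as an illustrative namespace.

```typescript
// Approximate model of how a DEBUG value selects log namespaces.
// Assumption: patterns are separated by commas or whitespace, `*` is a
// wildcard, and a leading `-` excludes a namespace.

function patternToRegExp(pattern: string): RegExp {
  // Escape regex metacharacters, then expand the `*` wildcard.
  const escaped = pattern
    .replace(/[.+?^${}()|[\]\\]/g, "\\$&")
    .replace(/\*/g, ".*");
  return new RegExp("^" + escaped + "$");
}

function isNamespaceEnabled(debugValue: string, namespace: string): boolean {
  const patterns = debugValue.split(/[\s,]+/).filter((p) => p.length > 0);
  const includes: RegExp[] = [];
  const excludes: RegExp[] = [];
  for (const p of patterns) {
    if (p.charAt(0) === "-") {
      excludes.push(patternToRegExp(p.slice(1)));
    } else {
      includes.push(patternToRegExp(p));
    }
  }
  // Exclusions win over inclusions.
  if (excludes.some((re) => re.test(namespace))) {
    return false;
  }
  return includes.some((re) => re.test(namespace));
}

const quietDebug =
  "rhea*,-rhea:raw,-rhea:message,-rhea-promise:eventhandler,-rhea-promise:translate";

console.log(isNamespaceEnabled(quietDebug, "rhea:raw"));               // excluded
console.log(isNamespaceEnabled(quietDebug, "rhea-promise:translate")); // excluded
console.log(isNamespaceEnabled(quietDebug, "rhea:frames"));            // still enabled
console.log(isNamespaceEnabled("rhea-promise*", "rhea:frames"));       // rhea itself is not selected
```

Under these assumptions, `rhea*` selects both libraries because the `rhea-promise` namespaces also start with `rhea`, while `rhea-promise*` selects only this library.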
- In `AMQP`, for two peers to communicate successfully, different entities (Container, Connection, Session, Link) need to be created. There is a relationship between those entities.
  - 1 Container can have 1..* Connections.
  - 1 Connection can have 1..* Sessions.
  - 1 Session can have 1..* Links.
  - A Link can have the role of Receiver or Sender.
- Each entity (connection, session, link) maintains its own state to let other entities know what it is doing. Thus,
  - if the connection goes down, then everything on the connection (sessions and links) goes down.
  - if a session goes down, then all the links on that session go down.
- When an entity goes down, rhea emits `*_error` and `*_close` events, where `*` can be "sender", "receiver", "session", or "connection". If event listeners for these events are not added at the appropriate level, then `rhea` propagates those events to the parent entity. If they are not handled at the `Container` level (the topmost parent), then they are transformed into an `error` event. This would cause your application to crash if there is no listener added for the `error` event.
- In `rhea-promise`, the library creates equivalent objects (`Connection`, `Session`, `Sender`, `Receiver`) and wraps objects from `rhea` within them. It adds event listeners for all the possible events that can occur at any level and re-emits those events with the same arguments as one would expect from rhea. This makes it easy for consumers of `rhea-promise` to use the `EventEmitter` pattern. Users can use different event emitter methods like `.once()`, `.on()`, `.prependListener()`, etc. Since `rhea-promise` adds those event listeners on `rhea` objects, the errors will never be propagated to the parent entity. This can be good as well as bad depending on what you do.
  - Good - `*_error` and `*_close` events emitted on an entity will not be propagated to its parent, thus ensuring that errors are handled at the right level.
  - Bad - If you do not add listeners for `*_error` and `*_close` events at the right level, then you will never know why an entity shut down.
We believe our design enforces good practices to be followed while using the event emitter pattern.
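
The propagation rules described above can be illustrated with a toy model. This is a sketch only and is not code from rhea or rhea-promise: `Entity`, `on` and `dispatch` are hypothetical names, and the model simply encodes the behaviour stated in the text (an unhandled event bubbles to the parent, and an event that is still unhandled at the container turns into an `error` that throws when nobody listens for it).

```typescript
type Listener = (source: string) => void;

// Toy stand-in for a container, connection, session or link.
class Entity {
  private listeners: { [event: string]: Listener[] } = {};

  constructor(readonly name: string, readonly parent?: Entity) {}

  on(event: string, listener: Listener): void {
    const list = this.listeners[event] || [];
    list.push(listener);
    this.listeners[event] = list;
  }

  // Deliver locally if someone listens; otherwise bubble up to the parent.
  // At the top of the chain an unhandled event becomes "error".
  dispatch(event: string, source: string = this.name): void {
    const list = this.listeners[event];
    if (list && list.length > 0) {
      list.forEach((listener) => listener(source));
    } else if (this.parent) {
      this.parent.dispatch(event, source);
    } else if (event !== "error") {
      this.dispatch("error", source);
    } else {
      throw new Error("Unhandled error event from " + source);
    }
  }
}

const container = new Entity("container");
const connection = new Entity("connection", container);
const session = new Entity("session", connection);
const sender = new Entity("sender", session);

const log: string[] = [];

// Plain bubbling: only the connection listens, so it sees the sender's event.
connection.on("sender_error", (source) => {
  log.push("connection saw sender_error from " + source);
});
sender.dispatch("sender_error");

// Wrapper style: a listener is attached on the sender itself, so the event
// is handled there and no longer reaches the connection.
sender.on("sender_error", (source) => {
  log.push("sender handled sender_error from " + source);
});
sender.dispatch("sender_error");

console.log(log);
```

In this model, once a listener exists on the sender, the connection-level listener is no longer invoked for sender events, which mirrors the "Good" and "Bad" trade-off listed above.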
Please take a look at the sample.env file for examples on how to provide the values for different parameters like host, username, password, port, senderAddress, receiverAddress, etc.
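
As a rough sketch of how those parameters might be read in one place, the helper below collects them with the same fallback values the examples in this document use. `loadAmqpSampleConfig` and `AmqpSampleConfig` are hypothetical names that are not part of rhea-promise, and the port range check is an added assumption.

```typescript
interface AmqpSampleConfig {
  host: string;
  username: string;
  password: string;
  port: number;
  senderAddress: string;
  receiverAddress: string;
}

type Env = { [key: string]: string | undefined };

// Reads the variables used by the examples, falling back to the same
// placeholder defaults. Throws if AMQP_PORT is not a valid TCP port.
function loadAmqpSampleConfig(env: Env): AmqpSampleConfig {
  const rawPort = env["AMQP_PORT"] || "5671";
  const port = parseInt(rawPort, 10);
  if (isNaN(port) || port <= 0 || port > 65535) {
    throw new Error("Invalid AMQP_PORT: " + rawPort);
  }
  return {
    host: env["AMQP_HOST"] || "host",
    username: env["AMQP_USERNAME"] || "sharedAccessKeyName",
    password: env["AMQP_PASSWORD"] || "sharedAccessKeyValue",
    port: port,
    senderAddress: env["SENDER_ADDRESS"] || "address",
    receiverAddress: env["RECEIVER_ADDRESS"] || "address"
  };
}

const sampleConfig = loadAmqpSampleConfig({ AMQP_HOST: "example-host", AMQP_PORT: "5672" });
console.log(sampleConfig.host, sampleConfig.port);
```

In a Node.js script you would typically pass `process.env` (after calling `dotenv.config()` if you use a `.env` file) instead of the literal object shown here.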
- Running the example from terminal: `> ts-node ./examples/send.ts`.

NOTE: If you are running the sample with a `.env` config file, then please run the sample from the directory that contains the `.env` config file.
import {
  Connection,
  Sender,
  EventContext,
  Message,
  ConnectionOptions,
  Delivery,
  SenderOptions
} from "rhea-promise";

import * as dotenv from "dotenv";
// Optional for loading environment configuration from a .env (config) file
dotenv.config();

const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const senderAddress = process.env.SENDER_ADDRESS || "address";

async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const senderName = "sender-1";
  const senderOptions: SenderOptions = {
    name: senderName,
    target: {
      address: senderAddress
    },
    onError: (context: EventContext) => {
      const senderError = context.sender && context.sender.error;
      if (senderError) {
        console.log(">>>>> [%s] An error occurred for sender '%s': %O.",
          connection.id, senderName, senderError);
      }
    },
    onSessionError: (context: EventContext) => {
      const sessionError = context.session && context.session.error;
      if (sessionError) {
        console.log(">>>>> [%s] An error occurred for session of sender '%s': %O.",
          connection.id, senderName, sessionError);
      }
    }
  };

  await connection.open();
  const sender: Sender = await connection.createSender(senderOptions);
  const message: Message = {
    body: "Hello World!!",
    message_id: "12343434343434"
  };

  // Please, note that we are not awaiting on sender.send()
  // You will notice that `delivery.settled` will be `false`.
  const delivery: Delivery = sender.send(message);
  console.log(">>>>>[%s] Delivery id: %d, settled: %s",
    connection.id, delivery.id, delivery.settled);

  await sender.close();
  await connection.close();
}

main().catch((err) => console.log(err));
- Running the example from terminal: `> ts-node ./examples/awaitableSend.ts`.
import {
  Connection,
  Message,
  ConnectionOptions,
  Delivery,
  AwaitableSenderOptions,
  AwaitableSender
} from "rhea-promise";

import * as dotenv from "dotenv";
// Optional for loading environment configuration from a .env (config) file
dotenv.config();

const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const senderAddress = process.env.SENDER_ADDRESS || "address";

async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const senderName = "sender-1";
  const awaitableSenderOptions: AwaitableSenderOptions = {
    name: senderName,
    target: {
      address: senderAddress
    },
  };

  await connection.open();
  // Notice that we are awaiting on the message being sent.
  const sender: AwaitableSender = await connection.createAwaitableSender(
    awaitableSenderOptions
  );

  for (let i = 0; i < 10; i++) {
    const message: Message = {
      body: `Hello World -${i}`,
      message_id: i
    };
    // Note: Here we are awaiting for the send to complete.
    // You will notice that `delivery.settled` will be `true`, irrespective of whether the promise resolves or rejects.
    const delivery: Delivery = await sender.send(message, { timeoutInSeconds: 10 });
    console.log("[%s] await sendMessage -> Delivery id: %d, settled: %s",
      connection.id, delivery.id, delivery.settled);
  }

  await sender.close();
  await connection.close();
}

main().catch((err) => console.log(err));
- Running the example from terminal: `> ts-node ./examples/receive.ts`.

NOTE: If you are running the sample with a `.env` config file, then please run the sample from the directory that contains the `.env` config file.
import {
  Connection,
  Receiver,
  EventContext,
  ConnectionOptions,
  ReceiverOptions,
  delay,
  ReceiverEvents
} from "rhea-promise";

import * as dotenv from "dotenv";
// Optional for loading environment configuration from a .env (config) file
dotenv.config();

const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const receiverAddress = process.env.RECEIVER_ADDRESS || "address";

async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const receiverName = "receiver-1";
  const receiverOptions: ReceiverOptions = {
    name: receiverName,
    source: {
      address: receiverAddress
    },
    onSessionError: (context: EventContext) => {
      const sessionError = context.session && context.session.error;
      if (sessionError) {
        console.log(">>>>> [%s] An error occurred for session of receiver '%s': %O.",
          connection.id, receiverName, sessionError);
      }
    }
  };

  await connection.open();
  const receiver: Receiver = await connection.createReceiver(receiverOptions);
  receiver.on(ReceiverEvents.message, (context: EventContext) => {
    console.log("Received message: %O", context.message);
  });
  receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
    const receiverError = context.receiver && context.receiver.error;
    if (receiverError) {
      console.log(">>>>> [%s] An error occurred for receiver '%s': %O.",
        connection.id, receiverName, receiverError);
    }
  });

  // sleeping for 2 mins to let the receiver receive messages and then closing it.
  await delay(120000);
  await receiver.close();
  await connection.close();
}

main().catch((err) => console.log(err));
- Clone the repo
git clone https://github.com/amqp/rhea-promise.git
- Install typescript, ts-node globally
npm i -g typescript
npm i -g ts-node
- NPM install from the root of the package
npm i
- Build the project
npm run build
The AMQP protocol specification can be found here.