Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
forked fromamqp/rhea

A reactive messaging library based on the AMQP protocol

License

NotificationsYou must be signed in to change notification settings

kornys/rhea

 
 

Repository files navigation

Node.js CI

rhea

A reactive library for theAMQP protocol, for easydevelopment of both clients and servers.

Hello World!

Brief example of sending and receiving a message through abroker/server listening on port 5672:

varcontainer=require('rhea');container.on('message',function(context){console.log(context.message.body);context.connection.close();});container.once('sendable',function(context){context.sender.send({body:'Hello World!'});});varconnection=container.connect({'port':5672});connection.open_receiver('examples');connection.open_sender('examples');

output:

Hello World!

Dependencies

  • debug (For simple debug logging - may be replaced in the nearterm. To enable set e.g. DEBUG=rhea* or DEBUG=rhea:events for morequalified debugging)

Examples

There are some examples of using the library under the examplesfolder. These include:

  • helloworld.js - essentially the code above, whichsends and receives a single message through a broker

  • direct_helloworld.js - an exampleshowing the sending of a single message without the use of a broker,by listening on a port and then openning a connection to itself overwhich the message is transfered.

  • send_raw.js - explicitly set thedata section of the message body

  • simple_send.js - connects to a specifiedport then sends a number of messages to a given address

  • simple_recv.js - connects to a specifiedport then subscribes to receive a number of messages from a givenaddress

These last two can be used together to demsontrate sending messagesfrom one process to another, using a broker or similar intermediary towhich they both connect.

  • direct_recv.js - listens on a given portfor incoming connections over which it will then receive a number ofmessages

The direct_recv.js example can be used in conjunction with thesimple_send.js example to demonstrate sending messages betweenprocesses without the use of any intermediary. Note however the thedefault port of one or ther other will need to be changed through the-p command line option.

  • client.js andserver.js

    • A request-response example where the 'client' sends messages to a'server' (or service) which converts them to upper case and sendsthem back. This demonstrates the use of temporary addresses amongother things. Using these two together requires a broker or similarintermediary.
  • In durable_subscription, asubscriber and apublisherwhichdemonstrate the notion of a durable subscription when used inconjunction with a broker such as ActiveMQ

  • In selector areceiver that uses aselector - a SQL like query string that restricts the set ofmessages delivered - and an accompanyingsender

  • In sasl asasl client showinghow to authenticate to the service you connect to. This can be usedagainst any broker as well as either of two example servers showinganonymous andplain mechanisms.

  • A tlsclient andserver demonstrating connecting (andpossibly authenticating) over a tls secured socket.

  • Aclient to demonstrate the built inautomatic reconnection functionality along with a simpleechoserver against which it can be run. Itcan of course also be run against a broker instead (or as well!).

  • Bothnode based andwebbased websocket clients along withaserver which will echo back anyrequests received. The clients can also be used against a websocketenabled AMQP broker with a queue or topic called 'examples'. Thenode based scritps require the 'ws' node module to be installed. Thebrowser based example requires a browserified version of the rhealibrary (this can be created e.g. by calling npm run-scriptbrowserify or make browserify). The browserified and minimizedjavascript library is stored under the dist/ directory.

To run the examples you will need the dependencies installed: thelibrary itself depends on the 'debug' module, and some of the examplesdepend on the 'yargs' module for command line option parsing.

The 'rhea' module itself must also be findable by node. You can dothis either by checking out the code from git and setting NODE_PATH toinclude the directory to which you do so (i.e. the directory in which'a directory named 'rhea' can be found, or you can install the moduleusing npm.

Some of the examples assume an AMQP compatible broker, such as thoseoffered by the ActiveMQ or Qpid Apache projects, is running.

API

There are five core types of object in the API:

Each of these inherits all the methods of EventEmitter, allowinghandlers for particular events to be attached. Events that are nothandled at sender or receiver scope are then propagated up to possiblybe handled at session scope. Events that are not handled at sessionscope are then propagated up to possibly be handled at connectionscope, and if not there then in container scope.

Two other relevant objects are:


Container

An AMQP container from which outgoing connections can be made and/orto which incoming connections can be accepted. The module exports adefault instance of a Container which can be used directly. Otherinstances can be created from that if needed using thecreate_container method. A container is identified by theid property. By default a uuid is used, but the propertycan be set to something more specific if desired before making oraccepting any connections.

methods:

connect(options)

Connects to the server specified by the host and port supplied in theoptions and returns aConnection.

The options argument is an object that may contain node librarysocket.connectandtls.connectoptions and any of the following fields:

  • host -socket.connect option, defaults to localhost
  • port -socket.connect option, defaults to 5672
  • transport - undefined, 'tcp' or 'tls', determines ifsocket.connect ortls.connect options are accepted
  • username
  • password
  • sasl_init_hostname
  • reconnect
    • if true (the default), the library will automatically attempt toreconnect if disconnected
    • if false, automatic reconnect will be disabled
    • if it is a numeric value, it is interpreted as the delay betweenreconnect attempts (in milliseconds)When enabled, reconnect can be further controlled via thefollowing options:
    • initial_reconnect_delay (in milliseconds)
    • max_reconnect_delay (in milliseconds)
    • reconnect_limit (maximum number of reconnect attempts)
  • connection_details - a function which if specified will be invokedto get the options to use (e.g. this can be used to alternatebetween a set of different host/port combinations)

As well as Container options common for both client and server:

  • id - connection name
  • container_id - (overrides the container identifier)
  • hostname - to present to remote in the open frame (defaults to host)
  • max_frame_size
  • channel_max
  • idle_time_out
  • outgoing_locales - in open frame
  • incoming_locales - in open frame
  • offered_capabilities - in open frame
  • desired_capabilities - in open frame
  • properties - in open frame
  • sender_options - defaults for open_sender
  • receiver_options - defaults for open_receiver
  • non_fatal_errors - an array of error conditions which if receivedon connection close from peer should not prevent reconnect (bydefault this only includes amqp:connection:forced)
  • all_errors_non_fatal - a boolean which determines if rhea's auto-reconnect should attempt reconnection on all fatal errors

If options is undefined, the client will attempt to obtain defaultoptions from a JSON config file. This file is of similar structure tothat used by Apache Qpid Proton clients. The location of the file canbe specified through the MESSAGING_CONNECT_FILE environment variable.If that is not specified it will look for a file called connect.jsonin the current directory, in /.config/messaging or/etc/messaging/.

The config file offers only limited configurability, specifically:

  • scheme
  • host
  • port
  • user - (note not username)
  • password
  • sasl - (a nested object with field enabled)
  • sasl_mechanisms
  • tls - (a nested object with fields key, cert, ca for paths tocorrespoding files)
  • verify
listen(options)

Starts a server socket listening for incoming connections on the port(and optionally interface) specified in the options.

The options argument is an object that may contain node librarynet.createServerand itsserver.listenortls.createServerand itsserver.listenoptions, most AMQP Container fields listed forconnect and any of thefollowing fields:

The options argument is an object that may contain any of thefollowing fields:

  • transport - undefined, 'tcp' or 'tls', determines ifnet.createServer ortls.createServer options are accepted
  • host
  • port
create_container()

Returns a new container instance. The method takes an options objectwhich can contain the following field:

  • id

If no id is specified a new uuid will be generated.

generate_uuid()

Simple utility for generating a stringified uuid, useful if you wishto specify distinct container ids for different connections.

websocket_connect()

Returns a function that can be used to create another functionsuitable for use as the value of 'connection_details' in a connectcall in order to connect over websockets. The function returned heretakes a websocket url and optional arguments. The websocket_connectmethod itself take the constructor of the WebSocket implementation touse. It has been tested with the implementation in firefox and alsothat in the node module 'ws'.

websocket_accept()

Used to start handling an incoming websocket connection as an AMQPconnection. See thewebsocket echo serverexample for how to use it.


Connection

methods:

open_receiver(address|options)

Establishes a link over which messages can be received and returns aReceiver representing that link. A receivinglink is a subscription, i.e. it expresses a desire to receivemessages.

The argument to this method can either be a simple string indicatingthe source of messages of interest (e.g. a queue name), or an optionsobject that may contain any of the following fields:

  • source - The source from which messages are received. This can bea simple string address/name or a nested object itself containingthe fields:
    • address
    • dynamic
    • expiry_policy
    • durable
  • target - The target of a receiving link is the localidentifier. It is often not needed, but can be set if it is,
  • name - The name of the link. This should be unique for thecontainer. If not specified a unqiue name is generated.
  • credit_window - A 'prefetch' window controlling the flow ofmessages over this receiver. Defaults to 500 if not specified. Avalue of 0 can be used to turn of automatic flow control andmanage it directly.
  • autoaccept - Whether received messages should be automaticallyaccepted. Defaults to true. If set to false, the applicationshould call accept, release or reject on thedelivery field of the context passed to themessage event.
  • autosettle - Whether received messages should be automaticallysettled once the remote settles them. Defaults to true.

And attach frame fields:

  • snd_settle_mode
  • rcv_settle_mode
  • unsettled
  • max_message_size
  • offered_capabilities
  • desired_capabilities
  • properties

Note: If the link doesn't specify a value for the credit_window andautoaccept options, the connection options are consulted followed bythe container options. The default is used only if an option is notspecified at any level.

open_sender(address|options)

Establishes a link over which messages can be sent and returns aSender representing that link. A sending link is ananalogous concept to a subscription for outgoing rather than incomingmessages. I.e. it expresses a desire to send messages.

The argument to this method can either be a simple string indicatingthe target for messages of interest (e.g. a queue name), or an optionsobject that may contain any of the following fields:

  • target - The target to which messages are sent. This can be asimple string address/name or a nested object itself containingthe fields:
    • address
    • dynamic
    • expiry_policy
    • durable
  • source - The source of a sending link is the local identifier. Itis usually not needed, but can be set if it is,
  • name - The name of the link. This should be unique for thecontainer. If not specified a unqiue name is generated.
  • autosettle - Whether sent messages should be automaticallysettled once the peer settles them. Defaults to true.

And attach frame fields as foropen_receiver.

Note: If the link doesn't specify a value for the autosettle option,the connection options are consulted followed by the containeroptions. The default is used only if an option is not specified at anylevel.

send(message)

Sends the specified message over the default sender, which is asending link whose target address is null. The use of this methoddepends on the peer supporting so-called 'anonymous relay' semantics,which most AMQP 1.0 brokers do. The message should have the 'to' fieldset to the intended destination.

close()

Closes a connection (may take an error object which is an objectthat consists of condition and description fields).

is_open()/is_closed()

Provide information about the connection status. If it's opened orclosed.

create_session()

Creates a new session if you want to manage sessions by yourself.

events:

connection_open

Raised when the remote peer indicates the connection is open.This occurs also on reconnect.

connection_close

Raised when the remote peer indicates the connection is closed.This can happen either as a response to our close, or by itself.The connection and sessions will not be reconnected.

connection_error

Raised when the remote peer indicates the connection is closed andspecifies an error. Aconnection_close event will always follow thisevent, so it only needs to be implemented if there are specificactions to be taken on a close with an error as opposed to a close.The error is available as a property on the event context.

If neither the connection_error or the connection_close is handled bythe application, anerror event will be raised. This can be handledon the connection or the container. If this is also unhandled, theapplication process will exit.

protocol_error

Raised when a protocol error is received on the underlying socket.Adisconnected event will follow with any reconnect as configured.

error

Raised when an error is received on the underlying socket. Thiscatches any errors otherwise not handled.

disconnected

Raised when the underlying tcp connection is lost or nonfatal errorwas received. The context has areconnecting property which is trueif the library is attempting to automatically reconnect and false ifit has reached the reconnect limit. If reconnect has not been enabledor if the connection is a tcp server, then thereconnecting propertyis undefined. The context may also have anerror property givingsome information about the reason for the disconnect. If thedisconnect event is not handled, a warning will be logged to theconsole.

You should update the application state to resend any unsettledmessages again once the connection is recovered.

settled

Raised when remote settled the message.


Session

Session is an aggregation ofReceiver andSender links and provides the context andsequencing of messages for all the links it contains. AConnection creates a default session for you ifyou create receivers and senders on the Connection. You only need touse this object if you want to group your links into more than onesession.

methods:

open_receiver(address|options)

This adds a receiver on the session. Theopen_receiver on theConnection object finds the session and callsthis.

open_sender(address|options)

This adds a sender on the session. Theopen_sender on theConnection object finds the session and callsthis.

close()

End a session (may take an error object which is an object thatconsists of condition and description fields).

is_open()/is_closed()

Provide information about the session status. If it's opened orclosed.

events:

session_open

Raised when the remote peer indicates the session is open (i.e. begunin AMQP parlance).

session_close

Raised when the remote peer indicates the session is closed (i.e.ended in AMQP parlance). The session will be removed from theconnection after the event.

session_error

Raised when the remote peer indicates the session has ended andspecifies an error. Asession_close event will always follow thisevent, so it only needs to be implemented if there are specificactions to be taken on a close with an error as opposed to a close.The error is available aserror property on the session object.

If neither the session_error or the session_close is handled by theapplication, anerror event will be raised on the container. If thisis also unhandled, the application process will exit.

settled

Raised when remote settled the message.


Receiver

methods:

close()

Closes a receiving link (i.e. cancels the subscription). (May take anerror object which is an object that consists of condition anddescription fields).

detach()

Detaches a link without closing it. For durable subscriptions thismeans the subscription is inactive, but not cancelled.

add_credit(n)

By default, receivers have a prefetch window that is movedautomatically by the library. However if desired the application canset the prefecth to zero and manage credit itself. Each invocation ofadd_credit() method issues credit for a further 'n' messages to besent by the peer over this receiving link. [Note: flow()is an aliasfor add_credit()]

credit()

Returns the amount of outstanding credit that has been issued.

events:

message event

Raised when a message is received. The context passed will have amessage, containing the received content, and adelivery which can be used to acknowledge receiptof the message if autoaccept has been disabled.

receiver_open

Raised when the remote peer indicates the link is open (i.e. attachedin AMQP parlance).

receiver_drained

Raised when the remote peer indicates that it has drained all credit(and therefore there are no more messages at present that it can send).

receiver_flow

Raised when a flow is received for receiver.

receiver_error

Raised when the remote peer closes the receiver with an error. Areceiver_close event will always follow this event, so it only needsto be implemented if there are specific actions to be taken on a closewith an error as opposed to a close. The error is available as anerror property on the receiver.

receiver_close

Raised when the remote peer indicates the link is closed (i.e.detached in AMQP parlance).

settled

Raised when remote settled the message.


Sender

methods:

send(msg)

Sends amessage. The link need not be yet opennor is any credit needed, but there is a limit of 2048 deliveries intheSession queue before it raises an exceptionfor buffer overflow.

Unsettled messages, whether transmitted or not, are lost on reconnectand there will be noaccepted,released,rejected events. Youmay need to resend the messages on adisconnected event.

If the messages to be sent can be generated or fetched on demand orthere is large number of messages, it is recommendedsend is calledonly while the sender issendable(). When sender is no longersendable, continue sending in thesendable event.

close()

Closes a sending link (may take an error object which is an objectthat consists of condition and description fields).

detach()

Detaches a link without closing it.

sendable()

Returns true if the sender has available credits for sending amessage. Otherwise it returns false.

set_drained(bool)

This must be called in response tosender_draining event to tellpeer we have drained our messages or credit.

events:

sendable

Raised when the sender has received credit to be able to transmitmessages to its peer. You will not receive a new event until the peersends more credit, even if you have some credit left.

accepted

Raised when a sent message is accepted by the peer.

released

Raised when a sent message is released by the peer.

rejected

Raised when a sent message is rejected by the peer.context.delivery.remote_state.error may carry diagnostics to explainrejection, for example acondition property with valueamqp:unauthorized-access.

modified

Raised when a sent message is modified by the peer. Thecontext.delivery.remote_state may havedelivery_failed andundeliverable_here boolean andmessage_annotations map propertiesto guide any message retransmission as specified in the AMQP 1.0specification.

sender_open

Raised when the remote peer indicates the link is open (i.e. attachedin AMQP parlance).

sender_draining

Raised when the remote peer requests that the sender drain its credit;sending all available messages within the credit limit and callingset_drained(true). After this the sender has no credit left.

sender_flow

Raised when a flow is received for sender.sender_draining andsendable events may follow this event, so it only needs to beimplemented if there are specific actions to be taken.

sender_error

Raised when the remote peer closes the sender with an error. Asender_close event will always follow this event, so it only needsto be implemented if there are specific actions to be taken on a closewith an error as opposed to a close. The error is available as anerror property on the sender.

sender_close

Raised when the remote peer indicates the link is closed (i.e.detached in AMQP parlance).

settled

Raised when remote settled the message.

Message

A message is an object that may contain the following fields:

  • durable
  • priority
  • ttl
  • first_acquirer
  • delivery_count
  • delivery_annotations, an object/map of non-standard deliveryannotations sent to link recipient peer that should be negotiatedat link attach
  • message_annotations, an object/map of non-standard deliveryannotations propagated across all steps that should be negotiatedat link attach
  • message_id
  • user_id
  • to
  • subject
  • reply_to
  • correlation_id
  • content_type
  • content_encoding
  • absolute_expiry_time
  • creation_time
  • group_id
  • group_sequence
  • reply_to_group_id
  • application_properties, an object/map which can take arbitrary,application defined named simple values
  • body, which can be of any AMQP type type ordata_section,data_sections,sequence_section orsequence_sections fromrhea.message.
  • footer, an objec`t/map for HMACs or signatures or similar

Messages are passed to the send() method of Connection or Sender, andare made available asmessage on the event context for themessageevent on a Receiver or its parent(s).

Delivery

The delivery object provides information on- and enables control over-the state of a message transfer.

The methods on a delivery object are:

  • accept, which will positively acknowledge the receipt of themessage
  • release, which will inform the sender that the message can beredelivered (to this or to any other receiver). The release can becontrolled through an object passed in with one or more fo thefollowing fields:
    • delivery_failed, if true the sender should increment thedelivery_count on the next redelivery attempt, if false itshould not
    • undeliverable_here, if true the sender should not try toredeliver the same message to this receiver
  • reject, which will inform the sender that the message is invalidin some way.
  • modified, which sets the modified outcome as defined in the AMQP1.0 specification.

If autoaccept is disabled on a receiver, the application should ensurethat it accepts (or releases or rejects) all messages received.


Note: For detailed options and types, please refer to the typedefinitions in thetypings directory.

About

A reactive messaging library based on the AMQP protocol

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • JavaScript52.1%
  • TypeScript47.6%
  • Makefile0.3%

[8]ページ先頭

©2009-2025 Movatter.jp