The most widely used PHP client for RabbitMQ
php-amqplib/php-amqplib
This library is a pure PHP implementation of the AMQP 0-9-1 protocol. It's been tested against RabbitMQ.
The library was used for the PHP examples of RabbitMQ in Action and the official RabbitMQ tutorials.
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms.
Thanks to videlalvaro and postalservice14 for creating php-amqplib.
The package is now maintained by Ramūnas Dronga, Luke Bakken and several VMware engineers working on RabbitMQ.
Starting with version 2.0 this library uses AMQP 0.9.1 by default and thus requires RabbitMQ 2.0 or later. Usually server upgrades do not require any application code changes since the protocol changes very infrequently, but please conduct your own testing before upgrading.
Since the library uses AMQP 0.9.1 we added support for the following RabbitMQ extensions:
- Exchange to Exchange Bindings
- Basic Nack
- Publisher Confirms
- Consumer Cancel Notify
Extensions that modify existing methods like alternate exchanges are also supported.
enqueue/amqp-lib is an amqp interop compatible wrapper.
AMQProxy is a proxy library with connection and channel pooling/reusing. This allows for lower connection and channel churn when using php-amqplib, leading to less CPU usage of RabbitMQ.
Ensure you have composer installed, then run the following command:
composer require php-amqplib/php-amqplib
That will fetch the library and its dependencies inside your vendor folder. Then you can add the following to your .php files in order to use the library:
require_once __DIR__ . '/vendor/autoload.php';
Then you need to use the relevant classes, for example:
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
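As a rough illustration of how these classes fit together, the following sketch declares a queue and publishes one text message to it through the default exchange. The function name `publishText`, the durable-queue choice and the `text/plain` content type are assumptions made for this example, not something the library prescribes.

```php
<?php
// Assumes Composer's autoloader (vendor/autoload.php) has already been required.
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Hypothetical helper: declare a queue and publish one text message to it.
 */
function publishText(AMQPChannel $channel, string $queue, string $text): void
{
    // Arguments: name, passive, durable, exclusive, auto_delete.
    $channel->queue_declare($queue, false, true, false, false);

    $message = new AMQPMessage($text, ['content_type' => 'text/plain']);

    // The default exchange ('') routes a message to the queue named by the routing key.
    $channel->basic_publish($message, '', $queue);
}
```

With a broker running locally on RabbitMQ's default port and credentials (also an assumption), it could be called as `publishText((new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'))->channel(), 'hello', 'Hello World!');`.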
With RabbitMQ running, open two terminals and on the first one execute the following commands to start the consumer:
cd php-amqplib/demo
php amqp_consumer.php
Then on the other terminal do:
cd php-amqplib/demo
php amqp_publisher.php some text to publish
You should see the message arriving at the process on the other terminal.
Then to stop the consumer, send it the quit message:
php amqp_publisher.php quit
If you need to listen to the sockets used to connect to RabbitMQ then see the example in the non blocking consumer.
php amqp_consumer_non_blocking.php
Please see CHANGELOG for more information on what has changed recently.
http://php-amqplib.github.io/php-amqplib/
To not repeat ourselves, if you want to learn more about this library, please refer to the official RabbitMQ tutorials.
- amqp_ha_consumer.php: demos the use of mirrored queues.
- amqp_consumer_exclusive.php and amqp_publisher_exclusive.php: demos fanout exchanges using exclusive queues.
- amqp_consumer_fanout_{1,2}.php and amqp_publisher_fanout.php: demos fanout exchanges with named queues.
- amqp_consumer_pcntl_heartbeat.php: demos signal-based heartbeat sender usage.
- basic_get.php: demos obtaining messages from the queues by using the basic get AMQP call.
If you have a cluster of multiple nodes to which your application can connect, you can start a connection with an array of hosts. To do that you should use the create_connection static method.
For example:
$connection = AMQPStreamConnection::create_connection([
    ['host' => HOST1, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST],
    ['host' => HOST2, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST]
], $options);
This code will try to connect to HOST1 first, and connect to HOST2 if the first connection fails. The method returns a connection object for the first successful connection. Should all connections fail it will throw the exception from the last connection attempt.
See demo/amqp_connect_multiple_hosts.php for more examples.
Let's say you have a process that generates a bunch of messages that are going to be published to the same exchange using the same routing_key and options like mandatory. Then you could make use of the batch_basic_publish library feature. You can batch messages like this:
$msg = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg, $exchange);
$msg2 = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg2, $exchange);
and then send the batch like this:
$ch->publish_batch();
Let's say our program needs to read from a file and then publish one message per line. Depending on the message size, you will have to decide when it's better to send the batch.You could send it every 50 messages, or every hundred. That's up to you.
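The file scenario could be sketched roughly as follows. The helper name `publishLinesInBatches`, its parameters and the default batch size of 100 are assumptions chosen for illustration; only `batch_basic_publish` and `publish_batch` come from the library.

```php
<?php
// Assumes Composer's autoloader (vendor/autoload.php) has already been required.
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Hypothetical helper: publish one message per line, flushing every $batchSize messages.
 * Returns the number of messages queued for publishing.
 */
function publishLinesInBatches(AMQPChannel $channel, iterable $lines, string $exchange, int $batchSize = 100): int
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1.');
    }

    $pending = 0; // messages added since the last flush
    $total = 0;

    foreach ($lines as $line) {
        // Strip the trailing line break so it does not end up in the message body.
        $channel->batch_basic_publish(new AMQPMessage(rtrim($line, "\r\n")), $exchange);
        $pending++;
        $total++;

        if ($pending === $batchSize) {
            $channel->publish_batch();
            $pending = 0;
        }
    }

    // Flush whatever is left over from an incomplete final batch.
    if ($pending > 0) {
        $channel->publish_batch();
    }

    return $total;
}
```

Any iterable of strings works as input, for example the array returned by `file('input.txt')` for a small file. A suitable batch size depends on your message sizes, so it is worth measuring a few values.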
Another way to speed up your message publishing is by reusing the AMQPMessage message instances. You can create your new message like this:
$properties = array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT);
$msg = new AMQPMessage($body, $properties);
$ch->basic_publish($msg, $exchange);
Now let's say that while you want to change the message body for future messages, you will keep the same properties, that is, your messages will still be text/plain and the delivery_mode will still be AMQPMessage::DELIVERY_MODE_PERSISTENT. If you create a new AMQPMessage instance for every published message, then those properties would have to be re-encoded in the AMQP binary format. You could avoid all that by just reusing the AMQPMessage and then resetting the message body like this:
$msg->setBody($body2);
$ch->basic_publish($msg, $exchange);
AMQP imposes no limit on the size of messages; if a very large message is received by a consumer, PHP's memory limit may be reached within the library before the callback passed to basic_consume is called.
To avoid this, you can call the method AMQPChannel::setBodySizeLimit(int $bytes) on your Channel instance. Body sizes exceeding this limit will be truncated, and delivered to your callback with an AMQPMessage::$is_truncated flag set to true. The property AMQPMessage::$body_size will reflect the true body size of a received message, which will be higher than strlen(AMQPMessage::getBody()) if the message has been truncated.
Note that all data above the limit is read from the AMQP Channel and immediately discarded, so there is no way to retrieve it within your callback. If you have another consumer which can handle messages with larger payloads, you can use basic_reject or basic_nack to tell the server (which still has a complete copy) to forward it to a Dead Letter Exchange.
By default, no truncation will occur. To disable truncation on a Channel that has had it enabled, pass 0 (or null) to AMQPChannel::setBodySizeLimit().
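A consumer that applies a body size limit and hands truncated messages back to the server might look roughly like the sketch below. It assumes a recent release of the library in which AMQPMessage offers `isTruncated()`, `ack()` and `nack()`; older releases may need the properties and channel methods mentioned above instead. The helper name `consumeWithBodyLimit` and the `$process` callable are hypothetical.

```php
<?php
// Assumes Composer's autoloader (vendor/autoload.php) has already been required.
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Hypothetical helper: register a consumer that refuses truncated messages.
 */
function consumeWithBodyLimit(AMQPChannel $channel, string $queue, int $limitBytes, callable $process): void
{
    $channel->setBodySizeLimit($limitBytes);

    $callback = function (AMQPMessage $message) use ($process): void {
        if ($message->isTruncated()) {
            // Negative acknowledgement without requeue: the server can dead-letter
            // its complete copy if the queue has a Dead Letter Exchange configured.
            $message->nack(false);
            return;
        }

        $process($message->getBody());
        $message->ack();
    };

    // Arguments: queue, consumer tag, no_local, no_ack, exclusive, nowait, callback.
    // no_ack is false so that the explicit ack/nack calls above take effect.
    $channel->basic_consume($queue, '', false, false, false, false, $callback);
}
```

The helper only registers the consumer; the caller still has to run the usual wait loop on the channel to receive deliveries. If the queue has no Dead Letter Exchange configured, a message rejected this way is dropped by the server.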
Some RabbitMQ clients use automated connection recovery mechanisms to reconnect and recover channels and consumers in case of network errors.
Since this client is single-threaded, you can set up connection recovery using exception handling.
Exceptions which might be thrown in case of connection errors:
- PhpAmqpLib\Exception\AMQPConnectionClosedException
- PhpAmqpLib\Exception\AMQPIOException
- \RuntimeException
- \ErrorException
Some other exceptions might be thrown while the connection is still open. It's always a good idea to clean up an old connection when handling an exception before reconnecting.
For example, if you want to set up a recovering connection:
$connection = null;
$channel = null;
while (true) {
    try {
        $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
        // Your application code goes here.
        do_something_with_connection($connection);
    } catch (AMQPRuntimeException $e) {
        echo $e->getMessage();
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    } catch (\RuntimeException $e) {
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    } catch (\ErrorException $e) {
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    }
}
A full example is in demo/connection_recovery_consume.php.
This code will reconnect and retry the application code every time one of these exceptions occurs. Some exceptions can still be thrown and should not be handled as part of the reconnection process, because they might be application errors.
This approach makes sense mostly for consumer applications; producers will require some additional application code to avoid publishing the same message multiple times.
This was the simplest example. In a real-life application you might want to control the retry count and maybe gradually increase the wait time between reconnection attempts.
You can find a more extensive example in #444
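One possible way to cap the retry count and lengthen the wait between attempts is sketched below, assuming PHP 7.1 or later. The names `backoffDelayUs` and `runWithReconnect`, the doubling strategy and the default delays are all assumptions made for illustration; they are not part of the library. `$connect` is expected to return a new connection and `$work` to contain your application code.

```php
<?php
// Assumes Composer's autoloader (vendor/autoload.php) has already been required.
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;

/**
 * Hypothetical helper: delay in microseconds before the given retry attempt.
 * Doubles with every attempt (1st = base, 2nd = 2 * base, ...) up to $maxUs.
 */
function backoffDelayUs(int $attempt, int $baseUs = 500000, int $maxUs = 30000000): int
{
    $shift = min(max($attempt - 1, 0), 20); // bounded so the product stays small
    return (int) min($maxUs, $baseUs * (2 ** $shift));
}

/**
 * Hypothetical helper: run $work with a connection from $connect, reconnecting on
 * connection errors until $work returns or $maxAttempts failures have occurred.
 */
function runWithReconnect(callable $connect, callable $work, int $maxAttempts = 5, int $baseUs = 500000): void
{
    $attempt = 0;
    while (true) {
        $connection = null;
        try {
            $connection = $connect();
            $work($connection);
            return; // application code finished normally
        } catch (AMQPConnectionClosedException | AMQPIOException | \RuntimeException | \ErrorException $e) {
            $attempt++;
            // Clean up the old connection; closing a broken connection may itself fail.
            if ($connection !== null) {
                try {
                    $connection->close();
                } catch (\Throwable $ignored) {
                }
            }
            if ($attempt >= $maxAttempts) {
                throw $e; // give up and let the caller decide what to do
            }
            usleep(backoffDelayUs($attempt, $baseUs));
        }
    }
}
```

Other exception types are deliberately not caught, so application errors still surface immediately. The sketch counts failures over the whole run; a long-lived consumer may prefer to reset the counter after a period of healthy operation.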
If you have installed the PCNTL extension, signals will be dispatched while the consumer is not processing a message.
$pcntlHandler = function ($signal) {
    switch ($signal) {
        case \SIGTERM:
        case \SIGUSR1:
        case \SIGINT:
            // some stuff before stopping the consumer, e.g. delete lock etc.
            pcntl_signal($signal, SIG_DFL); // restore handler
            posix_kill(posix_getpid(), $signal); // kill self with signal, see https://www.cons.org/cracauer/sigint.html
            break;
        case \SIGHUP:
            // some stuff to restart consumer
            break;
        default:
            // do nothing
    }
};
pcntl_signal(\SIGTERM, $pcntlHandler);
pcntl_signal(\SIGINT, $pcntlHandler);
pcntl_signal(\SIGUSR1, $pcntlHandler);
pcntl_signal(\SIGHUP, $pcntlHandler);
To disable this feature just define the constant AMQP_WITHOUT_SIGNALS as true:
<?php
define('AMQP_WITHOUT_SIGNALS', true);
... more code
If you have installed the PCNTL extension and are using PHP 7.1 or greater, you can register a signal-based heartbeat sender.
<?php
$sender = new PCNTLHeartbeatSender($connection);
$sender->register();
... code
$sender->unregister();
If you want to know what's going on at a protocol level then add the following constant to your code:
<?php
define('AMQP_DEBUG', true);
... more code
?>
To run the publishing/consume benchmark type:
make benchmark
Please see CONTRIBUTING for details.
If you still want to use the old version of the protocol then you can do it by setting the following constant in your configuration code:
define('AMQP_PROTOCOL','0.8');
The default value is '0.9.1'.
If for some reason you don't want to use composer, then you need to have an autoloader in place for the library classes. People have reported using this autoloader with success.
Below is the original README file content. Credit goes to the original authors.
PHP library implementing Advanced Message Queuing Protocol (AMQP).
The library is a port of the Python code of py-amqplib: http://barryp.org/software/py-amqplib/
It has been tested with RabbitMQ server.
Project home page: http://code.google.com/p/php-amqplib/
For discussion, please join the group:
http://groups.google.com/group/php-amqplib-devel
For bug reports, please use the bug tracking system at the project page.
Patches are very welcome!
Author: Vadim Zaliva <lord@crocodile.org>