Performant pure-PHP AMQP (RabbitMQ) sync/async (ReactPHP) library

Related tags

Queue bunny
Overview

BunnyPHP

Build Status Downloads this Month Latest stable

Performant pure-PHP AMQP (RabbitMQ) sync/async (ReactPHP) library

Requirements

BunnyPHP requires PHP 7.1 and newer.

Installation

Add as Composer dependency:

$ composer require bunny/bunny:@dev

Comparison

You might ask if there isn't a library/extension to connect to AMQP broker (e.g. RabbitMQ) already. Yes, there are multiple options:

Why should you want to choose BunnyPHP instead?

  • You want nice idiomatic PHP API to work with (I'm looking at you, php-amqplib). BunnyPHP interface follows PHP's common coding standards and naming conventions. See tutorial.

  • You can't (don't want to) install PECL extension that has latest stable version in 2014. BunnyPHP isn't as such marked as stable yet. But it is already being used in production.

  • You have both classic CLI/FPM and ReactPHP applications and need to connect to RabbitMQ. BunnyPHP comes with both synchronous and asynchronous clients with same PHP-idiomatic interface. Async client uses react/promise.

Apart from that BunnyPHP is more performant than main competing library, php-amqplib. See benchmark/ directory and php-amqplib's benchmark/.

Benchmarks were run as:

$ php benchmark/producer.php N & php benchmark/consumer.php
Library N (# messages) Produce sec Produce msg/sec Consume sec Consume msg/sec
php-amqplib 100 0.0131 7633 0.0446 2242
bunnyphp 100 0.0128 7812 0.0488 2049
bunnyphp +/- +2.3% -8.6%
php-amqplib 1000 0.1218 8210 0.4801 2082
bunnyphp 1000 0.1042 9596 0.2919 3425
bunnyphp +/- +17% +64%
php-amqplib 10000 1.1075 9029 5.1824 1929
bunnyphp 10000 0.9078 11015 2.9058 3441
bunnyphp +/- +22% +78%
php-amqplib 100000 20.7005 4830 69.0360 1448
bunnyphp 100000 9.7891 10215 35.7305 2789
bunnyphp +/- +111% +92%

Tutorial

Connecting

When instantiating the BunnyPHP Client accepts an array with connection options:

$connection = [
    'host'      => 'HOSTNAME',
    'vhost'     => 'VHOST',    // The default vhost is /
    'user'      => 'USERNAME', // The default user is guest
    'password'  => 'PASSWORD', // The default password is guest
];

$bunny = new Client($connection);
$bunny->connect();

Connecting with SSL/TLS

Options for SSL-connections should be specified as array ssl:

$connection = [
    'host'      => 'HOSTNAME',
    'vhost'     => 'VHOST',    // The default vhost is /
    'user'      => 'USERNAME', // The default user is guest
    'password'  => 'PASSWORD', // The default password is guest
    'ssl'       => [
        'cafile'      => 'ca.pem',
        'local_cert'  => 'client.cert',
        'local_pk'    => 'client.key',
    ],
];

$bunny = new Client($connection);
$bunny->connect();

For options description - please see SSL context options.

Note: invalid SSL configuration will cause connection failure.

See also common configuration variants.

Publish a message

Now that we have a connection with the server we need to create a channel and declare a queue to communicate over before we can publish a message, or subscribe to a queue for that matter.

$channel = $bunny->channel();
$channel->queueDeclare('queue_name'); // Queue name

With a communication channel set up, we can now publish a message to the queue:

$channel->publish(
    $message,    // The message you're publishing as a string
    [],          // Any headers you want to add to the message
    '',          // Exchange name
    'queue_name' // Routing key, in this example the queue's name
);

Subscribing to a queue

Subscribing to a queue can be done in two ways. The first way will run indefinitely:

$channel->run(
    function (Message $message, Channel $channel, Client $bunny) {
        $success = handleMessage($message); // Handle your message here

        if ($success) {
            $channel->ack($message); // Acknowledge message
            return;
        }

        $channel->nack($message); // Mark message fail, message will be redelivered
    },
    'queue_name'
);

The other way lets you run the client for a specific amount of time consuming the queue before it stops:

$channel->consume(
    function (Message $message, Channel $channel, Client $client){
        $channel->ack($message); // Acknowledge message
    },
    'queue_name'
);
$bunny->run(12); // Client runs for 12 seconds and then stops

Pop a single message from a queue

$message = $channel->get('queue_name');

// Handle message

$channel->ack($message); // Acknowledge message

Prefetch count

A way to control how many messages are prefetched by BunnyPHP when consuming a queue is by using the channel's QOS method. In the example below only 5 messages will be prefetched. Combined with acknowledging messages this turns into an effective flow control for your applications, especially asynchronous applications. No new messages will be fetched unless one has been acknowledged.

$channel->qos(
    0, // Prefetch size
    5  // Prefetch count
);

Asynchronous usage

Bunny supports both synchronous and asynchronous usage utilizing ReactPHP. The following example shows setting up a client and consuming a queue indefinitely.

(new Async\Client($eventLoop, $options))->connect()->then(function (Async\Client $client) {
   return $client->channel();
})->then(function (Channel $channel) {
   return $channel->qos(0, 5)->then(function () use ($channel) {
       return $channel;
   });
})->then(function (Channel $channel) use ($event) {
   $channel->consume(
       function (Message $message, Channel $channel, Async\Client $client) use ($event) {
           // Handle message

           $channel->ack($message);
       },
       'queue_name'
   );
});

AMQP interop

There is amqp interop compatible wrapper(s) for the bunny library.

Testing

You need access to a RabbitMQ instance to run the test suite. You can either connect to an existing instance or use the provided Docker Compose setup to create an isolated environment, including a RabbitMQ container, to run the test suite in.

Local RabbitMQ

  • Change TEST_RABBITMQ_CONNECTION_URI in phpunit.xml to fit your environment. Then run:

    $ vendor/bin/phpunit
    

Docker Compose

  • Use Docker Compose to create a network with a RabbitMQ container and a PHP container to run the tests in. The project directory will be mounted into the PHP container.

    $ docker-compose up -d
    

    To test against different SSL configurations (as in CI builds), you can set environment variable CONFIG_NAME=rabbitmq.ssl.verify_none before running docker-compose up.

  • Optionally use docker ps to display the running containers.

    $ docker ps --filter name=bunny
    [...] bunny_rabbit_node_1_1
    [...] bunny_bunny_1
    
  • Enter the PHP container.

    $ docker exec -it bunny_bunny_1 bash
    
  • Within the container, run:

    $ vendor/bin/phpunit
    

Contributing

  • Large part of the PHP code (almost everything in Bunny\Protocol namespace) is generated from spec in file spec/amqp-rabbitmq-0.9.1.json. Look for DO NOT EDIT! in doc comments.

    To change generated files change spec/generate.php and run:

    $ php ./spec/generate.php

Broker compatibility

Works well with RabbitMQ

Does not work with ActiveMQ because it requires AMQP 1.0 which is a completely different protocol (Bunny is implementing AMQP 0.9.1)

License

BunnyPHP is licensed under MIT license. See LICENSE file.

Comments
  • Heartbeat

    Heartbeat

    According to https://www.rabbitmq.com/heartbeats.html

    Heartbeat frames are sent about every timeout / 2 seconds. After two missed heartbeats, the peer is considered to be unreachable

    So https://github.com/jakubkulhan/bunny/blob/master/src/Bunny/Async/Client.php#L220 is wrong

    opened by sm2017 16
  • Throw uncaught  Exception

    Throw uncaught Exception

    I ask this question https://github.com/reactphp/promise/issues/63 In last reply

    The code is probably wrapped into another promise which catches the exception. You must ensure that you call done() on any promise which you don't return for comsumption, eg. from a function. See done() vs. then().

    I what to know where is the wrapper promise?

    $request = my request object
    $this->_channel->queueDeclare($request->getQueueName(), false, $request::PERSISTENT)->then(function () use ($request) {
                    $this->publish($request);
                }, function (ClientException $e) {
                    throw new \Exception();
    
                })->done();
    

    Why react/promise/src/FulfilledPromise.php:26 catch my exception?

    opened by sm2017 12
  • Typehint to DateTimeInterface to allow DateTimeImmutable values

    Typehint to DateTimeInterface to allow DateTimeImmutable values

    This is for the PHP->AMQP type conversion only. Most of our code uses DateTimeImmutable types ( as our dates are not mutable :)). This safes us from having to cast them. From what I understood this part is not auto-generated. This should also be no BC break.

    opened by fritz-gerneth 10
  • Unhandled method frame Bunny\Protocol\MethodConnectionCloseFrame

    Unhandled method frame Bunny\Protocol\MethodConnectionCloseFrame

    I faced with an Exception sometimes

    'Bunny\Exception\ClientException' with message 'Unhandled method frame Bunny\Protocol\MethodConnectionCloseFrame.' in ~/.composer/vendor/bunny/bunny/src/Bunny/AbstractClient.php:410

    What does it mean and how can I avoid it?

    opened by pprishchepa 10
  • Mixture of properties and headers

    Mixture of properties and headers

    Note: I have omitted some output fields for the sake of readability.

    It seems that when Bunny consumes messages with custom headers that were not published by Bunny the properties and headers fields are flattened.

    Given the screenshot: screen shot 2016-05-26 at 3 41 37 pm Bunny's output is:

    Bunny\Message Object
    (
        [routingKey] => props
        [headers] => Array
            (
                [user_id] => 96ce7157f70de72e08b0a8c78300bf1e
                [delivery-mode] => 1
                [user-id] => admin
            )
    
        [content] => Wat?
    )
    

    But if Bunny publishes the same message like so:

    $ch->publish('Hello World!', [
        'user-id' => 'admin',
        'headers' => [
          'user_id' => '96ce7157f70de72e08b0a8c78300bf1e'
        ]
      ], '', 'props');
    

    The output is as expected:

    Bunny\Message Object
    (
        [routingKey] => props
        [headers] => Array
            (
                [headers] => Array
                    (
                        [user_id] => 96ce7157f70de72e08b0a8c78300bf1e
                    )
    
                [user-id] => admin
            )
        [content] => Hello World!
    )
    

    Here is the output from a Node AMQP lib which looks the same when consuming messages published by Bunny, node.amqp, and RabbitMQ web admin:

     { fields: 
       { routingKey: 'props',
         messageCount: 0 },
      properties: 
       { headers: { user_id: '96ce7157f70de72e08b0a8c78300bf1e' },
         userId: 'admin' },
      content: <Buffer 57 61 74 3f> }
    

    And the output from RabbitMQ web admin screen shot 2016-05-26 at 3 48 16 pm

    opened by cboden 9
  • Added catch to __destruct to prevent fatal error

    Added catch to __destruct to prevent fatal error

    Hello

    please check the changes I made. I know it's far from being optimal as that will catch any logical error, not just client exception etc but the fatal error in the __destruct was quite hard to find - the application was returning all data in the request yet the HTTP status was 500.

    I recently migrated to https://github.com/cloudamqp/amqproxy since php cannot keep the connection opened. After the migration around 30% of requests started to fail with https://github.com/jakubkulhan/bunny/blob/master/src/Bunny/AbstractClient.php#L311

    Apart from my solution what would you suggest is the best way to treat exceptions in destructor?

    opened by jakubvojacek 7
  • Add .gitattributes file to prevent test's and spec's downloading, add correct getting channelId

    Add .gitattributes file to prevent test's and spec's downloading, add correct getting channelId

    Hi there. I just add .gitattributes file to prevent test's, spec's and tutorials downloading when composer install. Because https://www.reddit.com/r/PHP/comments/2jzp6k/i_dont_need_your_tests_in_my_production .

    Also, i add correct getting channelId and able to reuse already opened channels. I change \Bunny\AbstractClient::channel method for backward capability.

    opened by kuai6 7
  • Support PHP 8

    Support PHP 8

    Fixes the issues raised in https://github.com/jakubkulhan/bunny/pull/101.

    There's one issue about dropping PHP 7.0 support:

    • Add void return type to PHPUnit setUp methods: this makes the syntax incompatible with PHP 7.0
    enhancement 
    opened by edudobay 6
  • feat: add support for SSL

    feat: add support for SSL

    Added usage of stream_context_set_option for setting SSL options. Added tests with certificate generation and ssl-enabled configuration for RabbitMQ. Added some configuration examples for common use cases see examples/ssl directory. Mentioned SSL support in main README.md.

    Some notes:

    • RabbitMQ added sysctl format since 3.7.0, so there are both configuration file versions prior 3.7.0 and sysctl.
    • Tests covers only connections, any other tests should works same way as via plain tcp.
    • There is no way to detect any errors in SSL options, so in case of error we will get exception with Could not connect to... message.

    Resolves #77

    opened by rtm-ctrlz 5
  • TCP_NODELAY landed in php 7.1

    TCP_NODELAY landed in php 7.1

    Being curious i've written a benchmark comparing php-amqplib vs amqp extension vs bunny.

    The winner was amqp extension with socket connection being a bit behind, then bunny, then stream connection. Apparently such amazing results of socket connection in php-amqplib were due to using TCP_NODELAY. Digging further into stream connection looks like the same has landed for stream connections in 7.1.

    Would be nice if you could add TCP_NODELAY to greatly improve performance of 1st message send/consume

    opened by mente 5
  • `$channel->queueDeclare($queueName);` never resolves

    `$channel->queueDeclare($queueName);` never resolves

    This may be a bug or simply misuse by me (or misunderstanding).

    I've taken the example code in consumer_async.php and built following script:

    
    $clientConfig  = [
        \React\EventLoop\Factory::create(),
        ['host' => 'localhost', 'user' => 'guest', 'password' => 'guest'],
    ];
    
    (new BunnyClient(...$clientConfig))
        ->connect() // works
        ->then(function (BunnyClient $client) {
            return $client->channel(); // works
        })
        ->then(function (Channel $channel) {
            return \React\Promise\all([
                $channel,
                $channel->queueDeclare('queuename') // hangs
            ]);
        })
        ->then(function (array $channels) use ($app) {
            // never reached
            $channels[0]->consume(function (Message $msg) use ($app) {
                echo 'Message received!' . PHP_EOL;
            }, 'queuename');
        });
    

    The call $channel->queueDeclare('queuename') hangs, and the returned promise is never fulfilled nor rejected.

    A few notes on (in my opinion) expected behavior:

    • removing the queueDeclare code makes the code work, but only if the channel was already declared by another process
    • if the channel doesn't exist, the app silently hangs without receiving any messages at all
    opened by Ocramius 5
  • The best way for non-blocking publishing

    The best way for non-blocking publishing

    Can you suggest the best way to forward messages from stdin to rabbitmq. Unfortunately, in the example below, stdin is blocked for a significant amount of time.

    <?php
    declare(strict_types=1);
    
    use Bunny\Client;
    
    require_once __DIR__ . '/vendor/autoload.php';
    
    $channel = (new Client())->connect()->channel();
    
    while (($record = fgetcsv(STDIN, 4096, ',')) !== false) {
        $channel->publish(json_encode($record), [], '', $config['queue']['name']);
    }
    

    Logically, I need to use an asynchronous client but did not find a similar example.

    opened by gegorov2030 1
  • Fix drop connection by RabbitMQ

    Fix drop connection by RabbitMQ

    opened by viras777 4
  • fread slow when consuming a message

    fread slow when consuming a message

    Good morning,

    For some reason the line $s = @fread($this->stream, $this->frameMax); in AbstractClient::read() just waits for data for some messages being consumed. Bunny is used in a project which communicates with a DNS Management Service by RabbitMQ using the RPC pattern. For some request (retrieving a DNS zone) the consumer from the reply queue waits for a long (10s for example). First I thought that the consumer in the DNS Management was slow, but this puts the reply message onto the reply queue within 25ms. When I look what is going one with the reply queue in RabbitMQ admin panel, I see that the message is consumed, but the state is unacked. The strange thing is, that this only happens for a few zones at a specific environment. Zone A on production is slow, but the same zone on acceptance is fast. The only difference is that the environments are on a different server, but with the same software and configuration. The both use the same RabbitMQ server. My question is, what can I do to see why the process waits at the fread call? Is there a way to see in RabbitMQ if the message is consumed and the data is send to it?

    Hopefully you can give me some pointers. I would appreciate that.

    Thank you in advance.

    Cheers,

    Frank

    opened by frankvanhest 5
  • Change state to error when ClientException is throw

    Change state to error when ClientException is throw

    Hi,

    I was wondering if the state could be changed to error when a ClientException is thrown. It is probably only needed for specific cases. Let me explain. Sometimes I get a ClientException with the message Broken pipe or closed connection. or Could not write data to socket.. when using the synchronise Client. I tried different heartbeat settings, but sometimes due to synchronise blocking processing in the application the heartbeat comes to late and the connection is dropped by RabbitMQ. The internal state of the client will remain in a connected state. Calling disconnect or unsetting the Client instance will result in the same Exception. I would be happy to provide a PR, but before I do that I want to make sure the idea is a good one ;) In the synchronise Client in the methods read and write the ClientException is thrown. At that point before the exception is thrown, I'm planning to set the state there to error.

    Let me know if this is clear enough.

    opened by frankvanhest 0
  • Perhaps an example how to put multiple workers in one process

    Perhaps an example how to put multiple workers in one process

    When replying to a tweet from ReactPHP, @WyriHaximus responded with https://twitter.com/wyrihaximus/status/1411053618703314948?s=21 I was wondering if an example could be added on how to consume from multiple queues within one process using async. Currently I use multiple PHP processes to handle this. This is quite memory expensive when running 20 consumers. Hopefully there is an example or pointers on how to do this. I’m not quite experienced with async PHP. 😉

    opened by frankvanhest 0
Releases(v0.5.4)
Owner
Jakub Kulhan
Jakub Kulhan
anik/amqp wrapper for Laravel-ish frameworks

anik/laravel-amqp anik/amqp wrapper for Laravel-ish frameworks. Laravel Lumen Laravel Zero Examples Checkout the repository for example. Documentation

Syed Sirajul Islam Anik 22 Sep 7, 2022
PHP Library that implements several messaging patterns for RabbitMQ

Thumper Thumper is a PHP library that aims to abstract several messaging patterns that can be implemented over RabbitMQ. Inside the examples folder yo

php-amqplib 276 Nov 20, 2022
The most widely used PHP client for RabbitMQ

php-amqplib This library is a pure PHP implementation of the AMQP 0-9-1 protocol. It's been tested against RabbitMQ. The library was used for the PHP

php-amqplib 4.2k Jan 6, 2023
RabbitMQ driver for ThinkPHP6 Queue.

RabbitMQ driver for ThinkPHP6 Queue.

null 2 Sep 14, 2022
RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.

RabbitMQ Queue driver for Laravel Support Policy Only the latest version will get new features. Bug fixes will be provided using the following scheme:

Vladimir Yuldashev 1.6k Dec 31, 2022
STOMP bindings for ReactPHP.

React/STOMP STOMP bindings for React. STOMP is a messaging protocol. It is supported by most message queue brokers, such as RabbitMQ, Apollo and many

Friends of ReactPHP 115 Sep 9, 2022
Bernard is a multi-backend PHP library for creating background jobs for later processing.

Bernard makes it super easy and enjoyable to do background processing in PHP. It does this by utilizing queues and long running processes. It supports

Bernard 1.2k Jan 2, 2023
PHP client for beanstalkd queue

Pheanstalk Next (5) The master branch will be a WIP branch for v5 of this library until it is released. If any patches are needed in v4 they should be

null 1.9k Dec 21, 2022
PHP bindings for Tarantool Queue.

Tarantool Queue Tarantool is a NoSQL database running in a Lua application server. It integrates Lua modules, called LuaRocks. This package provides P

Tarantool PHP 62 Sep 28, 2022
Message Queue, Job Queue, Broadcasting, WebSockets packages for PHP, Symfony, Laravel, Magento. DEVELOPMENT REPOSITORY - provided by Forma-Pro

Supporting Enqueue Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and

Enqueue 2.1k Dec 22, 2022
PHP client for beanstalkd queue

Pheanstalk Pheanstalk is a pure PHP 7.1+ client for the beanstalkd workqueue. It has been actively developed, and used in production by many, since la

null 1.9k Dec 21, 2022
PHP-Queue: A unified front-end for different queuing backends. Includes a REST server, CLI interface and daemon runners.

A unified front-end for different queuing backends. Includes a REST server, CLI interface and daemon runners. Why PHP-Queue? Implementing a

CoderKungfu 646 Dec 30, 2022
Columnar analytics for PHP - a pure PHP library to read and write simple columnar files in a performant way.

Columnar Analytics (in pure PHP) On GitHub: https://github.com/envoymediagroup/columna About the project What does it do? This library allows you to w

Envoy Media Group 2 Sep 26, 2022
Async Redis client implementation, built on top of ReactPHP.

clue/reactphp-redis Async Redis client implementation, built on top of ReactPHP. Redis is an open source, advanced, in-memory key-value database. It o

Christian Lück 240 Dec 20, 2022
Async MySQL database client for ReactPHP.

MySQL Async MySQL database client for ReactPHP. This is a MySQL database driver for ReactPHP. It implements the MySQL protocol and allows you to acces

Friends of ReactPHP 302 Dec 11, 2022
Async HTTP proxy connector, tunnel any TCP/IP-based protocol through an HTTP CONNECT proxy server, built on top of ReactPHP.

clue/reactphp-http-proxy Async HTTP proxy connector, tunnel any TCP/IP-based protocol through an HTTP CONNECT proxy server, built on top of ReactPHP.

Christian Lück 43 Dec 25, 2022
Simple, async SOAP webservice client, built on top of ReactPHP.

clue/reactphp-soap Simple, async SOAP web service client library, built on top of ReactPHP. Most notably, SOAP is often used for invoking Remote proce

Christian Lück 62 Jul 5, 2022
Laravel Enqueue message queue extension. Supports AMQP, Amazon SQS, Kafka, Google PubSub, Redis, STOMP, Gearman, Beanstalk and others

Laravel queue package You can use all transports built on top of queue-interop including all supported by Enqueue. It also supports extended AMQP feat

Enqueue 204 Dec 22, 2022
anik/amqp wrapper for Laravel-ish frameworks

anik/laravel-amqp anik/amqp wrapper for Laravel-ish frameworks. Laravel Lumen Laravel Zero Examples Checkout the repository for example. Documentation

Syed Sirajul Islam Anik 22 Sep 7, 2022
Magento 2 Message Queue OS AMQP Broker Implementation

Magento 2 Message Queue AMQP Backend AMQP message queue backend implementation for Rcason_Mq. Installation Require the module via Composer $ composer

Renato 8 Jul 12, 2022