A lib to consume messages from any broker

Overview

Swarrot

Swarrot is a PHP library to consume messages from any broker.

Installation

The recommended way to install Swarrot is through Composer. Require the swarrot/swarrot package:

$ composer require swarrot/swarrot

Usage

Basic usage

First, create a message provider to retrieve messages from your broker. For example, with a PeclPackageMessageProvider (which retrieves messages from an AMQP broker with the pecl amqp package):

use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider;

// Create connection
$connection = new \AMQPConnection();
$connection->connect();
$channel = new \AMQPChannel($connection);
// Get the queue to consume
$queue = new \AMQPQueue($channel);
$queue->setName('global');

$messageProvider = new PeclPackageMessageProvider($queue);

Once that is done, create a processor to handle the messages retrieved from the broker. This processor must implement Swarrot\Processor\ProcessorInterface. For example:

use Swarrot\Processor\ProcessorInterface;
use Swarrot\Broker\Message;

class Processor implements ProcessorInterface
{
    public function process(Message $message, array $options)
    {
        echo sprintf("Consume message #%d\n", $message->getId());

        return true; // Continue processing other messages
    }
}

You now have a Swarrot\Broker\MessageProviderInterface to retrieve messages and a processor to process them. Now ask the Swarrot\Consumer to do its job:

use Swarrot\Consumer;

// Instantiate the Processor class defined above
$processor = new Processor();

$consumer = new Consumer($messageProvider, $processor);
$consumer->consume();

Using a stack

Heavily inspired by stackphp/builder, Swarrot\Processor\Stack\Builder lets you stack your processors. Using the built-in processors or your own, you can extend the behavior of your base processor. In this example, your processor is decorated by other processors, including the ExceptionCatcherProcessor, which wraps your own processor in a try/catch block, and the MaxMessagesProcessor, which stops your worker once a given number of messages has been consumed.

use Swarrot\Processor\ProcessorInterface;
use Swarrot\Broker\Message;

class Processor implements ProcessorInterface
{
    public function process(Message $message, array $options)
    {
        echo sprintf("Consume message #%d\n", $message->getId());
    }
}

$stack = (new \Swarrot\Processor\Stack\Builder())
    ->push('Swarrot\Processor\MaxMessages\MaxMessagesProcessor', new Logger())
    ->push('Swarrot\Processor\ExceptionCatcher\ExceptionCatcherProcessor')
    ->push('Swarrot\Processor\Ack\AckProcessor', $messageProvider)
;

$processor = $stack->resolve(new Processor());

[Illustration: what happens when the processors are stacked in this order]

Processors

Official processors

Create your own processor

To create your own processor and be able to use it with the StackProcessor, implement ProcessorInterface and take another ProcessorInterface as the first argument of the constructor.
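As a rough illustration of that rule, here is a minimal sketch of a decorating processor. The `RecordingProcessor` and `EchoProcessor` classes are hypothetical, and `Message` and `ProcessorInterface` are local stand-ins declared only so that the sketch runs without the library installed. In a real project you would import `Swarrot\Broker\Message` and `Swarrot\Processor\ProcessorInterface` instead.

```php
<?php

declare(strict_types=1);

// Stand-ins so the sketch runs on its own. They only mirror the
// process(Message $message, array $options) signature shown above.
final class Message
{
    private int $id;

    public function __construct(int $id)
    {
        $this->id = $id;
    }

    public function getId(): int
    {
        return $this->id;
    }
}

interface ProcessorInterface
{
    public function process(Message $message, array $options);
}

// Hypothetical decorator: records the id of every message it sees,
// then delegates to the processor it wraps.
final class RecordingProcessor implements ProcessorInterface
{
    private ProcessorInterface $processor;

    /** @var int[] */
    public array $seen = [];

    // The decorated processor is the first constructor argument.
    public function __construct(ProcessorInterface $processor)
    {
        $this->processor = $processor;
    }

    public function process(Message $message, array $options)
    {
        $this->seen[] = $message->getId();

        return $this->processor->process($message, $options);
    }
}

// Innermost processor: the one doing the actual work.
final class EchoProcessor implements ProcessorInterface
{
    public function process(Message $message, array $options)
    {
        echo sprintf("Consume message #%d\n", $message->getId());

        return true; // Continue processing other messages
    }
}

$processor = new RecordingProcessor(new EchoProcessor());
$result = $processor->process(new Message(1), []);
```

Because the wrapped processor comes first in the constructor, a class shaped like this could presumably be pushed onto the stack by its class name, in the same way as the built-in processors in the Builder example.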

Deprecated processors & message providers / publishers

To reduce the dependencies of swarrot/swarrot and ease maintenance, some processors and message providers / publishers were deprecated in the 3.x versions. They will be deleted in 4.0.

If you use those deprecated classes, you could create your own repository to keep them, or we could create a dedicated repository under the swarrot organisation if you're willing to help maintain them.

Message providers / publishers

  • SQS Message provider (in 3.5.0)
  • Stomp message providers (in 3.6.0)
  • Stomp message publishers (in 3.7.0)
  • Interop message publishers & providers (in 3.7.0)

Processors

  • SentryProcessor (in 3.5.0)
  • RPC related processors (in 3.5.0)
  • NewRelicProcessor (in 3.7.0)

Inspiration

License

Swarrot is released under the MIT License. See the bundled LICENSE file for details.

Comments
  • Queue interop producer\provider

    The PR adds provider\publisher that can work with any queue interop compatible transport.

    TODO:

    • [x] add tests
    • [x] add doc

    I welcome everyone who is interested in MQs to join the queue interop group

    opened by makasim 24
  • Properly set message properties when using the PhpAmqpLibMessageProvider

    In #61, @odolbeau pointed out that message properties were not properly set by the provider when using the amqplib library.

    It appeared that the existing code was just setting additional headers and skipping all other properties.

    Rationale on implementation choices

    One way to solve that could have been this elegant (based on my very own and objective opinion :p) one-liner:

    $properties = union_merge($envelope->get_properties(), $envelope->delivery_info);
    

    The problem arises when a client tries to access a key that has no value, as this raises an exception.

    The new implementation is less elegant but at least avoids this problem. I didn't delve too much into the C implementation of the pecl package, but does a function like $envelope->getContentType() return something or raise an exception when the content-type is not set at message publication?

    The second set of properties, delivery_info is tested using isset rather than has because, according to the doc:

    If you don't want to access the delivery_info array directly you can also use $msg->get('delivery_tag') but keep in mind that's slower than just accessing the array by key.

    To maintain uniformity between providers, some property names have been converted to the names used by the PeclPackageMessageProvider. For completeness, here they are:

    • is_redelivery (PeclPackageMessageProvider) / redelivered (PhpAmqpLibMessageProvider)
    • exchange_name (PeclPackageMessageProvider) / exchange (PhpAmqpLibMessageProvider)
    • headers (PeclPackageMessageProvider) / application_headers (PhpAmqpLibMessageProvider)

    For the same reason, the following fields have been added to the PeclPackageMessageProvider:

    • user_id
    • cluster_id
    • channel
    • consumer_tag

    Tests

    No tests so far as this PR will probably change, but do you have preferences / suggestions on this?

    opened by astorije 16
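The renaming described in the PR above can be pictured as a small mapping step. This is a minimal sketch, not the provider's actual code: the `normalizeProperties` function and the sample arrays are hypothetical, and only the three key pairs are taken from the PR description. Null values are skipped so that a consumer never reads a key that has no value.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: renames php-amqplib style keys to the names used by
 * the PeclPackageMessageProvider and drops keys whose value is null.
 */
function normalizeProperties(array $properties, array $deliveryInfo): array
{
    // php-amqplib name => PeclPackageMessageProvider name (from the PR text).
    $renames = [
        'redelivered' => 'is_redelivery',
        'exchange' => 'exchange_name',
        'application_headers' => 'headers',
    ];

    $normalized = [];
    // The array union operator keeps the left-hand value when a key
    // exists on both sides.
    foreach ($properties + $deliveryInfo as $name => $value) {
        if (null === $value) {
            continue; // nothing to expose for this key
        }
        $normalized[$renames[$name] ?? $name] = $value;
    }

    return $normalized;
}

$result = normalizeProperties(
    ['content_type' => 'application/json', 'application_headers' => ['x-retry' => 1]],
    ['redelivered' => false, 'exchange' => 'events', 'delivery_tag' => 7]
);

print_r($result);
```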
  • RPC Processors

    Thought this was missing. This is still a WIP though, as I just want to gather some feedback first.

    • [x] Add the RPC Server Processor
    • [x] Add the RPC Client Processor
    • [x] Add the README
    • [x] Add some mofo tests

    I'm not really sure on some points though (just trying to adapt from the official documentation and thought it could be useful here)

    opened by Taluu 15
  • [DRAFT][Need feedbacks] Consume POC

    Context

    In Go or JavaScript, I've noticed that you can see how many consumers are connected, what their names are, and with which arguments they were spawned.

    [Screenshot, 2018-01-09 15:20:38]

    After searching a bit I understood that we can also achieve it in PHP and with swarrot, by using the consume method instead of the get method.

    Proposition

    So my suggestion would be to add the possibility to choose either the consume or the get method when consuming.

    The attached code is just an ugly draft to test it. I'd like to know whether anyone sees an issue with this idea before going further.

    |           | Get | Consume |
    | --- | --- | --- |
    | Benefits  | Non-Blocking | Long polling, Faster, show consumer in RMQ UI |
    | Drawbacks | Short polling, Treatments in SleepyInterface load the host | Blocking |

    Example

    [Screenshot, 2018-01-10 21:19:43]

    Measures

    Blackfire profiles

    • Get : https://blackfire.io/profiles/d63a7eef-fe40-476b-8850-d190334b6362/graph
    • Consume : https://blackfire.io/profiles/dd9bbb8f-f7e5-4aa7-83d9-bd036b74793d/graph

    Consume seems slightly more performant (~10%) than get

    vagrant@local:~/www/lafourchette-b2c-crm(master)$ time app/console swarrot:consume:customer_consumer --max-messages=1000 --method=get
    
    real	0m7.405s
    user	0m2.476s
    sys	0m0.228s
    vagrant@local:~/www/lafourchette-b2c-crm(master)$ time app/console swarrot:consume:customer_consumer --max-messages=1000 --method=consume
    
    real	0m5.400s
    user	0m1.952s
    sys	0m0.172s
    vagrant@local:~/www/lafourchette-b2c-crm(master)$ time app/console swarrot:consume:customer_consumer --max-messages=10000 --method=get
    
    real	1m0.685s
    user	0m20.997s
    sys	0m1.448s
    vagrant@local:~/www/lafourchette-b2c-crm(master)$ time app/console swarrot:consume:customer_consumer --max-messages=10000 --method=consume
    
    real	0m53.869s
    user	0m19.749s
    sys	0m1.196s
    
    opened by olaurendeau 13
  • The SignalHandlerProcessor is not PHP 5.3 compatible

    In PHP 5.3, a lambda function is not aware of its class context:

    <?php
    error_reporting(E_ALL);
    class SignalHandlerProcessor
    {
        protected static $shouldExit = false;
        public function process()
        {
            $greet = function () {
                SignalHandlerProcessor::$shouldExit = true;
            };
            $greet();
        }
    }

    $d = new SignalHandlerProcessor();
    $d->process();
    

    This produces the error:

    PHP Fatal error:  Cannot access protected property SignalHandlerProcessor::$shouldExit on line 17
    

    I see only two fixes:

    • update the compatibility to php 5.4 on the composer.json
    • set shouldExit as public
    opened by JLepeltier 13
  • [enhancement] Replace Message with MessageInterface

    | Q             | A      |
    | ------------- | ------ |
    | Branch?       | master |
    | Bug fix?      | no     |
    | New feature?  | no     |
    | BC breaks?    | no     |
    | Deprecations? | no     |
    | Tests pass?   | yes    |

    Currently we're restricted to using Swarrot\Broker\Message as our Message format. I would like to introduce Swarrot\Broker\MessageInterface to allow custom implementations of messages (and providers / publishers). Also added some basic functionality (setters, getProperty, setProperty) to the MessageInterface.

    Updated docs (including some old references to MessageProviders).

    Used https://github.com/queue-interop/queue-interop/blob/master/src/PsrMessage.php for inspiration.

    opened by Swahjak 12
  • add sqs provider

    I will share the provider I have written to use swarrot with amazon sqs.

    This code is already in production on our platform.

    As we are currently changing our queueing system, we have not used a message publisher yet.

    opened by nicolasThal 12
  • [ACKProcessor] Add queue name in log context

    Hi,

    IMO it would be great to have such information in the log context. What do you think? Also, what do you think about rewriting all log messages so they are invariable, and moving the variable(s) to the context?

    Thank you

    opened by noniagriconomie 11
  • Add support for publisher confirms

    This adds support for publisher confirms while retaining backwards compatibility with custom publishers.

    If this is accepted I will also add support for publisher confirms in the symfony bundle.

    opened by borisbabic 11
  • Deprecate some processors & SQS message provider

    In order to get rid of outdated dependencies (aws/aws-sdk-php & sentry/sentry) I'd like to deprecate some classes & remove them in 4.0.

    @nicolasThal are you still using swarrot with the SQS message provider? @notFloran are you still using the SentryProcessor? @Taluu are you still using RPC processors?

    Any concern @stof @lyrixx?

    opened by odolbeau 10
  • Use static log messages

    This plays better with logging systems: those will often group logs by message, showing how many times similar events happened.

    Fixes #187

    Not using placeholders for now because I don't know how widely used they are, tell me if you want me to.

    I fixed 2 out of 18 occurrences for now, but I like early feedback.

    EDIT: 18/18

    opened by greg0ire 10
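The idea behind this PR, a constant message with the variable data moved into the context, can be illustrated with a tiny in-memory logger. The `ArrayLogger` class, the message text, and the `message_id` context key are assumptions made for this sketch. A real project would pass a PSR-3 logger instead.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory logger with a PSR-3-like info() signature.
final class ArrayLogger
{
    /** @var array<int, array{message: string, context: array}> */
    public array $records = [];

    public function info(string $message, array $context = []): void
    {
        $this->records[] = ['message' => $message, 'context' => $context];
    }
}

$logger = new ArrayLogger();

foreach ([12, 13] as $id) {
    // Dynamic message: every record has a different text, so a logging
    // system cannot group them.
    // $logger->info(sprintf('Message #%d has been acked.', $id));

    // Static message: the text never changes and the variable part
    // travels in the context.
    $logger->info('Message has been acked.', ['message_id' => $id]);
}

// Both records share one message text and differ only by context.
$messages = array_unique(array_column($logger->records, 'message'));
echo count($messages), "\n";
```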
  • [Question] why Swarrot\Broker\Message::$id binds on delivery_tag ?

    I can't use an application-level message identifier like this:

    $publisher->publish(new Message("body", [], 34));
    $message = $consumer->get();
    $message->getId(); // 34
    

    The message identifier returns the delivery_tag.

    I can make a PR but I don't understand this behavior, so I'm asking first.

    opened by rflavien 2
  • [Question] Reason for RPC processors deprecation and removal

    Hi,

    I want to ask whether there is any reason, other than removing the outdated dependencies, why the RPC processors were deprecated and removed.

    I am thinking of using an RpcServer-like processor.

    PS: I've seen the comments on this PR: https://github.com/swarrot/swarrot/pull/197/files

    Thanks in advance, Val

    opened by koutsoumposval 2
  • [Feature request] Add a way to choose between different behaviors depending on the exception

    In my current project, I want to retry the message for some errors (a temporary failure of the SMTP server, for instance), while other errors should never be retried (for instance, a malformed message that my application cannot handle at all and that would just fail again when retried). It would be great to provide a way to achieve this (it might be done as a documentation page if it requires too much custom logic to be added to existing processors).

    opened by stof 6
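One way to approach the feature request above is a decorator that picks a behavior from the exception class. The sketch below is not part of the library: `TemporaryFailure`, `ExceptionRouterProcessor`, and `FailingProcessor` are hypothetical, and `Message` and `ProcessorInterface` are local stand-ins declared so the code runs without the library installed.

```php
<?php

declare(strict_types=1);

// Local stand-ins; a real project would import Swarrot\Broker\Message
// and Swarrot\Processor\ProcessorInterface instead.
final class Message
{
    private int $id;

    public function __construct(int $id)
    {
        $this->id = $id;
    }

    public function getId(): int
    {
        return $this->id;
    }
}

interface ProcessorInterface
{
    public function process(Message $message, array $options);
}

// Hypothetical marker for errors that are worth retrying.
final class TemporaryFailure extends \RuntimeException
{
}

// Hypothetical decorator: rethrows temporary failures so that an outer
// layer can retry, and gives up on every other error.
final class ExceptionRouterProcessor implements ProcessorInterface
{
    private ProcessorInterface $processor;

    /** @var int[] ids of the messages that were given up on */
    public array $dropped = [];

    public function __construct(ProcessorInterface $processor)
    {
        $this->processor = $processor;
    }

    public function process(Message $message, array $options)
    {
        try {
            return $this->processor->process($message, $options);
        } catch (TemporaryFailure $e) {
            throw $e; // surface it so that a retry can happen
        } catch (\Throwable $e) {
            $this->dropped[] = $message->getId(); // retrying would fail again

            return true; // keep consuming other messages
        }
    }
}

// Demo processor: odd ids fail temporarily, even ids are malformed.
final class FailingProcessor implements ProcessorInterface
{
    public function process(Message $message, array $options)
    {
        if ($message->getId() % 2 === 1) {
            throw new TemporaryFailure('SMTP server unavailable');
        }

        throw new \InvalidArgumentException('Malformed message');
    }
}

$router = new ExceptionRouterProcessor(new FailingProcessor());

$continued = $router->process(new Message(2), []); // malformed: dropped

$retryRequested = false;
try {
    $router->process(new Message(1), []);
} catch (TemporaryFailure $e) {
    $retryRequested = true; // temporary: left to a retry layer
}
```

Matching on a dedicated exception class keeps the decision in one place. Whether the non-retryable branch should drop, log, or reject the message is a design choice left open here.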