Command and event buses interface and logic.

Overview

CoreBus - Command and Event buses interfaces

Discrete command bus and domain event dispatcher interfaces for message-based architecture projects.

Discrete means that your domain code is not tainted by a hard dependency on this component, aside from the attributes used for targeting command handler methods and event listener methods. Your domain business code remains dependency-free.

Event bus features:

  • Internal synchronous event dispatcher.
  • Event listener locator based upon PHP attributes.
  • Event listener locator fast dumped PHP cache.
  • External event dispatcher (not plugged in yet).

Command bus features:

  • Transactional synchronous command bus that handles your transactions.
  • Event buffering during transactions, which flushes events to the external event dispatcher only in case of transaction success.
  • Command handler locator based upon PHP attributes.
  • Command handler locator fast dumped PHP cache.
  • Default transaction implementation using makinacorpus/goat-query.
  • Command asynchronous dispatcher with implementation that plugs to makinacorpus/message-broker message broker interface.

Other various features:

  • Worker object for consuming asynchronous events in CLI.
  • Symfony integration for everything, including console commands for the command bus worker.
  • Global attributes for aspect-driven domain code configuration.
  • Simple command bus interface.

Design

Basic design

Expected runtime flow is the following:

  • Commands may be dispatched to trigger writes in the system.
  • Commands are always asynchronously handled, they may return a response.
  • One command implies one transaction on your database backend.
  • During a single command processing, the domain code may raise one or many domain events.
  • Domain events are always dispatched synchronously within your domain code, within the triggering command transaction.

During the whole command processing, the database transaction will be isolated if the backend permits it. Commit is all or nothing, including events being emitted and listener execution during the process.

Transaction and event buffer

Transaction handling is completely hidden in the implementations, so your business code never sees it. Here is how it works:

  • Domain events, while emitted and dispatched internally, are stored along the way in a volatile in-memory buffer.
  • Once the command is consumed and the task has ended, the transaction commits.
  • In case of success, the buffer is flushed and events may be sent to a bus for external applications to listen to.
  • In case of failure, the transaction rolls back, the event buffer is emptied, and events are discarded without further action.

Transactions can be disabled on a per-command basis, using PHP attributes on the command class.
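
The buffering behaviour described in this section can be sketched as follows. This is a minimal illustration and not the CoreBus implementation: the `EventBuffer` class and the `processCommand()` function are hypothetical names, and the transaction itself is only represented by comments.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory event buffer, for illustration only.
 */
final class EventBuffer
{
    /** @var object[] */
    private array $events = [];

    public function add(object $event): void
    {
        $this->events[] = $event;
    }

    /**
     * Return all buffered events and empty the buffer.
     *
     * @return object[]
     */
    public function flush(): array
    {
        $events = $this->events;
        $this->events = [];

        return $events;
    }

    public function discard(): void
    {
        $this->events = [];
    }
}

/**
 * Run a handler as if it were wrapped in a transaction, and return the
 * events that may be forwarded to an external bus.
 *
 * @param callable(EventBuffer): void $handler
 * @return object[]
 */
function processCommand(callable $handler, EventBuffer $buffer): array
{
    // A real implementation would begin the database transaction here.
    try {
        $handler($buffer);
        // A real implementation would commit here, then flush on success.
        return $buffer->flush();
    } catch (\Throwable $e) {
        // A real implementation would roll back here; events are dropped.
        $buffer->discard();

        throw $e;
    }
}
```

With this sketch, a handler that completes normally yields its events for external dispatch, whereas a handler that throws leaves the buffer empty.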

Optional event store

If required for your project, you may plug an event store on the event dispatcher. Two options are possible:

  • Plug it into the internal event dispatcher: events will be stored along the way. This requires that the event store works on the same database transaction, hence the same connection, as your domain repositories.
  • Plug it into the event buffer output: events will be stored after commit. There are no consistency issues anymore, but if the event storage procedure fails, you will lose history.
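
Both options amount to decorating a dispatcher with a storing step; only the decorated dispatcher differs (the internal one, or the one behind the event buffer output). The sketch below shows that decoration under stated assumptions: the `Dispatcher` and `EventStore` interfaces and the `StoringDispatcher` class are hypothetical and are not the interfaces shipped by this package.

```php
<?php

declare(strict_types=1);

/** Hypothetical minimal dispatcher contract, for illustration only. */
interface Dispatcher
{
    public function dispatch(object $event): void;
}

/** Hypothetical event store contract, for illustration only. */
interface EventStore
{
    public function append(object $event): void;
}

/**
 * Decorator that stores each event, then forwards it to the decorated
 * dispatcher.
 */
final class StoringDispatcher implements Dispatcher
{
    public function __construct(
        private Dispatcher $decorated,
        private EventStore $store,
    ) {}

    public function dispatch(object $event): void
    {
        $this->store->append($event);
        $this->decorated->dispatch($event);
    }
}
```

Wrapping the internal dispatcher stores events inside the command transaction, while wrapping the buffer output stores them only after commit.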

Implementations

Two implementations are provided:

  • In-memory bus, along with null transaction handling (no transaction at all) ideal for prototyping and unit-testing.
  • PostgreSQL bus implementation using makinacorpus/goat for the message broker, and makinacorpus/goat-query for transaction handling using the same database connection, reliable and guaranteeing data consistency.

Everything is hidden behind interfaces, and different implementations are easy to write. Your projects are not required to choose either of those implementations; on the contrary, implementing your own is encouraged.

Roadmap

Short-term

  • Import and rewrite MessageBroker from makinacorpus/goat, or create a new package for it.
  • Write a dispatcher implementation that uses MessageBroker.
  • Import and rewrite RetryStrategy from makinacorpus/goat.
  • Import and rewrite RetryDispatcherDecorator from makinacorpus/goat.
  • Plug RetryDispatcherDecorator via Symfony bundle.
  • Import and rewrite the Worker from makinacorpus/goat.
  • Plug worker as a Symfony command.
  • Write proper documentation.

Middle-term

  • Implement profiling decorator for event bus using makinacorpus/profiling.
  • Implement profiling decorator for command bus using makinacorpus/profiling.

Long-term

  • Allow multiple message brokers to co-exist, one for each queue.
  • Discriminate queues in Symfony commands.
  • Implement dead letter queue routing.
  • Add message routing capabilities via the broker.
  • Create a retry strategy chain for having more than one instance.
  • Implement retry strategy using our attributes.

Setup

Standalone

There is no standalone setup guide for now. Refer to provided Symfony configuration for a concrete example.

Using Symfony

Simply enable the bundle in your config/bundles.php file:

return [
    // ... your other bundles.
    MakinaCorpus\CoreBus\Bridge\Symfony\CoreBusBundle::class => ['all' => true],
];

You may add an additional config/packages/corebus.yaml file, although configuration options remain very limited at this time:

corebus:
    #
    # Default adapter.
    #
    # Since only the "goat" one is implemented, this is the default value
    # so in fact, you probably should not write this.
    #
    adapter: goat

    #
    # Adapter options.
    #
    # All values here are arbitrary and will depend on the adapter.
    # As of today, the only existing option is "event_store" (boolean)
    # for the "goat" adapter, which plugs or unplugs the event store
    # onto the dispatcher.
    #
    adapter_options:
        event_store: true

Usage

Commands and events

Commands are plain PHP objects and don't require any dependency.

Just write a Data Transfer Object:

declare(strict_types=1);

namespace App\Domain\SomeBusiness\Command;

final class SayHelloCommand
{
    public readonly string $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }
}

Same goes with events, so just write:

declare(strict_types=1);

namespace App\Domain\SomeBusiness\Event;

final class HelloWasSaidEvent
{
    public readonly string $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }
}
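
Since both classes already rely on readonly properties, which require PHP 8.1, constructor property promotion can shorten them. The following is a sketch of an equivalent command class; the event class can be shortened the same way.

```php
<?php

declare(strict_types=1);

namespace App\Domain\SomeBusiness\Command;

final class SayHelloCommand
{
    // The promoted parameter declares and assigns the readonly property.
    public function __construct(
        public readonly string $name,
    ) {}
}
```

The public API is unchanged: `$command->name` is still readable and can no longer be written after construction.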

Register handlers using base class

Tie a single command handler:

declare(strict_types=1);

namespace App\Domain\SomeBusiness\Handler;

use App\Domain\SomeBusiness\Command\SayHelloCommand;
use App\Domain\SomeBusiness\Event\HelloWasSaidEvent;
use MakinaCorpus\CoreBus\CommandBus\AbstractCommandHandler;

final class SayHelloHandler extends AbstractCommandHandler
{
    /*
     * The method name is yours, and you may have more than one handler in
     * the same class, do as you wish. The only important thing is to
     * implement the Handler interface (here via the AbstractCommandHandler
     * class).
     */
    public function do(SayHelloCommand $command)
    {
        echo "Hello, ", $command->name, "\n";

        $this->notifyEvent(new HelloWasSaidEvent($command->name));
    }
}

Please note that using the AbstractCommandHandler base class is purely optional; it is simply a helper that lets you use the event dispatcher and command bus from within your handlers.

Alternatively, if you don't require any of those, you may:

  • Either set the #[MakinaCorpus\CoreBus\Attr\CommandHandler] attribute on the class, in which case all of its methods will be considered handlers.
  • Or set the #[MakinaCorpus\CoreBus\Attr\CommandHandler] attribute on each method that is a handler.

You may also write as many event listeners as you wish; they may even emit events themselves:

declare(strict_types=1);

namespace App\Domain\SomeBusiness\Listener;

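use App\Domain\SomeBusiness\Event\HelloWasSaidEvent;
use MakinaCorpus\CoreBus\EventBus\EventListener;
use Psr\Log\LoggerInterface;

final class SayHelloListener implements EventListener
{
    // The logger is injected by the container (autowiring).
    public function __construct(
        private LoggerInterface $logger,
    ) {}

    /*
     * The method name is yours, and you may have more than one listener in
     * the same class, do as you wish. The only important thing is to
     * implement the EventListener interface.
     */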
    public function on(HelloWasSaidEvent $event)
    {
        $this->logger->debug("Hello was said to {name}.", ['name' => $event->name]);
    }
}

The same goes for event listeners: the EventListener interface is just here to help and is not required. You may instead:

  • Either set the #[MakinaCorpus\CoreBus\Attr\EventListener] attribute on the class, in which case all of its methods will be considered listeners.
  • Or set the #[MakinaCorpus\CoreBus\Attr\EventListener] attribute on each method that is a listener.

This requires that your services are known by the container. You have three different options for this.

The first one, which is Symfony's default, is to autoconfigure all your services:

services:
    _defaults:
        autowire: true
        autoconfigure: true
        public: false

    everything:
        namespace: App\Domain\
        resource: '../src/Domain/*'

Or if you wish to play it subtle:

services:
    _defaults:
        autowire: true
        autoconfigure: true
        public: false

    handler_listener:
        namespace: App\Domain\
        resource: '../src/Domain/*/{Handler,Listener}'

Or if you want to use the old ways:

services:
    App\Domain\SomeBusiness\Handler\SayHelloHandler: ~
    App\Domain\SomeBusiness\Listener\SayHelloListener: ~

In all cases, you don't require any tags or any other metadata as long as you either extend the base class, or use the attributes.

Register handlers using attributes

Tie a single command handler:

declare(strict_types=1);

namespace App\Domain\SomeBusiness\Handler;

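use App\Domain\SomeBusiness\Command\SayHelloCommand;
use App\Domain\SomeBusiness\Event\HelloWasSaidEvent;
use MakinaCorpus\CoreBus\Attr\CommandHandler;
use MakinaCorpus\CoreBus\EventBus\EventBusAware;
use MakinaCorpus\CoreBus\EventBus\EventBusAwareTrait;

final class SayHelloHandler implements EventBusAware
{
    use EventBusAwareTrait;

    /*
     * The method name is yours, and you may have more than one handler in
     * the same class, do as you wish. The only important thing is to set
     * the CommandHandler attribute on the method (or on the class). The
     * attribute is imported above, because a non-imported relative name
     * would be resolved against the current namespace.
     */
    #[CommandHandler]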
    public function do(SayHelloCommand $command)
    {
        echo "Hello, ", $command->name, "\n";

        $this->notifyEvent(new HelloWasSaidEvent($command->name));
    }
}

You may also write as many event listeners as you wish; they may even emit events themselves:

declare(strict_types=1);

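namespace App\Domain\SomeBusiness\Listener;

use App\Domain\SomeBusiness\Event\HelloWasSaidEvent;
use MakinaCorpus\CoreBus\Attr\EventListener;
use Psr\Log\LoggerInterface;

final class SayHello
{
    // The logger is injected by the container (autowiring).
    public function __construct(
        private LoggerInterface $logger,
    ) {}

    /*
     * The method name is yours, and you may have more than one listener in
     * the same class, do as you wish. The only important thing is to set
     * the EventListener attribute on the method (or on the class).
     */
    #[EventListener]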
    public function on(HelloWasSaidEvent $event)
    {
        $this->logger->debug("Hello was said to {name}.", ['name' => $event->name]);
    }
}

If you correctly plug the Symfony container machinery, glue will be completely transparent.

Using attributes

This package comes with attribute support for annotating commands and events in order to hint behaviours to the bus. This allows declaring command or event behaviour without tainting the domain code.

Command attributes

  • #[MakinaCorpus\CoreBus\Attr\Async] forces the command to always be dispatched asynchronously.

  • #[MakinaCorpus\CoreBus\Attr\NoTransaction] disables transaction handling for the command. Use it wisely.

  • #[MakinaCorpus\CoreBus\Attr\Retry(?int)] allows the command to be retried in case an error happens. The first parameter is the number of retries allowed; the default is 3. Warning: this is not implemented yet and is an empty shell.
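
As an illustration, here is how these attributes could be set on a command class. The `RebuildSearchIndexCommand` class and its property are hypothetical; only the attribute names come from the list above.

```php
<?php

declare(strict_types=1);

namespace App\Domain\SomeBusiness\Command;

use MakinaCorpus\CoreBus\Attr\Async;
use MakinaCorpus\CoreBus\Attr\NoTransaction;

// Hypothetical command: always dispatched asynchronously, and handled
// without transaction handling around it.
#[Async]
#[NoTransaction]
final class RebuildSearchIndexCommand
{
    public function __construct(
        public readonly string $indexName,
    ) {}
}
```

The command class itself stays a plain DTO; the attributes are metadata and do not change how the object is constructed or read.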

Domain event attributes

  • #[MakinaCorpus\CoreBus\Attr\Aggregate(string, ?string)] allows the developer to explicitly tell which aggregate (entity or model) this event targets. The first argument must be the name of the event property that holds the aggregate identifier. The second argument is optional, and is the target aggregate class or logical name. If you are using an event store, the aggregate type is only mandatory for aggregate stream creation events; the identifier is enough for appending an event to an existing stream.
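
A short sketch of an event carrying this attribute, assuming a hypothetical `OrderWasPlacedEvent` class whose `orderId` property holds the aggregate identifier, and a hypothetical logical aggregate name `order`:

```php
<?php

declare(strict_types=1);

namespace App\Domain\SomeBusiness\Event;

use MakinaCorpus\CoreBus\Attr\Aggregate;

// First argument: name of the event property holding the aggregate identifier.
// Second argument (optional): aggregate class or logical name.
#[Aggregate('orderId', 'order')]
final class OrderWasPlacedEvent
{
    public function __construct(
        public readonly string $orderId,
    ) {}
}
```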

Configuration attributes

  • #[MakinaCorpus\CoreBus\Attr\CommandHandler]: if set on a class, forces the bus to introspect the class and register all of its methods as command handlers; if set on a single method, registers this method explicitly as a command handler.

  • #[MakinaCorpus\CoreBus\Attr\EventListener]: if set on a class, forces the bus to introspect the class and register all of its methods as event listeners; if set on a single method, registers this method explicitly as an event listener.

For all those attributes, parameters are optional, but you might set the target parameter to disambiguate which class the handler or listener catches. Using this, you can use interfaces for matching instead of concrete classes.
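
The following sketch shows what setting the target could look like. It assumes the parameter can be passed as a named argument called `target`, which is inferred from the paragraph above and should be checked against the attribute class; the `GreetingCommand` interface and the `GreetingHandler` class are hypothetical.

```php
<?php

declare(strict_types=1);

namespace App\Domain\SomeBusiness\Handler;

use MakinaCorpus\CoreBus\Attr\CommandHandler;

// Hypothetical interface shared by several concrete command classes.
interface GreetingCommand
{
    public function getName(): string;
}

final class GreetingHandler
{
    // Assumption: "target" is the attribute parameter name.
    #[CommandHandler(target: GreetingCommand::class)]
    public function greet(GreetingCommand $command): string
    {
        return 'Hello, ' . $command->getName();
    }
}
```

Matching on an interface lets one handler method catch every command class that implements it.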

Overriding implementations

Any interface in this package is a service in the dependency injection container you will use. You may replace or decorate any of them.

Future work

Provide a bare PostgreSQL message broker implementation

Because when you have simple needs, you need simple implementations, and the current makinacorpus/goat implementation is too complex.

Add a GenericCommand class

Goal

This class will be a simple but dynamic DTO whose values are dynamically hydrated as an array from the decoded command input.

This class will yield its logical name and raw values.

This class in conjunction with the Handler attribute will allow developers to define and consume commands without writing their equivalent PHP classes.

The downside of using such a generic command class is that the user will not be able to use access or any other attributes on their commands.

Generic command definition

A new command registry will carry all user-defined commands, using their logical names as keys, and their mapped PHP class. For generic commands, it may also carry values list and values types, for hydration. This will pave the way for an automatic input data validation based upon the definition.

Handler argument resolver

This pluggable component will allow users to rely upon automatic service injection as handler method typed arguments. Of course implementation and details will depend upon the framework implementation.
