League\Pipeline

Overview

This package provides a pipeline pattern implementation.

Pipeline Pattern

The pipeline pattern lets you compose a sequence of operations by chaining stages.

In this particular implementation the interface consists of two parts:

  • StageInterface
  • PipelineInterface

A pipeline consists of zero, one, or multiple stages. A pipeline can process a payload. During the processing the payload will be passed to the first stage. From that moment on the resulting value is passed on from stage to stage.

In the simplest form, the execution chain can be represented as a foreach:

$result = $payload;

foreach ($stages as $stage) {
    $result = $stage($result);
}

return $result;

Effectively this is the same as:

$result = $stage3($stage2($stage1($payload)));
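The equivalence between the loop and the nested call can be checked with a small plain-PHP sketch. The three closures and the array_reduce variant are illustrative assumptions and are not part of the package.

```php
<?php
// Three hypothetical stages, each a plain closure.
$stage1 = function ($payload) { return $payload + 1; };
$stage2 = function ($payload) { return $payload * 2; };
$stage3 = function ($payload) { return $payload - 3; };

$stages = [$stage1, $stage2, $stage3];
$payload = 10;

// Loop form: feed each result into the next stage.
$looped = $payload;
foreach ($stages as $stage) {
    $looped = $stage($looped);
}

// Nested form: the innermost call runs first.
$nested = $stage3($stage2($stage1($payload)));

// Fold form: array_reduce carries the running result through the stages.
$folded = array_reduce($stages, function ($carry, $stage) {
    return $stage($carry);
}, $payload);

var_dump($looped === $nested && $nested === $folded); // bool(true)
```

All three forms compute ((10 + 1) * 2) - 3, so the order of the stages in the array matches the inside-out order of the nested call.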

Immutability

Pipelines are implemented as immutable stage chains. When you pipe a new stage, a new pipeline will be created with the added stage. This makes pipelines easy to reuse, and minimizes side-effects.
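To illustrate what immutability means in practice, here is a minimal sketch of a clone-on-pipe pipeline. ImmutablePipelineSketch is a hypothetical stand-in written for this example; it is not the package's source and may differ from how the package does it internally.

```php
<?php
// Hypothetical stand-in for an immutable pipeline; not the package's source.
final class ImmutablePipelineSketch
{
    /** @var callable[] */
    private $stages = [];

    // Returns a copy with the extra stage; $this is left untouched.
    public function pipe(callable $stage)
    {
        $copy = clone $this;
        $copy->stages[] = $stage; // arrays are copied by value, so the original list is unaffected

        return $copy;
    }

    public function process($payload)
    {
        foreach ($this->stages as $stage) {
            $payload = $stage($payload);
        }

        return $payload;
    }
}

$double = (new ImmutablePipelineSketch)->pipe(function ($n) {
    return $n * 2;
});

// Extending $double yields a new pipeline and leaves $double as it was.
$doubleThenAddOne = $double->pipe(function ($n) {
    return $n + 1;
});

$a = $double->process(10);           // 20: the original still only doubles
$b = $doubleThenAddOne->process(10); // 21: the copy doubles, then adds one
```

Because the base pipeline never changes, it can be shared and extended in several places without one caller affecting another.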

Usage

Operations in a pipeline, called stages, can be anything that satisfies the callable type-hint, so closures and any invokable object will work.

$pipeline = (new Pipeline)->pipe(function ($payload) {
    return $payload * 10;
});
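As a rough illustration of what "anything callable" covers in PHP, the sketch below passes several kinds of callable through one helper. The applyStage function and the AddFive and MathStages classes are hypothetical names made up for this example; only the callable type-hint itself comes from the text above.

```php
<?php
// Hypothetical helper that accepts any callable, mirroring a callable type-hint.
function applyStage(callable $stage, $payload)
{
    return call_user_func($stage, $payload);
}

// Hypothetical invokable class.
class AddFive
{
    public function __invoke($payload)
    {
        return $payload + 5;
    }
}

// Hypothetical class exposing a static and an instance method.
class MathStages
{
    public static function square($payload)
    {
        return $payload * $payload;
    }

    public function negate($payload)
    {
        return -$payload;
    }
}

$results = [
    // Closure
    applyStage(function ($payload) { return $payload * 10; }, 3),
    // Invokable object
    applyStage(new AddFive, 3),
    // Name of an existing function
    applyStage('strrev', 'abc'),
    // [object, method] pair
    applyStage([new MathStages, 'negate'], 3),
    // Static method given as a string
    applyStage('MathStages::square', 3),
];

var_dump($results === [30, 8, 'cba', -3, 9]); // bool(true)
```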

Class based stages

Class based stages are also possible. Implementing the StageInterface ensures your class has the correct signature for the __invoke method.

use League\Pipeline\Pipeline;
use League\Pipeline\StageInterface;

class TimesTwoStage implements StageInterface
{
    public function __invoke($payload)
    {
        return $payload * 2;
    }
}

class AddOneStage implements StageInterface
{
    public function __invoke($payload)
    {
        return $payload + 1;
    }
}

$pipeline = (new Pipeline)
    ->pipe(new TimesTwoStage)
    ->pipe(new AddOneStage);

// Returns 21
$pipeline->process(10);

Re-usable Pipelines

Because the PipelineInterface is an extension of the StageInterface, pipelines can be re-used as stages. This creates a highly composable model for building complex execution patterns while keeping the cognitive load low.

For example, if we'd want to compose a pipeline to process API calls, we'd create something along these lines:

$processApiRequest = (new Pipeline)
    ->pipe(new ExecuteHttpRequest) // 2
    ->pipe(new ParseJsonResponse); // 3
    
$pipeline = (new Pipeline)
    ->pipe(new ConvertToPsr7Request) // 1
    ->pipe($processApiRequest) // (2,3)
    ->pipe(new ConvertToResponseDto); // 4 
    
$pipeline->process(new DeleteBlogPost($postId));
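The numbering in the comments above can be traced with a self-contained sketch. NestablePipelineSketch and the $tag helper are hypothetical names made up for this example and are not package code; the point is only that an invokable pipeline can sit in another pipeline's stage list.

```php
<?php
// Hypothetical stand-in: a pipeline that is itself invokable, so it can act as a stage.
final class NestablePipelineSketch
{
    /** @var callable[] */
    private $stages = [];

    public function pipe(callable $stage)
    {
        $copy = clone $this;
        $copy->stages[] = $stage;

        return $copy;
    }

    public function process($payload)
    {
        foreach ($this->stages as $stage) {
            $payload = $stage($payload);
        }

        return $payload;
    }

    // Being invokable is what lets one pipeline be piped into another.
    public function __invoke($payload)
    {
        return $this->process($payload);
    }
}

// Hypothetical stage factory: each stage appends its label to a string payload.
$tag = function ($label) {
    return function ($payload) use ($label) {
        return $payload . $label;
    };
};

$inner = (new NestablePipelineSketch)
    ->pipe($tag('2'))
    ->pipe($tag('3'));

$outer = (new NestablePipelineSketch)
    ->pipe($tag('1'))
    ->pipe($inner) // runs stages 2 and 3 in place
    ->pipe($tag('4'));

$order = $outer->process(''); // "1234"
```

The nested pipeline runs as a single unit at the position where it was piped, so the overall order stays 1, 2, 3, 4.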

Pipeline Builders

Because pipelines themselves are immutable, pipeline builders are introduced to facilitate distributed composition of a pipeline.

The pipeline builders collect stages and allow you to create a pipeline at any given time.

use League\Pipeline\PipelineBuilder;

// Prepare the builder
$pipelineBuilder = (new PipelineBuilder)
    ->add(new LogicalStage)
    ->add(new AnotherStage)
    ->add(new LastStage);

// Build the pipeline
$pipeline = $pipelineBuilder->build();
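What "distributed composition" could look like is sketched below: separate parts of an application add stages to one shared, mutable builder, and a pipeline can be built at any point. PipelineBuilderSketch and the two register functions are hypothetical stand-ins made up for this example. For brevity, build() returns a plain closure here rather than a pipeline object.

```php
<?php
// Hypothetical mutable builder; not the package's PipelineBuilder.
final class PipelineBuilderSketch
{
    /** @var callable[] */
    private $stages = [];

    // Mutates the builder and returns the same instance for chaining.
    public function add(callable $stage)
    {
        $this->stages[] = $stage;

        return $this;
    }

    // Snapshots the stages collected so far into a runnable closure.
    public function build()
    {
        $stages = $this->stages; // copied by value; later add() calls do not affect it

        return function ($payload) use ($stages) {
            foreach ($stages as $stage) {
                $payload = $stage($payload);
            }

            return $payload;
        };
    }
}

// Two hypothetical modules, each contributing one stage.
function registerTrimming(PipelineBuilderSketch $builder)
{
    $builder->add('trim');
}

function registerUppercasing(PipelineBuilderSketch $builder)
{
    $builder->add('strtoupper');
}

$builder = new PipelineBuilderSketch;

registerTrimming($builder);
$early = $builder->build(); // contains only the trim stage

registerUppercasing($builder);
$late = $builder->build();  // contains trim, then strtoupper

$earlyResult = $early('  hi  '); // "hi"
$lateResult = $late('  hi  ');   // "HI"
```

The builder is the only mutable piece; everything it builds is a fixed snapshot, which keeps the immutability guarantee of the pipelines themselves.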

Exception handling

This package is completely transparent when dealing with exceptions. In no case will this package catch an exception or silence an error. Exceptions should be dealt with on a per-case basis, either inside a stage or at the time the pipeline processes a payload.

$pipeline = (new Pipeline)->pipe(function () {
    throw new LogicException();
});
    
try {
    $pipeline->process($payload);
} catch(LogicException $e) {
    // Handle the exception.
}
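The two places to handle an exception, inside a stage or around the processing call, are contrasted in the sketch below. The runStages helper is a plain loop standing in for a pipeline, and the JSON-parsing stages are hypothetical names made up for this example; none of them are part of the package.

```php
<?php
// Plain loop standing in for a pipeline; a hypothetical helper, not the package API.
function runStages(array $stages, $payload)
{
    foreach ($stages as $stage) {
        $payload = $stage($payload);
    }

    return $payload;
}

// Hypothetical stage that throws when the payload is not a JSON array or object.
$parseJson = function ($payload) {
    $decoded = json_decode($payload, true);
    if (!is_array($decoded)) {
        throw new InvalidArgumentException('Payload is not valid JSON.');
    }

    return $decoded;
};

// Option 1: recover inside the stage and hand a fallback to the next stage.
$parseJsonOrEmpty = function ($payload) use ($parseJson) {
    try {
        return $parseJson($payload);
    } catch (InvalidArgumentException $e) {
        return [];
    }
};

$handledInside = runStages([$parseJsonOrEmpty, 'count'], 'not json'); // 0

// Option 2: let the exception escape and handle it around the whole run.
try {
    $handledOutside = runStages([$parseJson, 'count'], 'not json');
} catch (InvalidArgumentException $e) {
    $handledOutside = 'failed: ' . $e->getMessage();
}
```

Handling the failure inside the stage lets the remaining stages continue with a fallback value. Handling it outside stops the run at the failing stage.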

Comments
  • Adding Support for Variadic Arguments. Deprecating support for PHP 5.5

    Following on https://github.com/thephpleague/pipeline/issues/18, this PR adds support for variadic functions to be used with the pipeline.

    Unfortunately, this would require dropping support for PHP 5.5, as this feature is only available on 5.6+.

    opened by ingluisjimenez 15
  • Use callable instead of StageInterface::process

    In order to be more compatible with other packages, like tactician, the StageInterface was problematic and required additional wrapping to play nice with others. Stages, in concept, are already pretty close to callable ... things. The StageInterface is still in place, but now enforces the __invoke method with a single-parameter signature. This is, not coincidentally, exactly the signature of the "old" StageInterface::process.

    So, in short. Let's make this package more open to work with others.

    UPGRADE!

    So simple. Every implementation of the StageInterface should replace the method name process with __invoke. Also, the CallableStage is now not needed anymore and can be unwrapped, so those are quite easy to port over. So, not much change, many benefits.

    EXAMPLES!

    // BEFORE
    $pipeline->pipe(CallableStage::forCallable(function ($payload) {
        return $payload;
    }))->process($payload);
    
    // AFTER
    $pipeline->pipe(function ($payload) {
        return $payload;
    })->process($payload);
    

    Stages:

    // BEFORE
    class MyStage implements StageInterface
    {
        public function process($payload)
        {
            return $payload;
        }
    }
    
    // AFTER
    class MyStage implements StageInterface
    {
        public function __invoke($payload)
        {
            return $payload;
        }
    }
    

    The rest of the API hasn't changed.

    THOUGHTS!

    Thoughts? .. /cc @rosstuck

    opened by frankdejonge 8
  • PHP 5.4 compatibility

    Hi,

    I'm currently stuck on PHP 5.4 and wanted to use this package. The changes to make the package PHP 5.4 compatible were quite trivial; if you want to support 5.4, I can wrap up a PR. If not, I understand, as PHP 5.4 is near EOL.

    opened by RonRademaker 5
  • [Discussion] Renamed Operation to Stage

    In the pattern definition the operations are called stages. Implementations in other languages use this terminology as well. I think this, and the benefit of clarity, can justify a rename.

    opened by frankdejonge 5
  • Error handling

    Hello,

    I am curious about how pipeline suggests we handle errors in our processors. I think the documentation should cover this case, even if it's just "wrap your calls to the pipeline in your own try/catches" :)

    question 
    opened by r4j4h 4
  • Remove InterruptibleProcessor and Rename FingersCrossedProcessor

    I would like to mention a criticism of recent decisions on this awesome package, and request input from others.

    I feel the recent addition of the extra processors was kind of messy compared to the resistance to bloat this project has shown in the recent past. I think the addition of the Processor interface was a fantastic idea; however, I feel the implementation was overreaching. The InterruptibleProcessor should have been an implementation detail of someone's project utilizing pipelines. Taking this approach would allow the library to resist the bloat of trying to predict or satisfy every user's desired implementation. I hate BC breaks, but this library is less than 1.0. Is this something the maintainers would consider changing?

    To go along with this, since the FingersCrossedProcessor is the default Processor and the one we have all loved so much that we have been using this package pre-1.0, doesn't it seem more fitting that it's named with the respect it deserves? Such as Processor or DefaultProcessor.

    Of course I mean no disrespect to the author of the changes, because the code was thoughtful and well written.

    I will happily be providing more of my input in the PRs on this project in the future because I am passionate about the potential of this library, but I am certainly interested in other people's thoughts on this issue.

    opened by GreatOwl 3
  • Add PipelineBuilderInterface

    I feel that every part of this library deserves an interface so that individual applications can choose to implement unique functionality. Hence, I propose adding an interface for the PipelineBuilder. I'm willing to open a PR myself, but I want to get a sense of how the maintainers feel about accepting it.

    opened by GreatOwl 3
  • [Feature Request] Fork-Join

    Sometimes based on some condition, a pipeline needs to be forked, meaning there is a need to follow one of several disparate paths which have nothing in common. Optionally, later the disjoint paths may join again.

    $pipeline = (new Pipeline)
        ->pipe(new TimeTwoStage)
        ->pipe(new AddOneStage)
        ->fork(function($payload) {
                if($payload == 0) return "zero";
                if($payload < 0) return "-";
                if($payload > 0) return "+";
                return false; // for short-circuit
            })
            ->disjoin("zero", $zeroProcessingPipeline)
            ->disjoin("-", $negativeNumberProcessingPipeline)
            ->disjoin("+", $positiveNumberProcessingPipeline)
            ->join()
        ->pipe(new DivideByFiveStage);
    
    opened by juzerali 2
  • [Feature Request] Attach a listener to a pipeline

    The concept of a pipe sounds very good, but I was wondering if we can attach listeners for each stage.

    For example, we could have something like a PipelineListenerInterface:

    interface PipelineListenerInterface
    {
        function before($context, $stageName);
    
        function onError($context, $stageName, $exception);
    }
    

    Then we can easily log all the stages instead of catching all the pipelines.

    It will be very cool if we can register the listener on project level (not on a pipeline).

    opened by alexandrubeu 2
  • quick question on the pipe method

    I noticed when you pipe, you must do:

    $pipeline = $pipeline->pipe(WHATEVER);
    

    because pipe clones the old pipeline and returns the clone.

    Now, if we have 100 pipes (just an example), isn't it better to simply return the same pipeline rather than waste memory on old cloned pipelines that are never used?

    opened by tzookb 2
  • simplify syntax for adding stages

    Adding __call method to Pipeline allows for a simplified syntax for adding Stages. For example:

    $pipe = (new Pipeline())->StageOne()->StageTwo(10);
    

    The name of the class (implementation of StageInterface) to add is simply provided as the method name, then __call does the magic of newing it up and calling ->pipe().

    NB: composer.json updated as variadic operator requires PHP 5.6

    opened by darrenmothersele 2
  • [Feature Request] Lazily retrieve pipeline stages from PSR-11 container

    Description

    Pipelines are great for data processing. However, there may be cases where the data fed into the pipeline is invalid, causing any stage to fail. That means there can be quite a few pipeline stages that we loaded, configured, et cetera, that are not going to be called. We can optimize performance in these cases by lazily initializing pipeline stages.

    Instead of coming up with some bespoke interface to do so, we can instead delegate this to an existing PSR-11 container implementation. PSR-11 can be considered quite mature at this point, and seems like a good match.

    So, instead of doing:

    $pipeline = (new PipelineBuilder())
        ->add($container->get(MyFirstStage::class))
        ->add($container->get(MySecondStage::class))
        ->add(function ($payload) {
            return $payload * 10;
        })
        ->add($container->get(MyFourthStage::class))
        ->build();
    // Every stage has now gone through initialization
    

    We might have something like:

    $pipeline = (new ContainerAwarePipelineBuilder($container))
        ->add(MyFirstStage::class)
        ->add(MySecondStage::class)
        ->add(function ($payload) {
            // Adding callable stages directly still works
            return $payload * 10;
        })
        ->add(MyFourthStage::class)
        ->build();
    // Stages from the container will not be initialized at this point,
    // they will be initialized when the stage is invoked
    

    What problem does this solve

    As mentioned; lazy loading can do a lot for performance in larger applications. This idea came up because in my application I have a data processing pipeline with various stages that can fail. There are also (class based) stages that interact with a remote database, use configuration files, etc, which are expensive to initialize.

    The cleanest way to write these stages would usually be a simple class where dependencies are passed to the constructor and initialization like preparing SQL statements, parsing a configuration file, etc are done in the constructor as well. Then the __invoke() method is ready to just do its work.

    However, that setup is expensive: not only the initialization that happens within the stage itself, but also the dependencies the stage relies on must already be resolved. For example, if a stage depends on a PDO object to do its database work, we need to already set up a connection to the database.

    That means that if the pipeline is processing some payload that fails during the very first stage (e.g. a validation step fails), we have already done the expensive initialization for all the stages that follow it but are never going to be invoked.

    (A currently possible workaround is passing a container instance into the stages and have them lazily load their dependencies and do setup lazily whenever the stage is first invoked. This adds a lot of code complexity to the stages, and passing a container around like that is a bit of an anti-pattern. Solving this within the Pipeline abstraction would generally make for much nicer code.)

    Brainstorm: If we implement this, how?

    • We can probably support both already ready to go stages and stages that need to be fetched from the container with a single interface: We would widen the types allowable for a stage to callable|string: if it's a callable, it is used directly as a stage. If it's a non-callable string, then it's used as a key to retrieve the stage from the container.
    • Alternatively, we add this functionality to a separate class (for example ContainerAwarePipeline(Builder) as in my example above). We would still need to widen the callable type used in the interfaces.
    • For those interacting with the classes/interfaces widening the types wouldn't be a problem, but for those implementing custom classes based on them that would be a BC break. So yes, this would need a new major version.
    • In documentation and examples, we can use the excellent League Container, which itself already provides documentation on how to do lazy-loading.
    • Another design choice: Do we retrieve the stages once and cache them? Or do we delegate that to the container as well, and just retrieve the stage every time we need it? Probably the latter, but might be worth discussing.

    I'd be happy to do the initial work and make a pull request, if the Pipeline maintainers are interested to have this kind of functionality added.

    opened by DvdGiessen 3
  • How to use $check in InterruptibleProcessor.php?

    Hi, first let me thank you for this code; it seems to be exactly what I was looking for! I wanted to implement some kind of dependency between the individual stages of a pipeline, with some extensive try/catch error handling, but then stumbled across InterruptibleProcessor.php and wondered whether there might be a better way to do that. I am not sure how to use the $check callable, though. Do I understand the code correctly that it allows one $check callable per pipeline, and not one $check per stage?

    I just want to let pipelines stop or skip or trigger a different pipeline based on the results of one stage.

    Also I am wondering: what would be the best way to build fallback chains? Something like "if stage_x fails, try again with $stage_y" or "if $stage_x produces result x skip to stage_z"?

    I am just experimenting to learn if it might make sense to transform an existing large if-then-else monstrosity into a few pipelines and do not immediately see how to implement such kind of fallbacks. Also of course I would like to avoid writing lots of try catch error handling code - the result should be less code than the if-then-else horror I would like to replace... :)

    If you could find the time for providing a little example how you would implement such a thing, it would be a great help! Thanks for your attention!

    opened by wowcut 1
  • Pass any variables between stages

    Working example:

    require __DIR__ . '/vendor/autoload.php';
    
    use League\Pipeline\Pipeline;
    
    $pipeline = (new Pipeline)
        ->pipe(function ($payload, $dto) {
            $dto['ten'] = 10;
            return $payload * 2;
        })
        ->pipe(function ($payload, $dto) {
            return $payload + $dto['ten'];
        });
    
    echo $pipeline->process(5, new ArrayObject());
    // 20
    

    Also I added docblocks for better IDE autocomplete. Full backward compatibility.

    opened by roquie 0
  • Add variadic variable to __invoke

    Hi! I suggest adding a variadic parameter to the __invoke() method.

    Example usages (DTO - Data Transfer Object):

    $pipeline = (new Pipeline)
        ->pipe(function ($payload, DTO $dto) {
            // Store a value for later stages to read.
            $dto['ten'] = 10;
            return $payload * 2;
        })
        ->pipe(function ($payload, DTO $dto) {
            // Read back the value stored by the first stage.
            return $payload + $dto['ten'];
        });
    
    $pipeline->process(5, new DTO);
    // returns 20
    

    The process method takes ...$params:

    // https://github.com/thephpleague/pipeline/blob/master/src/Pipeline.php
    /**
     * Process the payload.
     * @param $payload
     * @return mixed
     */
    public function process($payload, ...$params)
    {
        foreach ($this->stages as $stage) {
            $payload = $stage($payload, ...$params);
        }
        return $payload;
    }
    

    My example (old code, before update): https://github.com/Roquie/pipeline/commits/master

    Note: PHP 5.6 required. What do you think about it?

    enhancement 
    opened by roquie 4
Owner
The League of Extraordinary Packages
A group of developers who have banded together to build solid, well tested PHP packages using modern coding standards.