Asynchronous iterators and operators.

Overview

pipeline

Installation

This package can be installed as a Composer dependency.

composer require amphp/pipeline

Versioning

amphp/pipeline follows the semver semantic versioning specification like all other amphp packages.

Security

If you discover any security-related issues, please email [email protected] instead of using the issue tracker.

License

The MIT License (MIT). Please see LICENSE for more information.

Comments
  • "Pipeline source destroyed without completing the pipeline" when walking through iterator concurrently.

    Hi, I'm trying to migrate from Amp\Sync\ConcurrentIterator\each() to Amp\Pipeline\concurrent(), but I can't get it to work correctly. Inspired by https://github.com/amphp/pipeline/blob/master/examples/concurrent.php, I wrote something like this:

    <?php declare(strict_types=1);
    
    require __DIR__ . '/../vendor/autoload.php';
    
    use Amp\Pipeline;
    use Amp\File;
    use Amp\ByteStream\ReadableResourceStream;
    use Amp\Sync\LocalSemaphore;
    
    $pipeline = Pipeline\fromIterable(function (): \Generator {
        $input = new ReadableResourceStream(\fopen('/dev/zero', 'rb'), 512 << 10);
    
        for ($i = 0; null !== $chunk = $input->read(), $i < 1000; ++$i) {
            yield [$i, $chunk];
        }
    
        $input->close();
    });
    
    $results = $pipeline->pipe(Pipeline\concurrent(new LocalSemaphore(10), Pipeline\tap(function (array $input) {
        [$part, $chunk] = $input;
    
        echo "{$part}.";
    
        File\write('/dev/null', $chunk);
    })));
    
    iterator_count($results);
    

    When the number of chunks is small (e.g. 10) it works perfectly; with a larger value (e.g. 100) it fails from time to time; and with the condition $i < 1000 it fails consistently with this exception:

    Fatal error: Uncaught Error: Pipeline source destroyed without completing the pipeline in /app/vendor/amphp/pipeline/src/Emitter.php:34
    Stack trace:
    #0 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(587): Amp\Pipeline\Emitter->__destruct()
    #1 [internal function]: Revolt\EventLoop\Internal\AbstractDriver::Revolt\EventLoop\Internal\{closure}()
    #2 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(585): Fiber->resume(NULL)
    #3 [internal function]: Revolt\EventLoop\Internal\AbstractDriver::Revolt\EventLoop\Internal\{closure}()
    #4 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(503): Fiber->resume(Array)
    #5 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(414): Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
    #6 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(480): Revolt\EventLoop\Internal\AbstractDriver->invokeCallback(Object(Revolt\EventLoop\Internal\DeferCallback))
    #7 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(551): Revolt\EventLoop\Internal\AbstractDriver->tick()
    #8 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}()
    #9 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(100): Fiber->resume()
    #10 /app/vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php(80): Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}()
    #11 /app/vendor/amphp/pipeline/src/Internal/EmitSource.php(110): Revolt\EventLoop\Internal\DriverSuspension->suspend()
    #12 /app/vendor/amphp/pipeline/src/Internal/AutoDisposingPipeline.php(78): Amp\Pipeline\Internal\EmitSource->continue()
    #13 [internal function]: Amp\Pipeline\Internal\AutoDisposingPipeline->getIterator()
    #14 /app/bin/upload(29): iterator_count(Object(Amp\Pipeline\Internal\AutoDisposingPipeline))
    #15 {main}
      thrown in /app/vendor/amphp/pipeline/src/Emitter.php on line 34
    

    Package versions:

    amphp/pipeline         dev-master 9dcb792 Asynchronous iterators and operators.
    amphp/sync             v2.x-dev 8933d35   Mutex, Semaphore, and other synchronization tools for Amp.
    

    Could you please advise what I'm doing wrong?

    opened by hlib-kuznetsov 2
  • Remove DisposedException

    This removes the need to wrap Queue::push() in try/catch in case the iterator is disposed. If the producer cares whether the iterator is disposed, it can check with Queue::isDisposed(), but often piping into a black hole is acceptable (e.g., pushing websocket frame data into a message stream: the data must be consumed anyway, and it doesn't matter if no one is listening on the other end).

    If an iterable is used as the source, iteration stops once the iterator is disposed.

    I have verified the behavior here works well in the postgres and websocket libs, which make heavy use of this library.

    opened by trowski 1
  • Add isComplete() to ConcurrentIterator

    This adds two methods to the ConcurrentIterator interface:

    • hasPending() returns true if there are buffered values available on the iterator, false otherwise.
    • isConsumed() returns true once the iterator has been completed or disposed.
    opened by trowski 1
  • Review: fromIterable

    I guess this could move to a static method on Pipeline? Maybe even split it into fromClosure and fromIterable?

    /** @psalm-suppress RedundantConditionGivenDocblockType */
    if (!$iterable instanceof \Generator) {
        $iterable = (static fn () => yield from $iterable)();
    }
    

    I'm not sure why this code exists.

    Our current intermediate operations all do nothing until there's a terminal operation, but fromIterable currently starts iterating right away.

    opened by kelunik 1
  • Remove operator API

    The operator API isn't really nice to use, as it requires wrapping everything in ->pipe(...).

    The new API makes things easier to use, while keeping it possible to add new operators for us. ConcurrentIterator is the new low level interface for concurrent iteration.

    I think the concurrent.php example illustrates this quite well.

    opened by kelunik 0
  • Support null by use of fiber locals

    This introduces a separate get() method like the previous getCurrent() method in our Amp v2 Iterator. continue() can still be called concurrently.

    This allows using null like any other value instead of special casing it for an end marker.
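
A minimal sketch of what this enables, assuming the released 1.x API, where the accessor is named getValue() rather than get() as in this pull request. The autoloader path is also an assumption. Because continue() only reports whether a value is available, null can be emitted like any other value:

```php
<?php declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader location.

use Amp\Pipeline\Pipeline;

// Assumption: amphp/pipeline 1.x, where getIterator() returns a ConcurrentIterator.
$iterator = Pipeline::fromIterable([1, null, 3])->getIterator();

$values = [];

// continue() advances the iterator and returns false once it is exhausted;
// getValue() then reads the current value, which may legitimately be null.
while ($iterator->continue()) {
    $values[] = $iterator->getValue();
}

var_dump($values);
```

The null in the middle of the input does not end the iteration, since the end of the stream is signalled by the return value of continue() instead.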

    opened by kelunik 0
Releases(v1.0.0)
  • v1.0.0(Dec 23, 2022)

    Initial stable release 🎉

    Changes from 1.0.0 Beta 7

    • Marked ConcurrentArrayIterator, ConcurrentChainedIterator, and ConcurrentIterableIterator as @internal. Instead of these classes, use Pipeline::fromIterable() or Pipeline::concat()
    • Pipeline::concat() now accepts an array of any iterable, not only other Pipeline objects
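
The two changes above can be sketched as follows. This is an illustration under assumptions (amphp/pipeline 1.x installed via Composer, autoloader in the assumed location), not an excerpt from the library's own examples:

```php
<?php declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader location.

use Amp\Pipeline\Pipeline;

// Pipeline::concat() takes an array of iterables and emits their values in order.
$combined = Pipeline::concat([
    [1, 2],                                   // A plain array.
    new \ArrayIterator([3, 4]),               // Any Traversable.
    (static fn () => yield from [5, 6])(),    // A generator.
    Pipeline::fromIterable([7, 8]),           // Another Pipeline.
]);

// toArray() is a terminal operation that collects every emitted value.
$result = $combined->toArray();

var_dump($result);
```

Using Pipeline::fromIterable() and Pipeline::concat() this way avoids touching the iterator classes that are now marked @internal.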
  • v1.0.0-beta.7(Nov 18, 2022)

    • A Queue that is destructed without being completed is no longer failed. PHP's unpredictable destruct order can sometimes invoke the Queue destructor before another destructor that would have completed the queue.
  • v1.0.0-beta.6(Nov 7, 2022)

  • v1.0.0-beta.5(Apr 10, 2022)

    • Added isComplete() to the ConcurrentIterator interface, which returns true when the iterator has been completed (either successfully or with an error) and no further values are pending.
    • Fixed an issue where a reference to the prior value emitted on a ConcurrentIterator was held while awaiting the next value.
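
A small sketch of isComplete(), assuming amphp/pipeline 1.x and an array-backed pipeline. The variable names and the autoloader path are illustrative only:

```php
<?php declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader location.

use Amp\Pipeline\Pipeline;

$iterator = Pipeline::fromIterable([1, 2])->getIterator();

// Nothing has been consumed yet, so values are still pending.
$before = $iterator->isComplete();

$values = [];
while ($iterator->continue()) {
    $values[] = $iterator->getValue();
}

// continue() has returned false: the iterator is complete and nothing is pending.
$after = $iterator->isComplete();

var_dump($before, $after, $values);
```

This lets a consumer ask whether more values can still arrive without having to call continue() and suspend.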
  • v1.0.0-beta.4(Feb 24, 2022)

    • PHP 8.1 is now required.
    • Fixed circular references in ConcurrentIterableIterator and ConcurrentFlatMapIterator that prevented quick garbage collection, particularly problematic with instances created from Pipeline::fromIterable() using a generator.
  • v1.0.0-beta.3(Jan 30, 2022)

    • Pipeline has been changed from an interface to a final class. ConcurrentIterator acts as the interface replacement
    • Pipeline::pipe() has been removed in favor of operator methods directly on Pipeline, such as map() and filter()
    • Emitter has been renamed to Queue
      • yield() has been renamed to push()
      • emit() has been renamed to pushAsync()
    • All functions in the Amp\Pipeline namespace have been removed.
      • fromIterable() is available as Pipeline::fromIterable()
      • concat() is now Pipeline::concat()
      • Most operators are available directly on Pipeline
    • Added Pipeline::generate() that invokes a closure to create each pipeline value.

    Example of using Pipeline for concurrency:

    use Amp\Pipeline\Pipeline;
    use function Amp\delay;
    
    $pipeline = Pipeline::fromIterable(function (): \Generator {
        for ($i = 0; $i < 100; ++$i) {
            yield $i;
        }
    });
    
    $results = $pipeline->concurrent(10)
            ->tap(fn () => delay(\random_int(1, 10) / 10))  // Delay for 0.1 to 1 seconds, simulating I/O.
            ->map(fn (int $input): int => $input * 10)
            ->filter(fn (int $input) => $input % 3 === 0); // Filter only values divisible by 3.
    
    foreach ($results as $value) {
        echo $value, "\n";
    }
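
The renamed Queue (formerly Emitter) can be sketched in the same spirit. This assumes amphp/pipeline 1.x together with Amp\async() from amphp/amp v3; the producer closure and variable names are hypothetical:

```php
<?php declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader location.

use Amp\Pipeline\Queue;
use function Amp\async;

$queue = new Queue();

// The producer runs in its own fiber. push() (formerly yield()) suspends
// that fiber until the pushed value has been consumed.
$producer = async(function () use ($queue): void {
    foreach (['a', 'b', 'c'] as $value) {
        $queue->push($value);
    }

    // Signal that no more values will be pushed.
    $queue->complete();
});

// The consumer iterates the pipeline backed by the queue.
$received = [];
foreach ($queue->pipe() as $value) {
    $received[] = $value;
}

$producer->await();

var_dump($received);
```

pushAsync() (formerly emit()) is the non-suspending counterpart: it returns a future instead of waiting, so a producer can choose when to await back-pressure.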
    
  • v1.0.0-beta.2(Dec 10, 2021)

    • Pipeline back-pressure is now relieved immediately when a value is consumed from the pipeline. Previously, another value had to be requested from the pipeline before back-pressure was relieved.
    • Removed AsyncGenerator class. Instead, fromIterable now also accepts a Closure returning an iterable, which can be a generator function.
    • concurrentOrdered has been removed and concurrentUnordered renamed to concurrent. Unfortunately, ordered iteration broke if using operators that would not always emit a value, so support has been dropped.
    • Added an optional $bufferSize parameter to the Emitter constructor that sets a number of items that can be emitted before awaiting back-pressure from the pipeline consumer. This value defaults to 0, which will await back-pressure with every emitted value.
  • v1.0.0-beta.1(Dec 7, 2021)

Owner
Amp
Asynchronous Multitasking PHP