Hi, I'm trying to migrate from Amp\Sync\ConcurrentIterator\each()
to Amp\Pipeline\concurrent()
, but can't get it works correctly.
Inspired by https://github.com/amphp/pipeline/blob/master/examples/concurrent.php i wrote something like
<?php declare(strict_types=1);
require __DIR__ . '/../vendor/autoload.php';
use Amp\Pipeline;
use Amp\File;
use Amp\ByteStream\ReadableResourceStream;
use Amp\Sync\LocalSemaphore;
$pipeline = Pipeline\fromIterable(function (): \Generator {
$input = new ReadableResourceStream(\fopen('/dev/zero', 'rb'), 512 << 10);
for ($i = 0; null !== $chunk = $input->read(), $i < 1000; ++$i) {
yield [$i, $chunk];
}
$input->close();
});
$results = $pipeline->pipe(Pipeline\concurrent(new LocalSemaphore(10),Pipeline\tap(function (array $input) {
[$part, $chunk] = $input;
echo "{$part}.";
File\write('/dev/null', $chunk);
})));
iterator_count($results);
When amount of chunks is small (e.g. 10) it works perfectly, with larger value (e.g. 100) it fails from time to time, but when conditions is $i < 1000
it fails constantly with exception:
Fatal error: Uncaught Error: Pipeline source destroyed without completing the pipeline in /app/vendor/amphp/pipeline/src/Emitter.php:34
Stack trace:
#0 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(587): Amp\Pipeline\Emitter->__destruct()
#1 [internal function]: Revolt\EventLoop\Internal\AbstractDriver::Revolt\EventLoop\Internal\{closure}()
#2 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(585): Fiber->resume(NULL)
#3 [internal function]: Revolt\EventLoop\Internal\AbstractDriver::Revolt\EventLoop\Internal\{closure}()
#4 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(503): Fiber->resume(Array)
#5 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(414): Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
#6 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(480): Revolt\EventLoop\Internal\AbstractDriver->invokeCallback(Object(Revolt\EventLoop\Internal\DeferCallback))
#7 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(551): Revolt\EventLoop\Internal\AbstractDriver->tick()
#8 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}()
#9 /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(100): Fiber->resume()
#10 /app/vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php(80): Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}()
#11 /app/vendor/amphp/pipeline/src/Internal/EmitSource.php(110): Revolt\EventLoop\Internal\DriverSuspension->suspend()
#12 /app/vendor/amphp/pipeline/src/Internal/AutoDisposingPipeline.php(78): Amp\Pipeline\Internal\EmitSource->continue()
#13 [internal function]: Amp\Pipeline\Internal\AutoDisposingPipeline->getIterator()
#14 /app/bin/upload(29): iterator_count(Object(Amp\Pipeline\Internal\AutoDisposingPipeline))
#15 {main}
thrown in /app/vendor/amphp/pipeline/src/Emitter.php on line 34
Packages versions:
amphp/pipeline dev-master 9dcb792 Asynchronous iterators and operators.
amphp/sync v2.x-dev 8933d35 Mutex, Semaphore, and other synchronization tools for Amp.
Could you please advise what I'm doing wrong?