A non-blocking stream abstraction for PHP based on Amp.

Overview

byte-stream

amphp/byte-stream is a stream abstraction to make working with non-blocking I/O simple.

Installation

This package can be installed as a Composer dependency.

composer require amphp/byte-stream

Requirements

  • PHP 7.0+

Documentation

Documentation is bundled within this repository in the ./docs directory.

Versioning

amphp/byte-stream follows the semver semantic versioning specification like all other amphp packages.

Security

If you discover any security related issues, please email [email protected] instead of using the issue tracker.

License

The MIT License (MIT). Please see LICENSE for more information.

Comments
  • feof may hang

    I encountered hanging feof resulting in StreamException("The stream was closed by the peer"). This happens when doing requests in Kubernetes to Facebook.

    Omitting the feof check "fixes" it.

    A workaround (and better code) would be to skip the @feof check before the write and instead set an error handler around fwrite, catching warnings and converting them to stream exceptions.

    Example:

    $eh = set_error_handler([$this, 'onSocketWriteError']);

    if ($chunkSize) {
        $written = \fwrite($stream, $data, $chunkSize);
    } else {
        $written = \fwrite($stream, $data);
    }

    set_error_handler($eh);
    
    bug upstream 
    opened by brstgt 16
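
The workaround described in this comment can be sketched in plain PHP. This is a minimal illustration, not the library's implementation: `writeOrThrow()` is a hypothetical helper name, and `\RuntimeException` stands in for the library's `StreamException`. It installs a temporary error handler around `fwrite()` and turns any warning or notice into an exception.

```php
<?php
/**
 * Hypothetical helper (not part of amphp/byte-stream): writes $data to
 * $stream and converts warnings/notices raised by fwrite() into exceptions.
 *
 * @param resource $stream
 * @return int Number of bytes written (may be less than strlen($data)).
 */
function writeOrThrow($stream, string $data): int
{
    // Temporary handler: any PHP error raised during the write becomes an exception.
    set_error_handler(static function (int $type, string $message) {
        throw new \RuntimeException('Failed to write to stream: ' . $message);
    });

    try {
        $written = \fwrite($stream, $data);
    } finally {
        // Always restore the previous handler, even if the write threw.
        restore_error_handler();
    }

    if ($written === false) {
        // fwrite() signalled failure without raising a warning.
        throw new \RuntimeException('Failed to write to stream');
    }

    return $written;
}
```

Using `try`/`finally` with `restore_error_handler()` keeps the previous handler intact even when the write fails, which the original snippet only does on the success path.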
  • As soon as ResourceInputStream detects EOF, it ignores subsequent requests to read the resource

    Since a4739c8a6dd96d772cfe49a95d5dfda8c7417595, the following script hangs with no output:

    <?php
    
    use Amp\ByteStream\ResourceInputStream;
    use Amp\ByteStream\ResourceOutputStream;
    use Amp\Loop;
    
    require __DIR__ . "/../vendor/autoload.php";
    
    $writeCoRoutine2 = \Amp\asyncCoroutine(function() {
        $middleReadStream = new ResourceInputStream(fopen('middle', 'r'));
    
        while (true) {
            echo yield $middleReadStream->read();
        }
    });
    
    Loop::run(function () use ($writeCoRoutine2) {
        \Amp\ByteStream\pipe(
            new ResourceInputStream(fopen('/home/g9735/Downloads/somefile.avi', 'r')),
            new ResourceOutputStream(fopen('middle', 'w'))
        );
    
        $writeCoRoutine2();
    });
    

    As soon as read() in $writeCoRoutine2 is executed, the event loop never returns to pipe().

    wontfix 
    opened by ostrolucky 15
  • Throw new exception carrying $chunk when fwrite returns 0

    Fixes #51

    I tried hard to write a test that simulates such a write failure, but none of the tricks I tried work with stream_select. Another option would be to use an unqualified fwrite call, but I don't think that would be approved.

    opened by ostrolucky 14
  • Issue with suppressed errors and empty error_get_last

    https://github.com/amphp/byte-stream/blob/6d6c89f58c213e600e2ab5e3b1678fb61333eeb7/lib/ResourceOutputStream.php#L70

    I just debugged an issue in the IpcLogger of aerys. It hung at 100% CPU because the socket was not detected as dead. In my setup, @fwrite does NOT populate error_get_last for some reason. Furthermore, it was also not populated if the registered error handler returned 'true'.

    The only really reliable solution (in IpcLogger) was this:

            $eh = set_error_handler([$this, 'onDeadIpcSock']);
            $bytes = fwrite($this->ipcSock, $this->writeBuffer);
            set_error_handler($eh);
    

    If it affects that code, then I am pretty sure it also affects the byte-stream code, right? I'm not sure whether this is a bug in PHP, because in other places the @ / error_reporting(0) DID populate error_get_last.

    Any other idea to catch fwrite errors in a better way?

    bug 
    opened by brstgt 13
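
One of the observations in this comment can be reproduced without any stream at all. The sketch below is only an illustration under stated assumptions: `lastErrorAfterWarning()` is a hypothetical helper, and `trigger_error()` stands in for a failing `fwrite()`. It shows how the return value of a user error handler appears to decide whether `error_get_last()` is populated after a suppressed warning.

```php
<?php
/**
 * Hypothetical helper: raises a suppressed warning while a temporary error
 * handler is installed, then reports what error_get_last() contains.
 *
 * @param bool $handlerReturn Value returned by the temporary error handler.
 */
function lastErrorAfterWarning(bool $handlerReturn): ?array
{
    error_clear_last();

    set_error_handler(static function () use ($handlerReturn): bool {
        // true  = "handled", PHP's built-in handler is skipped.
        // false = fall through to PHP's built-in handler.
        return $handlerReturn;
    });

    try {
        // Stand-in for a failing @fwrite().
        @trigger_error('simulated write failure', E_USER_WARNING);
    } finally {
        restore_error_handler();
    }

    return error_get_last();
}

var_dump(lastErrorAfterWarning(true));
var_dump(lastErrorAfterWarning(false)['message'] ?? null);
```

If this matches the setup described above, code that relies on `error_get_last()` after `@fwrite` may see nothing whenever some other handler has already claimed the error, which is one reason a temporary handler around the write can be more reliable.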
  • ResourceOutputStream interrupts writing without consumer giving information what was written/unwritten

    I'm dealing with a situation where the following line https://github.com/amphp/byte-stream/blob/6bbfcb6f47e92577e739586ba0c87e867be70a23/lib/ResourceOutputStream.php#L95 throws an exception but doesn't expose what was written and what is left to write, which makes an incomplete chunk write very difficult to handle. My aim is to handle such an exception gracefully and rewrite the data to a different stream.

    I suggest creating a custom exception that carries the $data that could not be written.

    feature request 
    opened by ostrolucky 11
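
The suggestion above could look roughly like the following sketch. Everything here is hypothetical: `UnwrittenDataException`, `getUnwrittenData()`, and `writeAll()` are illustrative names that do not exist in the library, and the loop assumes a plain blocking stream, where a zero-byte write is treated as a failure.

```php
<?php
// Hypothetical exception type carrying the part of the data that was not written.
final class UnwrittenDataException extends \Exception
{
    private $unwritten;

    public function __construct(string $unwritten, string $message = 'Failed to write to stream', ?\Throwable $previous = null)
    {
        parent::__construct($message, 0, $previous);
        $this->unwritten = $unwritten;
    }

    public function getUnwrittenData(): string
    {
        return $this->unwritten;
    }
}

/**
 * Hypothetical helper: writes all of $data to a blocking stream, or throws
 * an exception carrying the remainder that could not be written.
 *
 * @param resource $stream
 */
function writeAll($stream, string $data): void
{
    $offset = 0;
    $length = \strlen($data);

    while ($offset < $length) {
        $written = @\fwrite($stream, \substr($data, $offset));

        if ($written === false || $written === 0) {
            // Hand the unwritten remainder to the caller.
            throw new UnwrittenDataException((string) \substr($data, $offset));
        }

        $offset += $written;
    }
}
```

A consumer could then catch the exception and pass `getUnwrittenData()` to a fallback stream, which is the graceful handling the comment asks for.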
  • Replace hard final with soft, annotation-based @final

    I need to depend on ResourceOutputStream because I rely on methods missing from the interface, and I need to mock it in my tests. Hence I am proposing to drop the hard final and replace it with an annotation-based @final, which is the same strategy Symfony follows. This gives you the best of both worlds: you are still not obligated to watch for inheritance-based BC issues, but people can now mock the class.

    opened by ostrolucky 9
  • Prevent infinite loop on writing to broken pipes

    The assertion "Trying to write on a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to." happens to me even if I have not a single fclose(), stream_XXX() or socket_XXX() call in my code.

    My client and server thin wrappers around Server::listen() and connect() are running with Docker containers on Windows. So it might be related to docker networking internals, breaking sockets under some circumstances.

    It happens occasionally. I spent a whole day trying to find steps to reproduce it, with no luck.

    Anyway, since the case does really happen sometimes in runtime, I think it would be better to handle it and prevent the infinite loop.

    opened by dangoodman 8
  • Throw new exception when fwrite fails, carrying chunk position

    Fixes #51

    Test script below. It assumes you run it on a different mount than /tmp. Feel free to modify it to copy actual files instead of using str_repeat, and to change it so it does not fill up both disks. Ideally run it from tmpfs.

    <?php
    
    use Amp\ByteStream\CannotWriteChunkException;
    use Amp\ByteStream\ResourceOutputStream;
    use Amp\Loop;
    
    require __DIR__ . '/../vendor/autoload.php';
    
    Loop::run(function () {
        $chunkSize = null;
        $out = new ResourceOutputStream(\fopen('foo', 'wb'), $chunkSize);
        $fallback = new ResourceOutputStream(\fopen('/tmp/bar', 'wb'), $chunkSize);
        $str = str_repeat('a', 8192);
        $written = 0;
    
        try {
            while (($size = yield $out->write($str)) !== null) {
                $written += $size;
            }
        } catch (CannotWriteChunkException $exception) {
            $written += $exception->getFailingPositionInChunk();
            $written += yield $fallback->write(substr($str, $exception->getFailingPositionInChunk()));
            $out = $fallback;
        }
    
        echo 'written '.$written.PHP_EOL.'file sizes: '.(filesize('foo') + filesize('/tmp/bar'));
    });
    

    I don't see a good way to write a useful PHPUnit test for this, despite many attempts. The closest I could come up with is:

    public function testCannotWriteChunkException()
    {
        $server = \stream_socket_server('localhost:8111', $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
    
        $watcherId = Loop::onReadable($server, static function () use ($server, &$watcherId) {
            fread(\stream_socket_accept($server, 0), 1);
            Loop::cancel($watcherId);
        });
    
        $stream = new ResourceOutputStream(stream_socket_client('localhost:8111'));
    
        self::expectException(CannotWriteChunkException::class);
        wait($stream->write(str_repeat('-', 8 ** 7)));
    }
    

    But even that works only on macOS and Windows, not on Linux.

    opened by ostrolucky 8
  • feof(): supplied resource is not a valid stream resource

    kelunik@kelunik ❯ ~/GitHub/amphp/artax ❯ 10:24:18 ❯ master
    $ php examples/1-get-request.php
    HTTP/1.1 200 OK
    
    {
      "user-agent": "Mozilla/5.0 (compatible; Artax)"
    }
    
    PHP Warning:  feof(): supplied resource is not a valid stream resource in /home/kelunik/GitHub/amphp/artax/vendor/amphp/byte-stream/lib/ResourceInputStream.php on line 60
    

    I guess we can just suppress that one?

    bug 
    opened by kelunik 8
  • Problem writing to stream

    I see this in my logs:

    Amp\ByteStream\StreamException Failed to write to stream after multiple attempts; fwrite(): send of 7302 bytes failed with errno=32 Broken pipe

    This happens like every few seconds, I can try to provide a reproducible script, but that may take some time (currently on a business trip).

    But I thought maybe you have an idea what to check.

    question 
    opened by prolic 6
  • 100% CPU with PHP built-in streams

    php://temp and php://memory streams result in 100% CPU usage. It's probably due to them not being actual streams that can be used with stream_select, so there should probably be a test for that.

    https://gist.github.com/umbri/2a3e558d487b5e6756768547e660f95a

    <?php
    
    use Amp\ByteStream\ResourceInputStream;
    use Amp\ByteStream\ResourceOutputStream;
    use Amp\Loop;
    
    require_once __DIR__ . "/../vendor/autoload.php";
    
    Loop::run(function () {
        $memory = fopen("php://temp", "r+");
    
        $stdin = new ResourceInputStream($memory);
        $stdout = new ResourceOutputStream($memory);
    
        Loop::delay(1000, function () use ($stdout) {
            echo "write" . PHP_EOL;
    
            $stdout->write("Hello, World!");
        });
    
        while (($chunk = yield $stdin->read()) !== null) {
            echo "data -> $chunk" . PHP_EOL;
        }
    });
    
    opened by kelunik 6
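
The kind of check hinted at in this comment could be sketched as follows. This is an assumption-laden illustration, not something the library is known to do: `isMemoryBackedStream()` is a hypothetical helper, and it relies on `stream_get_meta_data()` reporting the stream types `MEMORY` and `TEMP` for `php://memory` and `php://temp`.

```php
<?php
/**
 * Hypothetical helper: reports whether a resource is one of PHP's in-memory
 * streams, which cannot be represented as a select()able descriptor.
 *
 * @param resource $stream
 */
function isMemoryBackedStream($stream): bool
{
    // Assumption: php://memory reports "MEMORY" and php://temp reports "TEMP".
    $type = \stream_get_meta_data($stream)['stream_type'] ?? '';

    return \in_array($type, ['MEMORY', 'TEMP'], true);
}

var_dump(isMemoryBackedStream(fopen('php://temp', 'r+')));
var_dump(isMemoryBackedStream(tmpfile()));
```

A constructor could use such a check to reject these resources early with a clear error instead of spinning in the event loop.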
  • Low bandwidth v2.0.0-beta.13, v2.0.0-beta.14

    I use Ridge V2 for RabbitMQ. When I upgrade the package from v2.0.0-beta.12 to v2.0.0-beta.14, message processing throughput on the production server drops sharply (see point 1 in the figure). When I revert to v2.0.0-beta.12 (see point 2 in the figure), the processing speed returns to the required value. [Figure: screenshot, 2022-12-28 16:12:56]

    opened by PNixx 4
  • Assertion fail: Trying to read from a previously fclose()'d resource (Windows)

    When trying to do a simple HTTP request with HttpClient, I invariably receive a DnsException thrown by Amp\Dns\Rfc1035StubResolver:197. It is part of a MultiReasonException containing two such exceptions, whose previous exception is an AssertionError from byte-stream\lib\ResourceInputStream.php:135, where an assertion with the following message fails:

    Trying to read from a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.

    Libraries

    amphp/amp                             v2.5.2
    amphp/beanstalk                       v0.3.2
    amphp/byte-stream                     v1.8.1
    amphp/cache                           v1.4.0
    amphp/dns                             v1.2.3
    amphp/hpack                           v3.1.0
    amphp/http                            v1.6.3
    amphp/http-client                     v4.5.5
    amphp/http-client-cookies             v1.1.0
    amphp/parallel                        v1.4.0
    amphp/parser                          v1.0.0
    amphp/postgres                        v1.3.3
    amphp/process                         v1.1.1
    amphp/serialization                   v1.0.0
    amphp/socket                          v1.1.3
    amphp/sql                             v1.0.1
    amphp/sql-common                      v1.1.2
    amphp/sync                            v1.4.0
    amphp/uri                             v0.1.4
    amphp/windows-registry                v0.3.3
    
    opened by Bilge 3
  • Ability to read particular length from stream

    Hi. I think it would be great to be able to read chunks of a specified length, since code dealing with binary protocols commonly looks like this:

    $lengthBytes = fread($fd, 4); // read Int32 Length
    $length = unpack('Llength', $lengthBytes)['length'];
    $packet = fread($fd, $length);
    

    I'm not sure whether InputStream could behave like public function read(?int $length = null): Promise;, but maybe a buffering class similar to LineReader would be useful, for example:

    class LengthReader
    {
        public function __construct(InputStream $inputStream);
    
        public function readLine(int $length): Promise;
    }
    
    feature request 
    opened by userqq 3
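
The buffering idea in this request can be illustrated independently of the stream interface. The sketch below is hypothetical (`LengthPrefixedBuffer` and `push()` are made-up names, not library API): it accumulates chunks of arbitrary size and returns every complete packet, using the same 4-byte `unpack('L', ...)` length prefix as the snippet above.

```php
<?php
// Hypothetical buffer that reassembles length-prefixed packets from arbitrary chunks.
final class LengthPrefixedBuffer
{
    private $buffer = '';

    /**
     * Appends a chunk and returns all packets that are now complete.
     *
     * @return string[]
     */
    public function push(string $chunk): array
    {
        $this->buffer .= $chunk;
        $packets = [];

        // A packet needs its 4-byte length prefix plus the full payload.
        while (\strlen($this->buffer) >= 4) {
            $length = \unpack('Llength', $this->buffer)['length'];

            if (\strlen($this->buffer) < 4 + $length) {
                break; // Payload incomplete; wait for more data.
            }

            $packets[] = (string) \substr($this->buffer, 4, $length);
            $this->buffer = (string) \substr($this->buffer, 4 + $length);
        }

        return $packets;
    }
}
```

In a read loop, each chunk returned by the stream would be passed to `push()`, and the caller would process whatever packets come back, so partial reads never need special handling.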
Releases(v2.0.0-beta.14)
  • v2.0.0-beta.14(Dec 25, 2022)

  • v2.0.0-beta.13(Nov 16, 2022)

  • v2.0.0-beta.12(Nov 7, 2022)

  • v2.0.0-beta.11(Sep 11, 2022)

    • Add CompressingReadableStream (https://github.com/amphp/byte-stream/pull/99)
    • Add DecompressingWritableStream (https://github.com/amphp/byte-stream/pull/99)
    • Improve StreamChannel implementation to properly support backpressure, cancellation and avoid keeping the read watcher alive (https://github.com/amphp/byte-stream/pull/100)
  • v2.0.0-beta.10(Jun 25, 2022)

    • Reverted ReadableIterableStream transforming exceptions from the given iterable into StreamException, as this change made the class less flexible for implementing specialized streams throwing domain-specific exceptions.
  • v2.0.0-beta.9(Jun 16, 2022)

    • Fixed cancelling a read in ReadableIterableStream causing all subsequent reads to fail
    • If the iterable in ReadableIterableStream throws an exception that is not an instance of StreamException, a StreamException is thrown with the prior thrown exception set as the previous exception
    • Renamed the $options param of parseLineDelimitedJson to $flags to match the json_decode function.
  • v2.0.0-beta.8(Apr 3, 2022)

  • v2.0.0-beta.7(Mar 26, 2022)

    • Renamed ClosableStream to Closable and added an onClose method to register callbacks that are invoked when the stream closes (https://github.com/amphp/byte-stream/pull/97)
    • Improved exception backtraces for write failures in WritableResourceStream (https://github.com/amphp/byte-stream/pull/96)
  • v2.0.0-beta.6(Mar 5, 2022)

  • v2.0.0-beta.5(Feb 7, 2022)

  • v2.0.0-beta.4(Feb 2, 2022)

    • Compatibility with revolt/event-loop v0.2.x
    • Added BufferedReader, a helper class for reading from ReadableStream using fixed lengths or delimiters found within the stream (#94)
    • Renamed IterableStream to ReadableIterableStream
    • Renamed EmitterStream to WritableIterableStream and removed the constructor Emitter param. Use WritableIterableStream::getIterator() to retrieve the iterable for reading
    • Fixed error handling writing to AsyncWriter after the stream has closed (#93)
  • v2.0.0-beta.3(Dec 22, 2021)

  • v2.0.0-beta.2(Dec 17, 2021)

    • Removed the optional $bytes parameter from WritableStream::end().
    • ReadableResourceStream::read() now performs a read immediately on the stream to avoid leaving data in PHP's internal stream buffers.
    • Renamed $length param on ReadableResourceStream::read() to $limit to better reflect its purpose.
    • Added a $limit param to Amp\ByteStream\buffer() and Payload::buffer() to set a limit on the maximum bytes that can be buffered.
  • v2.0.0-beta.1(Dec 11, 2021)

    Initial beta release compatible with amphp v3.

    There are a number of renaming and compatibility breaks with v1.x.

    • Added interfaces ClosableStream and ResourceStream
    • InputStream has been renamed to ReadableStream and now extends ClosableStream. The read() method now supports an optional Cancellation parameter.
    • OutputStream has been renamed to WritableStream and now extends ClosableStream.
    • IteratorStream has been replaced by IterableStream, which accepts any iterable of strings (particularly useful with Generator or Pipeline).
    • ResourceInputStream has been renamed to ReadableResourceStream. The read() method has an additional, optional $length parameter to specify the maximum number of bytes to read.
    • ResourceOutputStream has been renamed to WritableResourceStream.
    • InMemoryStream has been renamed to ReadableBuffer.
    • OutputBuffer has been renamed to WritableBuffer.
    • Payload now accepts a string in addition to a ReadableStream. Payload::buffer() may only be called once.
    • Added Pipe and EmitterStream classes.
    • The zlib streams have been moved into the Compression sub-namespace and renamed to CompressingWritableStream and DecompressingReadableStream.
    • The base-64 streams have been renamed to reflect the new interface names.
    • InputStreamChain has been renamed to ReadableStreamChain.
  • v1.8.1(Mar 30, 2021)

    • Fixed ResourceInputStream::close() if resource is already closed on PHP 8
    • Fixed ResourceOutputStream::close() if resource is already closed on PHP 8
  • v1.8.0(Jul 23, 2020)

  • v1.7.3(Apr 4, 2020)

  • v1.7.2(Jan 29, 2020)

  • v1.7.1(Oct 28, 2019)

  • v1.7.0(Oct 7, 2019)

  • v1.6.1(Jul 26, 2019)

    • Fixed ResourceOutputStream::write() or ResourceOutputStream::end() when writing to a peer closed stream throwing from the method call instead of failing the returned promise.
  • v1.6.0(Jun 3, 2019)

    • Added setChunkSize() to ResourceInputStream and ResourceOutputStream (#50)
    • Added getInputBufferStream() and getOutputBufferStream() (#61)
    • Fixed closed detection for ResourceOutputStream, allowing usage with systemd (#58)
  • v1.5.1(Dec 27, 2018)

  • v1.5.0(Oct 22, 2018)

    • Added functions getStdin(), getStdout(), and getStderr() which each return a single instance of ResourceInputStream or ResourceOutputStream associated with the currently active event loop for the standard streams. (#44)
    • Fixed a performance issue when writing large chunks to a ResourceOutputStream. (#41, #42)
  • v1.4.0(Oct 3, 2018)

  • v1.3.1(Apr 4, 2018)

  • v1.3.0(Mar 13, 2018)

  • v1.2.5(Mar 11, 2018)

  • v1.2.4(Mar 9, 2018)

    • Fixed issue with immediate reads, which could result in a blocking loop in combination with immediate writes. Such a situation is only likely on pipes or files, network sockets are probably not affected here, unless they receive data very quickly. Reads now deferred to the next tick in case they're immediate. (#37)
  • v1.2.3(Mar 8, 2018)

    • Fixed wrong exception being thrown for writing to closed ResourceOutputStream (#35)
    • Fixed performance issue with STDIN on Windows. This might also improve performance for other streams.
    • Fixed condition to detect a broken pipe in ResourceOutputStream.
Owner
Amp
Asynchronous Multitasking PHP