Production-ready, stable Kafka client for PHP

Overview

PHP Kafka client - php-rdkafka

Join the chat at https://gitter.im/arnaud-lb/php-rdkafka

Supported librdkafka versions: >= 0.11
Supported Kafka versions: >= 0.8
Supported PHP versions: 7.x .. 8.x

PHP-rdkafka is a stable, production-ready, long-term-supported, and fast Kafka client for PHP based on librdkafka.

It supports PHP 7 and PHP 8 (and PHP 5 in older versions), all librdkafka versions since 0.11, and all Kafka versions since 0.8. This makes it easy to deploy the extension in production.

The goal of the extension is to be a low-level, un-opinionated librdkafka binding focused on production use and long-term support.

The high-level and low-level consumers, the producer, and the metadata APIs are supported.

Documentation is available here.

Table of Contents

  1. Installation
  2. Examples
  3. Usage
  4. Documentation
  5. Credits
  6. License

Installation

https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html

Examples

https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html

Usage

The configuration parameters used below can be found in the Librdkafka Configuration reference.

Producing

Creating a producer

For producing, we first need to create a producer, and to add brokers (Kafka servers) to it:

<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("10.0.0.1:9092,10.0.0.2:9092");

Producing messages

⚠️ Make sure that your producer follows the proper shutdown procedure (see below) to avoid losing messages.
Next, we create a topic instance from the producer:

<?php

$topic = $rk->newTopic("test");

From there, we can produce as many messages as we want, using the produce method:

<?php

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
The second argument is the message flags and should be either 0
or RD_KAFKA_MSG_F_BLOCK to block produce on a full queue. The message payload can be anything.
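
produce() also accepts an optional message key as a fourth argument. With RD_KAFKA_PARTITION_UA, librdkafka's partitioner hashes the key, so messages sharing a key end up in the same partition. A minimal sketch (the key value is only an illustration):

<?php

// Keyed message: all messages with the key "user-42" go to the same partition
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload", "user-42");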

Proper shutdown

This should be done prior to destroying a producer instance
to make sure all queued and in-flight produce requests are completed
before terminating. Use a reasonable value for $timeout_ms.
⚠️ Not calling flush can lead to message loss!

$rk->flush($timeout_ms);

If you don't care about messages that haven't been sent yet, you can use purge() before calling flush():

// Forget messages that are not fully sent yet
$rk->purge(RD_KAFKA_PURGE_F_QUEUE);

$rk->flush($timeout_ms);
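
flush() returns an error code, so a shutdown routine can check whether everything was actually delivered before giving up. A minimal sketch (the retry count and timeout are arbitrary):

<?php

$result = RD_KAFKA_RESP_ERR__TIMED_OUT;
for ($i = 0; $i < 10; $i++) {
    // Serve delivery callbacks and wait for outstanding messages, 1s at a time
    $result = $rk->flush(1000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    // Whatever is still queued at this point will be lost when the producer is destroyed
    throw new \RuntimeException('Unable to flush, messages might be lost!');
}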

High-level consuming

The RdKafka\KafkaConsumer class supports automatic partition assignment/revocation. See the example here.
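
A minimal high-level consumer sketch looks like this (broker list, group id, and topic name are placeholders):

<?php

$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$conf->set('metadata.broker.list', '10.0.0.1:9092,10.0.0.2:9092');
// Where to start when there is no stored offset (or it is out of range)
$conf->set('auto.offset.reset', 'earliest');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test']);

while (true) {
    $message = $consumer->consume(120 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            // Process $message->payload here
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}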

Low-level consuming (legacy)

ℹ️ The low-level consumer is a legacy API, please prefer using the high-level consumer

We first need to create a low-level consumer, and to add brokers (Kafka servers) to it:

<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("10.0.0.1,10.0.0.2");

Next, create a topic instance by calling the newTopic() method, and start consuming on partition 0:

<?php

$topic = $rk->newTopic("test");

// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

Next, retrieve the consumed messages:

<?php

while (true) {
    // The first argument is the partition (again).
    // The second argument is the timeout.
    $msg = $topic->consume(0, 1000);
    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

Low-level consuming from multiple topics / partitions (legacy)

ℹ️ The low-level consumer is a legacy API, please prefer using the high-level consumer

Consuming from multiple topics and/or partitions can be done by telling librdkafka to forward all messages from these topics/partitions to an internal queue, and then consuming from this queue:

Creating the queue:

<?php
$queue = $rk->newQueue();

Adding topic partitions to the queue:

<?php

$topic1 = $rk->newTopic("topic1");
$topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);

$topic2 = $rk->newTopic("topic2");
$topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);

Next, retrieve the consumed messages from the queue:

<?php

while (true) {
    // The only argument is the timeout.
    $msg = $queue->consume(1000);
    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

Using stored offsets

Broker (default)

By default, librdkafka stores offsets on the broker.

File offsets (deprecated)

If you're using a local file for offset storage, then by default the file is created in the current directory, with a name based on the topic and the partition. The directory can be changed by setting the offset.store.path configuration property.

Consumer settings

Low-level consumer: auto commit settings

To manually control the offset, set enable.auto.offset.store to false.
The settings auto.commit.interval.ms and auto.commit.enable control
whether the stored offsets are auto-committed to the broker, and at which interval.

High-level consumer: auto commit settings

To manually control the offset, set enable.auto.commit to false.

High level consumer: max.poll.interval.ms

Maximum allowed time between calls to consume messages for high-level consumers.
If this interval is exceeded the consumer is considered failed and the group will
rebalance in order to reassign the partitions to another consumer group member.
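
A sketch combining both settings with a manual commit after each processed message (the broker, group id, topic, and the 5-minute value are only illustrative):

<?php

$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$conf->set('metadata.broker.list', '10.0.0.1:9092');
$conf->set('enable.auto.commit', 'false');    // commit offsets manually
$conf->set('max.poll.interval.ms', '300000'); // allow up to 5 minutes between consume() calls

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test']);

while (true) {
    $message = $consumer->consume(10 * 1000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $message->err) {
        // ... process $message->payload ...
        $consumer->commit($message); // synchronous; commitAsync() is also available
    }
}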

Consumer group id (general)

group.id sets your consumer group ID; it should be unique and should not change. Kafka uses it to recognize applications and to store offsets for them.

<?php

$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.commit.interval.ms", "1000");

$topic = $rk->newTopic("test", $topicConf);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
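
Note that group.id is a global configuration property, so it is set on RdKafka\Conf rather than on the topic configuration, for example:

<?php

$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);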

Interesting configuration parameters

Librdkafka Configuration reference

queued.max.messages.kbytes

librdkafka will buffer up to 1GB of messages for each consumed partition by default. You can lower memory usage by reducing the value of the queued.max.messages.kbytes parameter on your consumers.
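
For example, to cap the per-partition buffer at roughly 64MB (an arbitrary value):

<?php

$conf->set('queued.max.messages.kbytes', '65536');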

topic.metadata.refresh.sparse and topic.metadata.refresh.interval.ms

Each consumer and producer instance will fetch topics metadata at an interval defined by the topic.metadata.refresh.interval.ms parameter. Depending on your librdkafka version, the parameter defaults to 10 seconds, or 600 seconds.

librdkafka fetches the metadata for all topics of the cluster by default. Setting topic.metadata.refresh.sparse to the string "true" makes sure that librdkafka only fetches the topics it uses.

Setting topic.metadata.refresh.sparse to "true" and topic.metadata.refresh.interval.ms to 600 seconds (plus some jitter) can reduce bandwidth usage a lot, depending on the number of consumers and topics.
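
For example:

<?php

// Only refresh metadata for the topics actually in use, every 10 minutes
$conf->set('topic.metadata.refresh.sparse', 'true');
$conf->set('topic.metadata.refresh.interval.ms', (string) (600 * 1000));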

internal.termination.signal

This setting allows librdkafka threads to terminate as soon as librdkafka is done with them. This effectively allows your PHP processes / requests to terminate quickly.

When enabling this, you have to mask the signal like this:

<?php
// once
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
// any time
$conf->set('internal.termination.signal', SIGIO);

socket.blocking.max.ms (librdkafka < 1.0.0)

Maximum time a broker socket operation may block. A lower value improves responsiveness at the expense of slightly higher CPU usage.

Reducing the value of this setting improves shutdown speed. The value defines the maximum time librdkafka will block in one iteration of a read loop. This also defines how often the main librdkafka thread will check for termination.

queue.buffering.max.ms

This defines the maximum time librdkafka will wait before sending a batch of messages. Reducing this setting to e.g. 1ms ensures that messages are sent ASAP, instead of being batched.

This has been seen to reduce the shutdown time of the rdkafka instance, and of the PHP process / request.

Performance / Low-latency settings

Here is a configuration optimized for low latency. This allows a PHP process / request to send messages ASAP and to terminate quickly.

<?php

$conf = new \RdKafka\Conf();
$conf->set('socket.timeout.ms', 50); // or socket.blocking.max.ms, depending on librdkafka version
if (function_exists('pcntl_sigprocmask')) {
    pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
    $conf->set('internal.termination.signal', SIGIO);
} else {
    $conf->set('queue.buffering.max.ms', 1);
}

$producer = new \RdKafka\Producer($conf);
$consumer = new \RdKafka\Consumer($conf);

It is advised to call poll at regular intervals to serve callbacks. In php-rdkafka:3.x,
poll was also called during shutdown, so not calling it at regular intervals might
lead to a slightly longer shutdown. The example below polls until there are no more events in the queue:

$producer->produce(...);
while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}

Documentation

https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
The source of the documentation can be found here

Asking for Help

If the documentation is not enough, feel free to ask a question on the php-rdkafka channels on Gitter or Google Groups.

Stubs

Because your IDE is not able to auto-discover the php-rdkafka API, you can consider using an external package that provides a set of stubs for php-rdkafka classes, functions, and constants: kwn/php-rdkafka-stubs
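
If you use Composer, the stubs are typically installed as a development dependency:

composer require --dev kwn/php-rdkafka-stubs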

Contributing

If you would like to contribute, thank you :)

Before you start, please take a look at the CONTRIBUTING document to see how to get your changes merged in.

Credits

Documentation copied from librdkafka.

Authors: see contributors.

License

php-rdkafka is released under the MIT license.

Comments
  • In the FPM environment, the response is very slow

    • PHP version: 7.0.10
    • librdkafka version: 0.9.2
    • php-rdkafka version: 3.0.0
    $_st  = microtime(TRUE);
    $producer = new \RdKafka\Producer();
    $producer->addBrokers('kafka001:9092,kafka002:9092');
    $topic = $producer->newTopic('PHPTest');
    $topic->produce(\RD_KAFKA_PARTITION_UA, 0, date(DATE_W3C));
    var_dump(microtime(TRUE) - $_st);
    

    output: float(0.00036001205444336)

    But in the nginx log the request time is mostly 100 to 200 ms.

    How can I optimize this under FPM?

    opened by QQ2021 37
  • How to count successfully sent messages, producer message order

    • PHP version: 7.1.9
    • librdkafka version: 0.9.1
    • php-rdkafka version: 3.1.2
    • kafka version: 2.5.0

    My PHP code:

    
    $max = 1000;

    $configBrokers = config('kafka.brokers');
    $configTopic = config('kafka.topic');

    // write to Kafka
    $producer = new \RdKafka\Producer();
    $producer->addBrokers($configBrokers);

    $configObj = new \RdKafka\TopicConf();
    $topic = $producer->newTopic($configTopic, $configObj);

    for ($i = 0; $i < $max; ++$i) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "php send " . $i);
    }

    while (($len = $producer->getOutQLen()) > 0) {
        $diff = $max - $len;
        echo "\r {$diff}/$max";
        $producer->poll(1);
    }

    $diff = $max - $len;
    echo "\r {$diff}/$max";
    

    But when I consume, the message count never matches 1000.

    question 
    opened by seth-shi 28
  • How to configure asynchrony in PHP?

    • PHP version:7.2
    • librdkafka version:4.0.3
    • php-rdkafka version:1.2.1
    • kafka version:2.5

    $conf = new \RdKafka\Conf();
    $rk = new \RdKafka\Producer($conf);
    $rk->addBrokers(self::$broker_list);
    $cf = new \RdKafka\TopicConf();
    $rk->newTopic(self::$topic, $cf);
    $topic->produce(RD_KAFKA_PARTITION_UA, self::$partition, $message);

    Asynchrony in Java:

    Properties props = new Properties();
    props.put('producer.type', 'sync');

    But I don't see such a configuration in the php-rdkafka documentation.

    opened by niwsmbulai1989 27
  • Question: How to use consumer / producer in a web application

    Hello,

    We are from Portugal. We have a problem with the Kafka configuration.

    The error message is:

    Symfony\Component\Debug\Exception\FatalThrowableError Class 'App\Http\Controllers\RdKafka\Conf' not found

    Can you help us solve this problem?

    Thanks in advance, Joao Faria

    question 
    opened by jfariablosis 24
  • Exception during producer shutdown since 4.0.3 (race condition, RdKafka\Kafka::__construct() has not been called)

    A few days ago I added Kafka to my project and deployed it in the production environment, and some error logs occurred (log below).

    I'm not sure this fault is caused by rdkafka, but it didn't happen before; if I remove the Kafka code, the error disappears. I tried to write a simple test script, but it doesn't reproduce the same error.

    Can someone help me find the real reason? Thanks.

    • PHP version: 7.2.31
    • librdkafka version: 1.4.0
    • php-rdkafka version: 4.0.3
    • kafka version: kafka_2.12-2.4.1
    [17-May-2020 10:50:49] WARNING: [pool www] child 17854 said into stderr: "NOTICE: PHP message: PHP Fatal error:  Uncaught Exception: RdKafka\Kafka::__construct() has not been called in [no active file]:0"
    [17-May-2020 10:50:49] WARNING: [pool www] child 17854 said into stderr: "Stack trace:"
    [17-May-2020 10:50:49] WARNING: [pool www] child 17854 said into stderr: "#0 {main}"
    [17-May-2020 10:50:49] WARNING: [pool www] child 17854 said into stderr: "  thrown in [no active file] on line 0"
    [17-May-2020 10:50:49] WARNING: [pool www] child 17854 said into stderr: "[no active file](0) : Fatal error - Uncaught Exception: RdKafka\Kafka::__construct() has not been called in [no active file]:0"
    [17-May-2020 10:50:49] WARNING: [pool www] child 17854 said into stderr: "Stack trace:"
    [17-May-2020 10:50:49] WARNING: [pool www] child 17854 said into stderr: "#0 {main}"
    [17-May-2020 10:50:49] WARNING: [pool www] child 17854 said into stderr: "  thrown"
    [17-May-2020 10:50:49] WARNING: [pool www] child 17854 said into stderr: "/home/www/softwares/php-7.2.31/ext/opcache/ZendAccelerator.c:652: accel_rep
    lace_string_by_process_permanent: Assertion `!((char*)(str) >= (accel_shared_globals->interned_strings_start) && (char*)(str) < (accel_shared_globals
    ->interned_strings_end))' failed."
    [17-May-2020 10:50:50] WARNING: [pool www] child 17854 exited on signal 6 (SIGABRT - core dumped) after 2750.439072 seconds from start
    [17-May-2020 10:50:50] NOTICE: [pool www] child 25102 started
    
    #0  0x00007f296baf72c7 in raise () from /lib64/libc.so.6
    #1  0x00007f296baf89b8 in abort () from /lib64/libc.so.6
    #2  0x00007f296baf00e6 in __assert_fail_base () from /lib64/libc.so.6
    #3  0x00007f296baf0192 in __assert_fail () from /lib64/libc.so.6
    
    #4  0x00007f2969753f1b in accel_replace_string_by_process_permanent (str=0x7f295c78d8b0)
        at /home/www/softwares/php-7.2.31/ext/opcache/ZendAccelerator.c:652
    #5  0x00007f2969753606 in accel_copy_permanent_strings (new_interned_string=0x7f2969753e49 <accel_replace_string_by_process_permanent>)
        at /home/www/softwares/php-7.2.31/ext/opcache/ZendAccelerator.c:528
    #6  0x00007f2969753fee in accel_use_permanent_interned_strings () at /home/www/softwares/php-7.2.31/ext/opcache/ZendAccelerator.c:679
    
    #7  0x00007f2969759194 in accel_shutdown () at /home/www/softwares/php-7.2.31/ext/opcache/ZendAccelerator.c:2874
    #8  0x00007f296975b878 in zm_shutdown_zend_accelerator (type=1, module_number=43)
        at /home/www/softwares/php-7.2.31/ext/opcache/zend_accelerator_module.c:435
    #9  0x0000000000a38858 in module_destructor (module=0x2180080) at /home/www/softwares/php-7.2.31/Zend/zend_API.c:2564
    #10 0x0000000000a2c976 in module_destructor_zval (zv=0x7ffdbe856890) at /home/www/softwares/php-7.2.31/Zend/zend.c:690
    #11 0x0000000000a42b7e in _zend_hash_del_el_ex (ht=0x1682100 <module_registry>, idx=42, p=0x1fd0520, prev=0x0)
        at /home/www/softwares/php-7.2.31/Zend/zend_hash.c:998
    #12 0x0000000000a42c49 in _zend_hash_del_el (ht=0x1682100 <module_registry>, idx=42, p=0x1fd0520)
        at /home/www/softwares/php-7.2.31/Zend/zend_hash.c:1021
    #13 0x0000000000a441a2 in zend_hash_graceful_reverse_destroy (ht=0x1682100 <module_registry>)
        at /home/www/softwares/php-7.2.31/Zend/zend_hash.c:1477
    #14 0x0000000000a36647 in zend_destroy_modules () at /home/www/softwares/php-7.2.31/Zend/zend_API.c:2008
    #15 0x0000000000a2ce7a in zend_shutdown () at /home/www/softwares/php-7.2.31/Zend/zend.c:905
    #16 0x000000000099bac4 in php_module_shutdown () at /home/www/softwares/php-7.2.31/main/main.c:2458
    #17 0x0000000000b26f8d in main (argc=1, argv=0x7ffdbe856db8) at /home/www/softwares/php-7.2.31/sapi/fpm/fpm/fpm_main.c:2020
    
    bug pr-open 
    opened by sofire 23
  • Producer is not sending the msg

    • PHP version: 7.3
    • php-rdkafka version: 4
    • kafka version: 5.3.1-ccs (Commit:03799faf9878a999)

    Hello guys,

    Since the new version, my producer is not able to send new messages. I set debug to all and got this output: https://gist.github.com/wesleywillians/b64806d00b8e5c825c169565022da093

    I'm using pretty much the same example as in the documentation.

    Any idea?

    wait-info 
    opened by wesleywillians 17
  • How to get the latest message from a Kafka topic without a while loop

    while (true) {
        // The first argument is the partition (again).
        // The second argument is the timeout.
        $msg = $topic->consume(0, 1000);
        if (null === $msg) {
            continue;
        } elseif ($msg->err) {
            echo $msg->errstr(), "\n";
            break;
        } else {
            echo $msg->payload, "\n";
        }
    }

    I would like a scheduler calling this PHP script every 5 minutes, so I need to get the latest message from the Kafka topic. Please help me.

    • PHP version:
    • librdkafka version:
    • php-rdkafka version:
    opened by sanjaygoyan 17
  • Is MSG_TIMED_OUT with error code -192 in DrMsgCb normal?

    log:

    [thrd:main]: test_cdc: 1 message(s) from 1 toppar(s) timed out
    
    • PHP version: 7.2.26
    • librdkafka version: 0.11.3
    • php-rdkafka version: 4.0.3
    • kafka version: 2.2.1
    question 
    opened by xxm404 16
  • Can't connect to broker with SASL_SSL

    • PHP version: 7.3
    • librdkafka version: 0.9.3
    • php-rdkafka version: 3.1.0-dev
    • OS used: Debian 9

    I can't seem to connect to a remote Kafka broker when using SASL_SSL. Whenever I try I get this: "../ssl/ssl_rsa.c:615: error:140DC002:SSL routines:use_certificate_chain_file:system lib: "

    My config:

        'kafka' => [
            'common' => [
                'metadata.broker.list' => 'my.nice.broker.com:9095',
    //            'bootstrap.servers' => ['my.nice.broker.com:9095'],
                'group.id' => 'GroupId',
                'security.protocol' => 'SASL_SSL',
                'sasl.mechanisms' => "SCRAM-SHA-256",
                'ssl.key.location' => __DIR__ . '/Certificates/client.key',
                'ssl.certificate.location' =>  __DIR__ . '/Certificates/client.cer.pem',
                'ssl.ca.location' =>__DIR__ . '/Certificates/',
                'ssl.key.password' => 'herebedragons',
                'sasl.password' => 'test1',
                'sasl.username' => 'test1',
            ],    
        ],
    

    Certificate is self-signed, was extracted from JKS file.

    Thinking it was a wrong config, I tried removing ssl.key.location and ssl.certificate.location. It somewhat worked in the sense that I don't get an exception, but I'm still unable to pull messages in (consume returns null while there should be messages on the other side). rd_kafka_errno() returns error 115, which I couldn't get much info on. And the consumeStop function seems to hang PHP for good.

    opened by ledocool 16
  • Segmentation fault on PHP shutdown when security.protocol is ssl

    This issue is occurring both on my Mac (with Homebrew or by-hand installs), and on an Ubuntu 14.04 system (with PHP etc. compiled by hand).

    • PHP version: 7.0.7 on Mac OS, 5.6.22 on Ubuntu
    • librdkafka version: 0.9.1
    • php-rdkafka version: both with 0.9.1 and master/php7 branches

    Simple-ish test script:

    foreach(['KAFKA_CLIENT_CERT', 'KAFKA_CLIENT_CERT_KEY', 'KAFKA_TRUSTED_CERT'] as $name) {
        $$name = tempnam('/tmp', $name);
        file_put_contents($$name, $_ENV[$name]);
    }
    
    $conf = new RdKafka\Conf();
    $conf->set('api.version.request', 'false');
    $conf->set('debug', 'all');
    $conf->set('security.protocol', 'ssl');
    $conf->set('ssl.ca.location', $KAFKA_TRUSTED_CERT);
    $conf->set('ssl.certificate.location', $KAFKA_CLIENT_CERT);
    $conf->set('ssl.key.location', $KAFKA_CLIENT_CERT_KEY);
    
    $rk = new RdKafka\Producer($conf);
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers(str_replace('kafka+ssl://', '', $_ENV['KAFKA_URL']));
    
    $topic = $rk->newTopic('test');
    
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message payload');
    

    The message makes it through to the broker just fine. It appears that the segfault occurs on PHP shutdown; possibly related to the use of SSL. GDB:

    (gdb) run kafka.php
    Starting program: /usr/local/bin/php kafka.php
    
    Program received signal SIGSEGV, Segmentation fault.
    0x0000000102893090 in ?? ()
    (gdb) bt
    #0  0x0000000102893090 in ?? ()
    #1  0x0000000100c9458a in int_err_del () from /usr/local/opt/openssl/lib/libcrypto.1.0.0.dylib
    #2  0x000000010003a673 in zm_shutdown_openssl ()
    #3  0x0000000100429e19 in module_destructor ()
    #4  0x0000000100421a80 in module_destructor_zval ()
    #5  0x00000001004312ec in _zend_hash_del_el_ex ()
    #6  0x0000000100432701 in zend_hash_graceful_reverse_destroy ()
    #7  0x0000000100421c09 in zend_shutdown ()
    #8  0x00000001003c52f3 in php_module_shutdown ()
    #9  0x0000000000000001 in ?? ()
    #10 0x0000000000000001 in ?? ()
    #11 0x00007fff5fbffb20 in ?? ()
    #12 0x00000001004b0767 in main ()
    Backtrace stopped: frame did not save the PC
    

    If you need a Kafka server with SSL and client certs for auth to test/reproduce, let me know.

    bug 
    opened by dzuelke 16
  • Connecting to down brokers: trying to catch the connection failure message doesn't work

    • PHP version: 7.1.9
    • librdkafka version: 0.9.1
    • php-rdkafka version: 3.1.2
    • kafka version: 2.5.0

    My PHP code:

    $kafkaBrokers = 'debian-server:9092';
    $kafkaTopic = 'test';
    
    $producer = new \RdKafka\Producer();
    $producer->addBrokers($kafkaBrokers);
            
    $topicConfig = new \RdKafka\TopicConf();
    $topic = $producer->newTopic($kafkaTopic, $topicConfig);
    
    // here it blocks my web API,
    // I try to catch the exception, but it doesn't work
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'i am message');
    

    I get this error message:

    %3|1596426822.744|FAIL|rdkafka#producer-1| debian-server:9092/bootstrap: Failed to resolve 'debian-server:9092': %3|1596426822.
    756|ERROR|rdkafka#producer-1| debian-server:9092/bootstrap: Failed to resolve 'debian-server:9092': %3|1596426822.767|ERROR|rdk
    afka#producer-1| 1/1 brokers are down
    

    I want to be able to return a response to the API after the connection fails

    opened by seth-shi 15
Releases(6.0.3)
  • 6.0.3(Jul 2, 2022)

  • 6.0.2(Jun 12, 2022)

  • 6.0.1(Feb 15, 2022)

  • 6.0.0(Jan 7, 2022)

    Changes since 5.x

    Improvements

    • PHP 8.1 support (@remicollet, @ruudk, @nick-zh)
    • Added parameter types (when built with PHP>=8.0) (@arnaud-lb)
    • Added tentative return types (when built with PHP>=8.1) (@arnaud-lb)

    Deprecations

    • PHP 8.1: Overloading php-rdkafka methods without specifying a return type will trigger a deprecation message unless annotated with #[\ReturnTypeWillChange]

    Changes since 6.0.0RC2

    Bugfixes

    • Fix newTopic() arginfo (#502, @arnaud-lb)
    Source code(tar.gz)
    Source code(zip)
  • 6.0.0RC2(Nov 27, 2021)

  • 5.0.2(Nov 27, 2021)

  • 6.0.0RC1(Nov 19, 2021)

  • 5.0.1(Nov 19, 2021)

    Enhancements

    • Add pausePartitions(), resumePartitions() on RdKafka, RdKafka\KafkaConsumer (#438, @arnaud-lb)
    • Clarify error when KafkaConsumer is closed (@zoonru)

    Bugfixes

    • Fix windows build (#440, @nick-zh)
    • Fix crash in RdKafka\Metadata\Topic::getTopic() (#465, @arnaud-lb)
    Source code(tar.gz)
    Source code(zip)
  • 5.0.0(Jan 14, 2021)

    Enhancements

    • PHP 8 support (@nick-zh, @arnaud-lb)
    • Support passing an opaque value in produce(), producev() (@arnaud-lb)

    Breaking changes

    • Dropped PHP 5 support
    Source code(tar.gz)
    Source code(zip)
  • 4.1.2(Dec 24, 2020)

    BREAKING CHANGE: Since version 4.0, the client no longer polls for network
    events at shutdown (during object destructor). This behaviour didn't give
    enough control to the user in case of server issue, and could cause the script to hang while terminating.

    Starting from 4.0, programs MUST call flush() before shutting down, otherwise some messages and callbacks may be lost.

    Enhancements

    • Enabled features on windows build: headers, purge, murmur (#410, @nick-zh, @cmb69)
    Source code(tar.gz)
    Source code(zip)
  • 4.1.1(Dec 7, 2020)

    BREAKING CHANGE: Since version 4.0, the client no longer polls for network
    events at shutdown (during object destructor). This behaviour didn't give
    enough control to the user in case of server issue, and could cause the script to hang while terminating.

    Starting from 4.0, programs MUST call flush() before shutting down, otherwise some messages and callbacks may be lost.

    Bugfixes

    • Fix windows pecl build (#402, @nick-zh)
    Source code(tar.gz)
    Source code(zip)
  • 4.1.0(Dec 6, 2020)

    BREAKING CHANGE: Since version 4.0, the client no longer polls for network
    events at shutdown (during object destructor). This behaviour didn't give
    enough control to the user in case of server issue, and could cause the script to hang while terminating.

    Starting from 4.0, programs MUST call flush() before shutting down, otherwise some messages and callbacks may be lost.

    Features

    • Add transactional producer support (#359, @nick-zh)
    Source code(tar.gz)
    Source code(zip)
  • 4.0.4(Oct 8, 2020)

    BREAKING CHANGE: Since version 4.0, the client no longer polls for network
    events at shutdown (during object destructor). This behaviour didn't give
    enough control to the user in case of server issue, and could cause the script to hang while terminating.

    Starting from 4.0, programs MUST call flush() before shutting down, otherwise some messages and callbacks may be lost.

    Bugfixes

    • Fix crash during shutdown (#367, @nick-zh, @sofire)

    Enhancements

    • Improved CI (@Steveb-p, @arnaud-lb)

    Documentation

    • Improved doc (@nick-zh, @Steveb-p)
    Source code(tar.gz)
    Source code(zip)
  • 4.0.3(Feb 7, 2020)

    BREAKING CHANGE: Since version 4.0, the client no longer polls for network events at shutdown (during object destructor). This behaviour didn't give enough control to the user in case of server issue, and could cause the script to hang while terminating.

    Starting from 4.0, programs MUST call flush() before shutting down, otherwise some messages and callbacks may be lost.

    Improvements

    • Add partition check for offsetStore (#331, @nick-zh)
    • Naming consistency for setting in tests (#339, @romainneutron)

    Bugfixes

    • Fix headers containing null bytes (#338, @arnaud-lb, @dirx @nick-zh)
    • Fix topic deconstruct for high level consumer (#333, @nick-zh)

    Documentation

    • Fix doc example (#340, @Steveb-p)
    • Remove outdated and duplicate examples (#341, @nick-zh)
    Source code(tar.gz)
    Source code(zip)
  • 4.0.2(Dec 15, 2019)

    BREAKING CHANGE: Since version 4.0, the client no longer polls for network events at shutdown (during object destructor). This behaviour didn't give enough control to the user in case of server issue, and could cause the script to hang while terminating.

    Starting from 4.0, programs MUST call flush() before shutting down, otherwise some messages and callbacks may be lost.

    Enhancements

    • internal improvements: warnings and types (#322, @remicollet)

    Bugfixes

    • Fix partition 0 exposed as NULL in Message (#327 reverts #321, @arnaud-lb @nick-zh)
    • Fix memory leak in consume() when messages have headers (#323, @nick-zh)
    Source code(tar.gz)
    Source code(zip)
  • 4.0.1(Dec 8, 2019)

    WARNING BREAKS CONSUMERS: #324, stick to php-rdkafka:4.0.0 for now

    BREAKING CHANGE: Since version 4.0, the client no longer polls for network events at shutdown (during object destructor). This behaviour didn't give enough control to the user in case of server issue, and could cause the script to hang while terminating.

    Starting from 4.0, programs MUST call flush() before shutting down, otherwise some messages and callbacks may be lost.

    Features

    • Added RdKafka\ConsumerTopic::consumeCallback() (#310, @nick-zh)

    Enhancements

    • Run integration tests in CI (#223, @Steveb-p)
    • Improved README (#295 #297 #298, #307 @Steveb-p @sndsgd @nick-zh)
    • Fix windows test cases (#296, @cmb69)
    • Add testsuite in pecl archive (#291, @remicollet)
    • Add editor config (#308, @Steveb-p)

    Bugfixes

    • Fix build (#290, @nick-zh)
    • Fix segfault during module shutdown (#294, @arnaud-lb @nick-zh)
    • Fix RdKafka\Topic visibility in PHP 7.4 (#316, @nick-zh)
    • Fix headers memory management in producev (#318 , @nick-zh)
    • Fix partition number in error (#321, @nick-zh)
    Source code(tar.gz)
    Source code(zip)
  • 3.1.3(Dec 8, 2019)

  • 4.0.0(Oct 4, 2019)

    BREAKING CHANGE: Since version 4.0, the client no longer polls for network events at shutdown (during object destructor). This behaviour didn't give enough control to the user in case of server issue, and could cause the script to hang while terminating.

    Starting from this version, programs MUST now call flush() before shutting down, otherwise some messages and callbacks may be lost.

    Features

    • Added RdKafka\Kafka::offsetsForTimes(), RdKafka\KafkaConsumer::offsetsForTimes() (#238, #270, @nick-zh)
    • Added RdKafka\KafkaConsumer::getOffsetPositions() (#244, @nick-zh)
    • Added RdKafka\Kafka::purge() (#255, @nick-zh)
    • Added RdKafka\Kafka::flush() (#264, @nick-zh)
    • Added RdKafka\ConsumerTopic::consumeBatch() (#256, @nick-zh)
    • Added RdKafka\Conf::setLogCb() (#253, @nick-zh)
    • Added RdKafka\KafkaConsumer::queryWatermarkOffsets() (#271, @nick-zh)
    • Added RdKafka\KafkaConsumer::close() (#144, @TiMESPLiNTER)

    Enhancements

    • Support block on full producer queue (RD_KAFKA_MSG_F_BLOCK) (#245, @nick-zh)
    • Add additional partitioners (#267, @nick-zh)
    • Fix phpinfo output (#172, @TiMESPLiNTER)
    • Don't poll in destruct anymore (#264, #278, @nick-zh)

    Bugfixes

    • Fix segfault, remove Producer::newQueue (#273, @nick-zh)

    General

    • Dropping support for librdkafka below 0.11 (#247, @arnaud-lb)
    • Update build matrix PHP 7.3 + nightly, librdkafka 1.x + master (#249, @arnaud-lb)
    • Deprecating deprecated librdkafka functions (#266, #254, #251, @nick-zh)
    Source code(tar.gz)
    Source code(zip)
  • 3.1.2(Jul 8, 2019)

  • 3.1.1(Jul 3, 2019)

    Features

    • Expose query watermark offsets (#219, @gytislakavicius)

    Enhancements

    • Support sending timestamp (epoch ms) in producev (#228, @lkm)

    Fixes

    • Fix KafkaTopic::producev causing segfault on librdkafka 1.0.0 (#222, @Steveb-p)
    • Fix version parsing (#224, @dariuskasiulevicius)
    Source code(tar.gz)
    Source code(zip)
  • 3.1.0(Apr 23, 2019)

    Features

    • add callback for offset_commit (RdKafka\Conf::setOffsetCommitCb) and consume (RdKafka\Conf::setConsumeCb) (#155)
    • add support for message headers (RdKafka\ProducerTopic::producev) for Kafka >= 0.11.0.0 and librdkafka >= 0.11.4 (#206)
    • get committed offsets (RdKafka\KafkaConsumer::getCommittedOffsets) (#208)

    Enhancements

    • RdKafka\ProducerTopic::produce - allow payload or key to be null (#179)
    • RdKafka\Message::errstr - prefer readable error over message payload (#181)

    Fixes

    • fix arginfo of RdKafka\TopicConf::set (#188)
    • multiple code example fixes (#193, #203)
    • Parse message headers only on non error (#207)
    Source code(tar.gz)
    Source code(zip)
Owner
Arnaud Le Blanc