kafka php client

Overview

Kafka-php

Chinese documentation


Kafka-php is a pure PHP Kafka client that currently supports versions of Kafka greater than 0.8.x. Versions v0.2.x and v0.1.x of this project are incompatible with each other. If you are still using v0.1.x, you can refer to the Kafka PHP v0.1.x Document, but switching to v0.2.x is recommended. v0.2.x interacts with the Kafka broker through an asynchronous PHP implementation and is more stable and efficient than v0.1.x. Because the client is written in PHP, it needs no compiled extension, which reduces integration and maintenance costs.

Requirements

  • Minimum PHP version: 7.1
  • Kafka version greater than 0.8
  • The consumer module needs kafka broker version greater than 0.9.0
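
The version floors above can be checked up front. The following is a minimal sketch, not part of kafka-php: the helper name `meetsKafkaPhpRequirements` is hypothetical, and it assumes the stated floors are inclusive (that is, 0.8.x brokers are accepted for producing and 0.9.0 for consuming).

```php
<?php
// Hypothetical helper, not part of kafka-php: checks the version floors listed above.
// Assumption: the floors are inclusive (PHP 7.1, broker 0.8.0, broker 0.9.0 for consumers).
function meetsKafkaPhpRequirements(string $phpVersion, string $brokerVersion, bool $usesConsumer): array
{
    $problems = [];
    if (version_compare($phpVersion, '7.1.0', '<')) {
        $problems[] = 'PHP 7.1 or newer is required';
    }
    // The consumer module has a stricter broker floor than the producer.
    $minBroker = $usesConsumer ? '0.9.0' : '0.8.0';
    if (version_compare($brokerVersion, $minBroker, '<')) {
        $problems[] = "Kafka broker $minBroker or newer is required";
    }
    return $problems;
}

// Example: the running PHP version against a 0.8.2.2 broker, with the consumer in use.
foreach (meetsKafkaPhpRequirements(PHP_VERSION, '0.8.2.2', true) as $problem) {
    echo $problem, PHP_EOL;
}
```

An empty array means both floors are met; otherwise each entry names one unmet requirement.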

Installation

Add the lib directory to the PHP include_path and use an autoloader like the one in the examples directory (the code follows the PEAR/Zend one-class-per-file convention).

Composer Install

If you use Composer to manage your project's dependencies, simply add nmred/kafka-php as a dependency:

$ composer require nmred/kafka-php

Here is a minimal example of a composer.json file:

{
	"require": {
		"nmred/kafka-php": "0.2.*"
	}
}

Configuration

Configuration properties are documented in Configuration

Producer

Asynchronous mode

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add a handler that writes to stdout
$logger->pushHandler(new StreamHandler('php://stdout'));

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(
    function() {
        return [
            [
                'topic' => 'test',
                'value' => 'test....message.',
                'key' => 'testkey',
            ],
        ];
    }
);
$producer->setLogger($logger);
$producer->success(function($result) {
    var_dump($result);
});
$producer->error(function($errorCode) {
    var_dump($errorCode);
});
$producer->send(true);

Synchronous mode

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add a handler that writes to stdout
$logger->pushHandler(new StreamHandler('php://stdout'));

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9192');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer();
$producer->setLogger($logger);

for($i = 0; $i < 100; $i++) {
    $producer->send([
        [
            'topic' => 'test1',
            'value' => 'test1....message.',
            'key' => '',
        ],
    ]);
}

Consumer

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add a handler that writes to stdout
$logger->pushHandler(new StreamHandler('php://stdout'));

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['test']);
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
    var_dump($message);
});
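
The callback passed to `start()` above receives the topic, the partition, and the message. As a minimal sketch of a slightly more useful handler, the following tallies deliveries per topic and partition. Only the three-argument signature comes from the example above; the factory name `makeCountingHandler` and the placeholder payloads are hypothetical, and the sketch deliberately does not inspect the message structure.

```php
<?php
// Hypothetical callback factory; only the ($topic, $part, $message) signature
// comes from the consumer example above.
function makeCountingHandler(array &$counts): callable
{
    return function ($topic, $part, $message) use (&$counts) {
        // Tally deliveries per "topic:partition" without inspecting the payload.
        $key = $topic . ':' . $part;
        $counts[$key] = ($counts[$key] ?? 0) + 1;
    };
}

$counts = [];
$handler = makeCountingHandler($counts);

// Simulated deliveries; with a real broker the handler would be passed to $consumer->start().
$handler('test', 0, ['placeholder' => 'payload']);
$handler('test', 0, ['placeholder' => 'payload']);
$handler('test', 1, ['placeholder' => 'payload']);

print_r($counts);
```

Keeping the handler free of broker calls makes it easy to exercise without a running Kafka cluster.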

Low-Level API

Refer to the Example.

QQ Group

Group 1: 531522091
Group 2: 657517955

Comments
  • Exceptions when producing invalid messages + custom exception on reading socket

    Following this https://github.com/weiboad/kafka-php/issues/176

    I think it should not create a new topic automatically, as that is somewhat "magical" behaviour.

    Improvement 
    opened by simPod 14
  • Ack'ing messages

    Hello,

    How do I ack messages while consuming so that the server can remove them from the queue?

    Also, is there a way to detect errors while publishing messages so that we can take action, such as storing the message in a DB, when publishing fails?

    opened by pushpesh4u 14
  • Messages are lost during continuous sending, with Uncaught LogicException: Cannot stop(); event reactor not currently active

    <?php
    
    require __DIR__ . '/vendor/autoload.php';
    
    $config = \Kafka\ProducerConfig::getInstance();
    $config->setMetadataRefreshIntervalMs(10000);
    $config->setMetadataBrokerList('dev-001:9092');
    $config->setIsAsyn(false);
    $config->setProduceInterval(500);
    $config->setBrokerVersion('0.8.2.2');
    
    while(true){
        $producer = new \Kafka\Producer(function() {
            return array(
                array(
                    'topic' => 'zipkin_traces',
                    'value' => '[]',
                ),
            );
        });
        $producer->success(function($result) {
            var_dump($result);
        });
        $producer->error(function($errorCode, $context) {
            var_dump($errorCode);
        });
    
        $producer->send();
        sleep(1);
    }
    

    Output

    array(2) {
      ["throttleTime"]=>
      int(0)
      ["data"]=>
      array(1) {
        [0]=>
        array(2) {
          ["topicName"]=>
          string(13) "zipkin_traces"
          ["partitions"]=>
          array(1) {
            [0]=>
            array(4) {
              ["partition"]=>
              int(0)
              ["errorCode"]=>
              int(0)
              ["offset"]=>
              int(14)
              ["timestamp"]=>
              int(0)
            }
          }
        }
      }
    }
    array(2) {
      ["throttleTime"]=>
      int(0)
      ["data"]=>
      array(1) {
        [0]=>
        array(2) {
          ["topicName"]=>
          string(13) "zipkin_traces"
          ["partitions"]=>
          array(1) {
            [0]=>
            array(4) {
              ["partition"]=>
              int(2)
              ["errorCode"]=>
              int(0)
              ["offset"]=>
              int(14)
              ["timestamp"]=>
              int(0)
            }
          }
        }
      }
    }
    array(2) {
      ["throttleTime"]=>
      int(0)
      ["data"]=>
      array(1) {
        [0]=>
        array(2) {
          ["topicName"]=>
          string(13) "zipkin_traces"
          ["partitions"]=>
          array(1) {
            [0]=>
            array(4) {
              ["partition"]=>
              int(3)
              ["errorCode"]=>
              int(0)
              ["offset"]=>
              int(14)
              ["timestamp"]=>
              int(0)
            }
          }
        }
      }
    }
    array(2) {
      ["throttleTime"]=>
      int(0)
      ["data"]=>
      array(1) {
        [0]=>
        array(2) {
          ["topicName"]=>
          string(13) "zipkin_traces"
          ["partitions"]=>
          array(1) {
            [0]=>
            array(4) {
              ["partition"]=>
              int(2)
              ["errorCode"]=>
              int(0)
              ["offset"]=>
              int(14)
              ["timestamp"]=>
              int(0)
            }
          }
        }
      }
    }
    array(2) {
      ["throttleTime"]=>
      int(0)
      ["data"]=>
      array(1) {
        [0]=>
        array(2) {
          ["topicName"]=>
          string(13) "zipkin_traces"
          ["partitions"]=>
          array(1) {
            [0]=>
            array(4) {
              ["partition"]=>
              int(7)
              ["errorCode"]=>
              int(0)
              ["offset"]=>
              int(14)
              ["timestamp"]=>
              int(0)
            }
          }
        }
      }
    }
    array(2) {
      ["throttleTime"]=>
      int(0)
      ["data"]=>
      array(1) {
        [0]=>
        array(2) {
          ["topicName"]=>
          string(13) "zipkin_traces"
          ["partitions"]=>
          array(1) {
            [0]=>
            array(4) {
              ["partition"]=>
              int(0)
              ["errorCode"]=>
              int(0)
              ["offset"]=>
              int(14)
              ["timestamp"]=>
              int(0)
            }
          }
        }
      }
    }
    int(1000)
    array(2) {
      ["throttleTime"]=>
      int(0)
      ["data"]=>
      array(1) {
        [0]=>
        array(2) {
          ["topicName"]=>
          string(13) "zipkin_traces"
          ["partitions"]=>
          array(1) {
            [0]=>
            array(4) {
              ["partition"]=>
              int(1)
              ["errorCode"]=>
              int(0)
              ["offset"]=>
              int(14)
              ["timestamp"]=>
              int(0)
            }
          }
        }
      }
    }
    int(1000)
    Fatal error: Uncaught LogicException: Cannot stop(); event reactor not currently active in /Users/Faye/workspace/user-service-view/vendor/amphp/amp/lib/NativeReactor.php:138
    Stack trace:
    #0 /Users/Faye/workspace/user-service-view/vendor/amphp/amp/lib/functions.php(65): Amp\NativeReactor->stop()
    #1 /Users/Faye/workspace/user-service-view/vendor/nmred/kafka-php/src/Kafka/Producer/Process.php(116): Amp\stop()
    #2 /Users/Faye/workspace/user-service-view/vendor/amphp/amp/lib/NativeReactor.php(294): Kafka\Producer\Process->Kafka\Producer\{closure}('000000005825f5e...', NULL)
    #3 /Users/Faye/workspace/user-service-view/vendor/amphp/amp/lib/NativeReactor.php(222): Amp\NativeReactor->executeTimers()
    #4 /Users/Faye/workspace/user-service-view/vendor/amphp/amp/lib/NativeReactor.php(71): Amp\NativeReactor->doTick(false)
    #5 /Users/Faye/workspace/user-service-view/vendor/amphp/amp/lib/functions.php(46): Amp\NativeReactor->run(NULL)
    #6 /Users/Faye/workspace/user-service-view/vendor/nmred/kafka-php/src/Kafka/Producer.php(72): Amp\run()
    #7  in /Users/Faye/workspace/user-service-view/vendor/amphp/amp/lib/NativeReactor.php on line 138
    

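    After about 6 to 7 runs, execution enters the error callback with errorCode=1000. Checking in Kafka confirms that the message was not sent, and finally the exception above is thrown.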

    opened by kdlan 12
  • bugfix, issue141,179: lost message when consumer restart

    The cause of this issue: when no new messages are available, the fetch request still succeeds and the message array in the response is empty, but the consumer commits the offset anyway. In this situation the consumer offset is equal to the commit offset. If you don't stop the consumer, the commit offset is corrected by the next successful fetch request; if you restart the consumer, the offset request returns the offset you committed last time.

    Based on the Kafka protocol, highwaterMarkOffset represents "The offset at the end of the log for this partition. This can be used by the client to determine how many messages behind the end of the log they are.", so it is not useful here.
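
    The guard described above, skipping the commit when a fetch returns no messages, could be sketched as follows. This is an illustration rather than the code in this pull request: the helper name `nextCommitOffset` is hypothetical, and whether a client commits the last processed offset or that offset plus one depends on its convention (this sketch returns the last processed offset).

```php
<?php
// Hypothetical helper, not taken from kafka-php: decides which offset to commit
// after a fetch. Returns null when nothing should be committed.
function nextCommitOffset(array $fetchedOffsets): ?int
{
    if ($fetchedOffsets === []) {
        // Empty fetch: leave the previously committed offset untouched.
        return null;
    }
    // Assumption: the commit value is the highest offset actually received.
    return max($fetchedOffsets);
}

$toCommit = nextCommitOffset([]);
if ($toCommit === null) {
    echo 'nothing to commit', PHP_EOL;
}
```

    With this guard an idle consumer never overwrites its committed offset, so a restart resumes from the last offset that was actually processed.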

    opened by chongchaoyu 11
  • Please help find the cause of this error

    PHP Notice: Undefined variable: memberId in /var/www/html/dl-test.kmapp.cn/vendor/nmred/kafka-php/src/Kafka/Consumer/Process.php on line 357
    PHP Notice: Undefined offset: 1 in /var/www/html/dl-test.kmapp.cn/vendor/nmred/kafka-php/src/Kafka/Consumer/Assignment.php on line 130
    PHP Fatal error: Uncaught exception 'Kafka\Exception\Protocol' with message 'given data invalid. assignments is undefined.' in /var/www/html/dl-test.kmapp.cn/vendor/nmred/kafka-php/src/Kafka/Protocol/SyncGroup.php:128
    Stack trace:
    #0 [internal function]: Kafka\Protocol\SyncGroup->encodeGroupAssignment(Array)
    #1 /var/www/html/dl-test.kmapp.cn/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php(538): call_user_func(Array, Array)
    #2 /var/www/html/dl-test.kmapp.cn/vendor/nmred/kafka-php/src/Kafka/Protocol/SyncGroup.php(60): Kafka\Protocol\Protocol::encodeArray(Array, Array)
    #3 /var/www/html/dl-test.kmapp.cn/vendor/nmred/kafka-php/src/Kafka/Protocol.php(139): Kafka\Protocol\SyncGroup->encode(Array)
    #4 /var/www/html/dl-test.kmapp.cn/vendor/nmred/kafka-php/src/Kafka/Consumer/Process.php(347): Kafka\Protocol::encode(14, Array)
    #5 /var/www/html/dl-test.kmapp.cn/vendor/nmred/kafka-php/src/Kafka/Consumer/Process.php(88): Kafka\Consumer\Process->syncGroup()
    #6 [internal function]: Kafka\Consumer\Process->Kafka\Consumer\{closure}()
    in /var/www/html/dl-test.kmapp.cn/vendor/nmred/kafka-php/src/Kafka/Protocol/SyncGroup.php on line 128

    opened by lilihua 10
  • "Bus error (core dumped)" when trying to run Produce example

    I ran Produce.php in the example directory and it dies with a "Bus error (core dumped)" message. The code I downloaded is unaltered except for the ZooKeeper.php constructor, where the class was named Zookeeper instead of ZooKeeper, which caused an autoload error. I corrected that, and now this error occurs.

    opened by shades198 9
  • singleton

    Based on this https://github.com/weiboad/kafka-php/issues/177 I tried to remove singleton config from this lib so each consumer doesn't rely on that single instance.

    I have created a Symfony bundle that I'm pushing soon, and I tested the change there.

    opened by simPod 7
  • broker.php has no error method, so any exception causes a fatal error

    [error] 538#0: *455777 FastCGI sent in stderr: "PHP message: PHP Fatal error: Call to undefined method Kafka\Broker::error() in /WWW/RRKDInterface/vendor/rrkd/php-kafka/src/Broker.php on line 196" while reading response header from upstream

    Bug Invalid 
    opened by fendouxiong 7
  • Key is not used for partitioning

    I have added the 'key' to the payload, which is the ID of one of my entities, so that the same entity always lands in the same partition. Our topic has 4 partitions. Unfortunately this does not seem to work. We expect that a message with a given key will always go to the same one of the 4 partitions, but we have seen messages being sent to all partitions. Do you have any idea why this is the case?
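
    For reference, the behaviour expected here, the same key always mapping to the same partition, is typically achieved by hashing the key and reducing it modulo the partition count. The sketch below illustrates that general technique only; it is not kafka-php's partitioner. The function name `partitionForKey` is hypothetical, and CRC32 is used purely for illustration.

```php
<?php
// Hypothetical sketch of key-based partition selection; not kafka-php's partitioner.
// Assumption: CRC32 is used as the hash purely for illustration.
function partitionForKey(string $key, int $partitionCount): int
{
    if ($partitionCount < 1) {
        throw new InvalidArgumentException('partitionCount must be at least 1');
    }
    // Mask to a non-negative value so the result is stable on 32-bit builds too.
    return (crc32($key) & 0x7fffffff) % $partitionCount;
}

// The same key maps to the same partition on every call.
$first = partitionForKey('entity-42', 4);
$second = partitionForKey('entity-42', 4);
echo $first === $second ? 'stable' : 'unstable', PHP_EOL;
```

    Note that a mapping like this changes for existing keys if the number of partitions changes.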

    opened by mantzas 6
  • Implement compression properly

    This PR fixes the creation and parsing of compressed message sets, making the library follow the Kafka protocol for that functionality.

    @nmred some other small fixes are included; I can extract them to a different PR to make this easier to review. I really recommend reviewing this commit by commit.

    Bug 
    opened by lcobucci 6
  • Sending partitionId in setMessages

    The Produce::setMessages call requires the partitionId to which the message will be sent.

    Shouldn't this be decided by ZooKeeper rather than specified by the producer?

    Is there a way to list all partitions in a given topic?

    Can we create topics/partitions?

    opened by pushpesh4u 6
  • How can I create a topic with the kafka-php library?

    Hi, I want to create a topic with the kafka-php library. How can I do that?

    If the topic doesn't exist on the Kafka server, the code below doesn't work: it doesn't insert the 10 "test1....message." messages because the topic isn't created.

    date_default_timezone_set('PRC');
    // Create the logger
    $logger = new Logger('my_logger');
    // Now add some handlers
    $logger->pushHandler(new StreamHandler(__DIR__ . '/app.log', Logger::DEBUG));

    $config = \Kafka\ProducerConfig::getInstance();
    $config->setMetadataRefreshIntervalMs(10000);
    $config->setMetadataBrokerList('127.0.0.1:9092');
    $config->setBrokerVersion('1.0.0');
    $config->setRequiredAck(1);
    $config->setIsAsyn(false);
    $config->setProduceInterval(500);
    $producer = new \Kafka\Producer();
    $producer->setLogger($logger);

    for ($i = 0; $i < 10; $i++) {
        $result = $producer->send([
            [
                'topic' => 'test1',
                'value' => 'test1....message.',
                'key' => '',
            ],
        ]);
    }

    exit;

    opened by nicolapaganotti1 0
  • Is this still maintained or updated?

    I use this package, but it looks like there has been no release for a long time. I really need SASL support now, but it is not available in a stable version.

    @nmred Do you have any plan to release a new version from the master branch?

    If you have no time for this, please allow someone else to maintain this repo. Would that be possible?

    opened by ALTELMA 1
Releases(v0.2.0.8)
Owner
Weibo Ad Platform Open Source