PHP Warning: unpack(): Type l: not enough input, need 4, have 0 in /www/vendor/longlang/phpkafka/src/Protocol/Type/Int32.php on line 43
Warning: unpack(): Type l: not enough input, need 4, have 0 in /www/vendor/longlang/phpkafka/src/Protocol/Type/Int32.php on line 43
PHP Fatal error: Uncaught longlang\phpkafka\Exception\KafkaErrorException: [35] The version of API is not supported. in /www/vendor/longlang/phpkafka/src/Protocol/ErrorCode.php:385
Stack trace:
#0 /www/vendor/longlang/phpkafka/src/Client/SyncClient.php(190): longlang\phpkafka\Protocol\ErrorCode::check()
#1 /www/vendor/longlang/phpkafka/src/Client/SyncClient.php(98): longlang\phpkafka\Client\SyncClient->updateApiVersions()
#2 /www/vendor/longlang/phpkafka/src/Client/SwooleClient.php(52): longlang\phpkafka\Client\SyncClient->connect()
#3 /www/vendor/longlang/phpkafka/src/Broker.php(88): longlang\phpkafka\Client\SwooleClient->connect()
#4 /www/vendor/longlang/phpkafka/src/Producer/Producer.php(39): longlang\phpkafka\Broker->updateBrokers()
#5 /www/vendor/hyperf/kafka/src/Producer.php(173): longlang\phpkafka\Producer\Producer->__construct()
#6 /www/vendor/hyperf/kafka/src/Producer.php(137): Hyperf\Kafka\Producer->makeProducer()
#7 {main}
thrown in /www/vendor/longlang/phpkafka/src/Protocol/ErrorCode.php on line 385
Fatal error: Uncaught longlang\phpkafka\Exception\KafkaErrorException: [35] The version of API is not supported. in /www/vendor/longlang/phpkafka/src/Protocol/ErrorCode.php:385
Stack trace:
#0 /www/vendor/longlang/phpkafka/src/Client/SyncClient.php(190): longlang\phpkafka\Protocol\ErrorCode::check()
#1 /www/vendor/longlang/phpkafka/src/Client/SyncClient.php(98): longlang\phpkafka\Client\SyncClient->updateApiVersions()
#2 /www/vendor/longlang/phpkafka/src/Client/SwooleClient.php(52): longlang\phpkafka\Client\SyncClient->connect()
#3 /www/vendor/longlang/phpkafka/src/Broker.php(88): longlang\phpkafka\Client\SwooleClient->connect()
#4 /www/vendor/longlang/phpkafka/src/Producer/Producer.php(39): longlang\phpkafka\Broker->updateBrokers()
#5 /www/vendor/hyperf/kafka/src/Producer.php(173): longlang\phpkafka\Producer\Producer->__construct()
#6 /www/vendor/hyperf/kafka/src/Producer.php(137): Hyperf\Kafka\Producer->makeProducer()
#7 {main}
thrown in /www/vendor/longlang/phpkafka/src/Protocol/ErrorCode.php on line 385
[2021-09-02 02:58:02 *753.1] ERROR php_swoole_server_rshutdown() (ERRNO 503): Fatal error: Uncaught longlang\phpkafka\Exception\KafkaErrorException: [35] The version of API is not supported. in /www/vendor/longlang/phpkafka/src/Protocol/ErrorCode.php:385
Stack trace:
#0 /www/vendor/longlang/phpkafka/src/Client/SyncClient.php(190): longlang\phpkafka\Protocol\ErrorCode::check()
#1 /www/vendor/longlang/phpkafka/src/Client/SyncClient.php(98): longlang\phpkafka\Client\SyncClient->updateApiVersions()
#2 /www/vendor/longlang/phpkafka/src/Client/SwooleClient.php(52): longlang\phpkafka\Client\SyncClient->connect()
#3 /www/vendor/longlang/phpkafka/src/Broker.php(88): longlang\phpkafka\Client\SwooleClient->connect()
#4 /www/vendor/longlang/phpkafka/src/Producer/Producer.php(39): longlang\phpkafka\Broker->updateBrokers()
#5 /www/vendor/hyperf/kafka/src/Producer.php(173): longlang\phpkafka\Producer\Producer->__construct()
#6 /www/vendor/hyperf/kafka/src/Producer.php(137): Hyperf\Kafka\Producer->makeProducer()
#7 {main}
thrown in /www/vendor/longlang/phpkafka/src/Protocol/ErrorCode.php on line 385
其中,“The version of API is not supported.”说明请求的 API 版本不被支持,但不清楚 phpkafka 具体需要哪个版本?
php -v & php --ri swoole & composer info | grep longlang/phpkafka
bash-5.1# php -v & php --ri swoole & composer info | grep longlang/phpkafka
[1] 777
[2] 778
PHP 7.4.23 (cli) (built: Aug 26 2021 23:05:01) ( NTS )
Copyright (c) The PHP Group
Zend Engine v3.4.0, Copyright (c) Zend Technologies
with Zend OPcache v7.4.21, Copyright (c), by Zend Technologies
swoole
Swoole => enabled
Author => Swoole Team <[email protected]>
Version => 4.7.0
Built => Aug 28 2021 09:20:30
coroutine => enabled with boost asm context
epoll => enabled
eventfd => enabled
signalfd => enabled
spinlock => enabled
rwlock => enabled
openssl => OpenSSL 1.1.1l 24 Aug 2021
dtls => enabled
http2 => enabled
json => enabled
curl-native => enabled
pcre => enabled
zlib => 1.2.11
brotli => E16777225/D16777225
mutex_timedlock => enabled
pthread_barrier => enabled
async_redis => enabled
Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => Off => Off
swoole.unixsock_buffer_size => 8388608 => 8388608
longlang/phpkafka v1.1.5 A kafka client. Support php-fpm and Swoole.
[1]- Done php -v
[2]+ Done php --ri swoole
Kafka 配置:
<?php
declare(strict_types=1);
use Hyperf\Kafka\Constants\KafkaStrategy;
// Kafka configuration: a 'default' connection profile plus the 'log_service'
// settings that are read elsewhere via the 'kafka.log_service' config key.
return [
    'default' => [
        'connect_timeout' => -1,
        'send_timeout' => -1,
        'recv_timeout' => -1,
        'client_id' => '',
        'max_write_attempts' => 3,
        // Comma-separated broker list taken from the environment. Note that
        // explode() on the '' fallback yields [''] (one empty entry), not [].
        'bootstrap_servers' => explode(',', env('KAFKA_LOG_BOOTSTRAP_SERVERS', '')),
        'acks' => -1,
        'producer_id' => -1,
        'producer_epoch' => -1,
        'partition_leader_epoch' => -1,
        'interval' => 0,
        'session_timeout' => 60,
        'rebalance_timeout' => 60,
        'replica_id' => -1,
        'rack_id' => '',
        'group_retry' => 5,
        'group_retry_sleep' => 1,
        'group_heartbeat' => 3,
        'offset_retry' => 5,
        'auto_create_topic' => true,
        'partition_assignment_strategy' => KafkaStrategy::RANGE_ASSIGNOR,
        'pool' => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
            'max_idle_time' => 60.0,
        ],
    ],
    'log_service' => [
        'app' => env('KAFKA_LOG_APP', ''),
        // The fallback literals below were previously closed with typographic
        // quotes, which left the strings unterminated and broke parsing.
        'topic' => env('KAFKA_LOG_TOPIC', '--topic--'),
        'bootstrap' => env('KAFKA_LOG_BOOTSTRAP_SERVERS', '--bootstrap--'),
    ],
];
<?php
declare(strict_types=1);
namespace App\Service\QueueService;
use App\Service\PageService\BasePS;
use Hyperf\Contract\ConfigInterface;
use Hyperf\HttpServer\Contract\RequestInterface;
use Hyperf\Kafka\Producer;
use Hyperf\Logger\LoggerFactory;
use Hyperf\Contract\ContainerInterface;
/**
 * Queue service that pushes a log message to Kafka through the injected
 * Hyperf Kafka producer, using the 'kafka.log_service' configuration.
 */
class KafkaQS
{
    /**
     * @var Producer Producer used to send messages.
     */
    protected $producer;

    /**
     * Never assigned inside this class; remains null unless a subclass sets it.
     */
    protected $logger;

    /**
     * @var mixed Value of the 'kafka.log_service' config entry; elkLog() reads
     *            its 'topic' and 'bootstrap' keys.
     */
    protected $config;

    /**
     * @var ContainerInterface
     */
    protected $container;

    /**
     * @var RequestInterface
     */
    protected $request;

    /**
     * Stores the injected collaborators and loads the log-service settings.
     *
     * @param ContainerInterface $container Container used to resolve the config service.
     * @param Producer $producer Kafka producer used by elkLog().
     * @param RequestInterface $request Current request, kept on the instance.
     */
    public function __construct(ContainerInterface $container, Producer $producer, RequestInterface $request)
    {
        $this->producer = $producer;
        $this->container = $container;
        // Previously the injected request was dropped and $this->request stayed null.
        $this->request = $request;
        $this->config = $container->get(ConfigInterface::class)->get('kafka.log_service');
    }

    /**
     * Sends a fixed demo message to the configured log topic.
     *
     * @param array $param Currently unused; the payload is hard-coded.
     * @return bool Always true once send() returns without throwing.
     */
    public function elkLog(array $param = []): bool
    {
        $topic = $this->config['topic'];
        $url = $this->config['bootstrap'];
        $logStr = 'test demo';

        // NOTE(review): the third argument of Hyperf\Kafka\Producer::send() appears
        // to be the message key, not a broker address - confirm against hyperf/kafka.
        // The call is kept exactly as written so behaviour is unchanged.
        $this->producer->send($topic, $logStr, $url);

        return true;
    }
}
question