Use Kafka Producers and Consumers in your laravel app with ease!

Overview

Laravel Kafka

docs/laravel-kafka.png

Latest Version On Packagist Total Downloads MIT Licensed Continuous Integration Check & fix styling PHP Version Require

Do you use Kafka in your laravel packages? All packages I've seen until today, including some built by myself, does not provide a nice syntax usage syntax or, if it does, the test process with these packages are very painful.

This package provides a nice way of producing and consuming kafka messages in your Laravel projects.

Follow these docs to install this package and start using kafka with ease.

Installation

To install this package, you must have installed PHP RdKafka extension. You can follow the steps here to install rdkafka in your system.

With RdKafka installed, require this package with composer:

composer require mateusjunges/laravel-kafka

Usage

After installing the package, you can start producing and consuming Kafka messages.

Producing Kafka Messages

To publish your messages to Kafka, you can use the publishOn method, of Junges\Kafka\Facades\Kafka class:

use Junges\Kafka\Facades\Kafka;

Kafka::publishOn('broker', 'topic');

This method returns a Junges\Kafka\Producers\ProducerBuilder::class instance, and you can configure your message.

The ProducerBuilder class contains a few methods to configure your kafka producer. The following lines describes these methods.

ProducerBuilder configuration methods

The withConfigOption method sets a \RdKafka\Conf::class option. You can check all available options here. This methods set one config per call, and you can use withConfigOptions passing an array of config name and config value as argument. Here's an example:

use Junges\Kafka\Facades\Kafka;

Kafka::publishOn('broker', 'topic')
    ->withConfigOption('property-name', 'property-value')
    ->withConfigOptions([
        'property-name' => 'property-value'
    ]);

While you are developing your application, you can enable debug with the withDebugEnabled method. To disable debug mode, you can use ->withDebugEnabled(false), or withDebugDisabled methods.

use Junges\Kafka\Facades\Kafka;

Kafka::publishOn('broker', 'topic')
    ->withConfigOption('property-name', 'property-value')
    ->withConfigOptions([
        'property-name' => 'property-value'
    ])
    ->withDebugEnabled() // To enable debug mode
    ->withDebugDisabled() // To disable debug mode
    ->withDebugEnabled(false) // Also to disable debug mode

Configuring the Kafka message payload

In kafka, you can configure your payload with a message, message headers and message key. All these configurations are available within ProducerBuilder class.

Configuring message headers

To configure the message headers, use the withHeaders method:

use Junges\Kafka\Facades\Kafka;

Kafka::publishOn('broker', 'topic')
    ->withHeaders([
        'header-key' => 'header-value'
    ])

Configure the message payload

You can configure the message with the withMessage or withMessageKey methods.

The withMessage sets the entire message, and it accepts a Junges\Kafka\Message::class instance as argument.

This is how you should use it:

use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message;

$message = new Message(
    headers: ['header-key' => 'header-value'],
    message: ['key' => 'value'],
    key: 'kafka key here'  
)

Kafka::publishOn('broker', 'topic')->withMessage($message);

The withMessageKey method sets only a key in your message.

use Junges\Kafka\Facades\Kafka;

Kafka::publishOn('broker', 'topic')->withMessageKey('key', 'value');

Using Kafka Keys

In Kafka, keys are used to determine the partition within a log to which a message get's appended to. If you want to use a key in your message, you should use the withKafkaKey method:

use Junges\Kafka\Facades\Kafka;

Kafka::publishOn('broker', 'topic')->withKafkaKey('your-kafka-key');

Sending the message to Kafka

After configuring all your message options, you must use the send method, to send the message to kafka.

use Junges\Kafka\Facades\Kafka;

/** @var \Junges\Kafka\Producers\ProducerBuilder $producer */
$producer = Kafka::publishOn('broker', 'topic')
    ->withConfigOptions(['key' => 'value'])
    ->withKafkaKey('your-kafka-key')
    ->withKafkaKey('kafka-key')
    ->withHeaders(['header-key' => 'header-value']);

$producer->send();

Consuming Kafka Messages

If your application needs to read messages from a Kafka topic, you must create a consumer object, subscribe to the appropriate topic and start receiving messages.

To create a consumer using this package, you can use the createConsumer method, on Kafka facade:

use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer('broker');

This method returns a Junges\Kafka\Consumers\ConsumerBuilder::class instance, and you can use it to configure your consumer.

Subscribing to a topic

With a consumer created, you can subscribe to a kafka topic using the subscribe method:

use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer('broker')->subscribe('topic');

Of course, you can subscribe to more than one topic at once, either using an array of topics or specifying one by one:

use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer('broker')->subscribe('topic-1', 'topic-2', 'topic-n');

// Or, using array:
$consumer = Kafka::createConsumer('broker')->subscribe([
    'topic-1',
    'topic-2',
    'topic-n'
]);

Configuring consumer groups

Kafka consumers belonging to the same consumer group share a group id. THe consumers in a group divides the topic partitions as fairly amongst themselves as possible by establishing that each partition is only consumed by a single consumer from the group.

To attach your consumer to a consumer group, you can use the method withConsumerGroupId to specify the consumer group id:

use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer('broker')->withConsumerGroupId('foo');

Configuring message handlers

Now that you have created your kafka consumer, you must create a handler for the messages this consumer receives. By default, a consumer is any callable. You can use an invokable class or a simple callback. Use the withHandler method to specify your handler:

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker');

// Using callback:
$consumer->withHandler(function(\RdKafka\Message $message) {
    // Handle your message here
});

Or, using a invokable class:

class Handler
{
    public function __invoke(\RdKafka\Message $message){
        // Handle your message here
    }
}

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')->withHandler(Handler::class)

Configuring max messages to be consumed

If you want to consume a limited amount of messages, you can use the withMaxMessages method to set the max number of messages to be consumed by a kafka consumer:

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')->withMaxMessages(2);

Configuring a dead letter queue

In kafka, a Dead Letter Queue (or DLQ), is a simple kafka topic in the kafka cluster which acts as the destination for messages that were not able to make it to the desired destination due to some error.

To create a dlq in this package, you can use the withDlq method. If you don't specify the DLQ topic name, it will be created based on the topic you are consuming, adding the -dlq suffix to the topic name.

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')->withDlq();

//Or, specifying the dlq topic name:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')->withDlq('your-dlq-topic-name')

Using SASL

SASL allows your producers and your consumers to authenticate to your Kafka cluster, which verifies their identity. It's also a secure way to enable your clients to endorse an identity. To provide SASL configuration, you can use the withSasl method, passing a Junges\Kafka\Config\Sasl instance as the argument:

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')
    ->withSasl(new \Junges\Kafka\Config\Sasl(
        password: 'password',
        username: 'username'
        mechanisms: 'authentication mechanism'
    ));

Using middlewares

Middlewares provides a convenient way to filter and inspecting your Kafka messages. To write a middleware in this package, you can use the withMiddleware method. The middleware is a callable in which the first argument is the message itself and the second one is the next handler. The middlewares get executed in the order they are defined,

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')
    ->withMiddleware(function($message, callable $next) {
        // Perform some work here
        return $next($message);
    });

Using auto commit

The auto-commit check is called in every poll and it checks that the time elapsed is greater than the configured time. To enable auto commit, use the withAutoCommit method:

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')->withAutoCommit();

Setting Kafka configuration options

To set configuration options, you can use two methods: withOptions, passing an array of option and option value or, using the `withOption method and passing two arguments, the option name and the option value.

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')
    ->withOptions([
        'option-name' => 'option-value'
    ]);
// Or:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')
    ->withOption('option-name', 'option-value');

Building the consumer

When you have finished configuring your consumer, you must call the build method, which returns a Junges\Kafka\Consumers\Consumer instance.

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')
    // Configure your consumer here
    ->build();

Consuming the kafka messages

After building the consumer, you must call the consume method to consume the messages:

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->build();

$consumer->consume();

Using Kafka::fake()

When testing your application, you may wish to "mock" certain aspects of the app, so they are not actually executed during a given test. This package provides convenient helpers for mocking the kafka producer out of the box. These helpers primarily provide a convenience layer over Mockery so you don't have to manually make complicated Mockery method calls.

The Kafka facade also provides methods to perform assertions over published messages, such as assertPublished, assertPublishedOn and assertNothingPublished.

use Junges\Kafka\Facades\Kafka;
use PHPUnit\Framework\TestCase;

class MyTest extends TestCase
{
     public function testMyAwesomeApp()
     {
         Kafka::fake();
         
         $producer = Kafka::publishOn('broker', 'topic')
             ->withHeaders(['key' => 'value'])
             ->withMessageKey('foo', 'bar');
             
         $producer->send();
             
         Kafka::assertPublished($producer->getMessage());       
     }
}

If you want to assert that a message was published in a specific kafka topic, you can use the assertPublishedOn method:

use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;

class MyTest extends TestCase
{
    public function testWithSpecificTopic()
    {
        Kafka::fake();
        
        $producer = Kafka::publishOn('broker', 'some-kafka-topic')
            ->withHeaders(['key' => 'value'])
            ->withMessageKey('key', 'value');
            
        $producer->send();
        
        Kafka::assertPublishedOn('some-kafka-topic', $producer->getMessage());
    }
}

You can also use a callback function to perform assertions within the message using a callback in which the argument is the published message itself.

use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message;

class MyTest extends TestCase
{
    public function testWithSpecificTopic()
    {
        Kafka::fake();
        
        $producer = Kafka::publishOn('broker', 'some-kafka-topic')
            ->withHeaders(['key' => 'value'])
            ->withMessageKey('key', 'value');
            
        $producer->send();
        
        Kafka::assertPublishedOn('some-kafka-topic', $producer->getMessage(), function(Message $message) {
            return $message->getHeaders()['key'] === 'value';
        });
    }
} 

You can also assert that nothing was published at all, using the assertNothingPublished:

use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message;

class MyTest extends TestCase
{
    public function testWithSpecificTopic()
    {
        Kafka::fake();
        
        if (false) {
            $producer = Kafka::publishOn('broker', 'some-kafka-topic')
                ->withHeaders(['key' => 'value'])
                ->withMessageKey('key', 'value');
                
            $producer->send();
        }
        
        Kafka::assertNothingPublished();
    }
} 

Testing

Run composer test to test this package.

Contributing

Thank you for considering contributing for the Laravel ACL package! The contribution guide can be found here.

Credits

License

The Laravel Kafka package is open-sourced software licenced under the MIT License. Please see the License File for more information.

Comments
  • [v1.7.x] Additional option to stop consumer after last message on topic

    [v1.7.x] Additional option to stop consumer after last message on topic

    Hi, I really miss such a function, so I decided to try to implement it myself. I am waiting for an answer to how much this option is correct and whether it will be added to the package.

    opened by StounhandJ 17
  • [QUESTION] Call to undefined method RdKafka\ProducerTopic::producev()

    [QUESTION] Call to undefined method RdKafka\ProducerTopic::producev()

    Hello,

    I am getting error " Call to undefined method RdKafka\ProducerTopic::producev() " on creating producer. i have also installed librdkafka using command "$ apt install librdkafka-dev"

    I am using below environment. PHP : 8.0.12 OS : Linux rdkafka : 5.0.0

    Thanks, Dilip Screenshot from 2021-10-29 19-54-46

    question wontfix 
    opened by dilip-E2 10
  • [QUESTION]How to save consumer message in database?

    [QUESTION]How to save consumer message in database?

    @mateusjunges Hi i am trying to save data in database but not save in database.

    <?php
    
    namespace App\Handlers;
    
    use Illuminate\Support\Facades\Log;
    use Junges\Kafka\Contracts\KafkaConsumerMessage;
    use App\Http\Controllers\TermsController;
    use App\Model\Aquib;
    use DB;
    
    class TestHandler
    {
        public function __invoke(KafkaConsumerMessage $message)
        {
            $values = array('ab' => 1);
           DB::table('test')->insert($values);
    
            
            $var = $message->getBody();
            Log::debug('Message received!', [
                'body' => $var,
                'headers' => $message->getHeaders(),
                'partition' => $message->getPartition(),
                'key' => $message->getKey(),
                'topic' => $message->getTopicName()
            ]);
            
            
        }
    }
    
    opened by maquib 9
  • [BUG] stopAfterLastMessage not working

    [BUG] stopAfterLastMessage not working

    I'm calling the method stopAfterLastMessage() on the consumer building but the consumption don't stop.

    I'm on ubuntu 22.04, running php 8.1 with rdkafka 6.0.3, laravel-kafka:1.8.1.

    I check the code and in line 349 we have this: if ($this->config->shouldStopAfterLastMessage() && RD_KAFKA_RESP_ERR__PARTITION_EOF === $message->err) { RD_KAFKA_RESP_ERR__PARTITION_EOF equals -191 but $message->err is returning -185, i try to send a pull request but i get access denied, so i am sending the code that would be like this:

        private const CONSUMER_STOP_ERRORS = [
            RD_KAFKA_RESP_ERR__PARTITION_EOF,
            RD_KAFKA_RESP_ERR__TIMED_OUT
        ];
        
       ....
       
        if ($this->config->shouldStopAfterLastMessage() && in_array($message->err, self::CONSUMER_STOP_ERRORS)) {
            $this->stopConsume();
        }
    
    opened by daavelar 8
  • [QUESTION] Getting

    [QUESTION] Getting "Unsupported value "snappy" for configuration property "compression.codec" while creating a consumer

    Hi @mateusjunges , i have the same error as #105 but when creating a consumer :

    $builder = Kafka::createConsumer($this->topics, $this->group, $this->host.':'.$this->port);
    $builder->withHandler(function(KafkaConsumerMessage $message) {
    	// Handle your message here
    	echo $message->getBody();
    	echo $message->getTopicName();
    })
    ->stopAfterLastMessage()
    ->withMaxMessages(100);
    
    $consumer = $builder->build();
    
    $consumer->consume();
    

    I'm using the following versions (on windows 11) :

    laravel-kafka : 1.8.1 rdKafka : 6.0.1 librdkafka version (runtime) : 1.6.2 librdkafka version (build) :| 1.6.2.255

    I tried to change the KAFKA_COMPRESSION_TYPE option to "none" or "gzip" without any success.

    Note that by following the high level consumer exemple from rdkafka i am receiving message just fine.

    Any ideas what am i missing ?

    Thanks

    opened by grunk 8
  • [QUESTION] CouldNotPublishMessage exception: Sent messages may not be completed yet.

    [QUESTION] CouldNotPublishMessage exception: Sent messages may not be completed yet.

    @mateusjunges can you plz help me on this error is coming when am trying to send message to kafka. Here is my code, Screenshot (91)

    Kafka::publishOn('broker', 'topic')
        ->withBodyKey('foo', 'bar')
        ->send();
    
    return response('ok');
    
    question 
    opened by maquib 8
  • [QUESTION] Is possible to publish and consume messages in Aws MSK with SASL enabled?

    [QUESTION] Is possible to publish and consume messages in Aws MSK with SASL enabled?

    Hi, i'm trying to publish a message to an AWS MSK broker using SASL authentication with SCRAM mechanism, but i'm getting the message "Sent messages may not be completed yet." as response. Below you can see my code:

          $message = new Message(
                body: $request->get('message'),
            );
    
            $sasl = new Sasl(
                username: 'myuser',
                password: 'mypass',
                mechanisms: 'SCRAM-SHA-256'
            );
    
            try {
                Kafka::publishOn('test')
                    ->withSasl($sasl)
                    ->withMessage($message)
                    ->send();
            } catch (CouldNotPublishMessage $exception) {
                return back()->with('error', $exception->getMessage());
            }
    

    Am I missing something or there is some new feature needed to publish in AWS MSK?

    opened by daavelar 7
  • [BUG] `handleBatch` timer issue

    [BUG] `handleBatch` timer issue

    Hi, I noticed a little problem with batch consuming messages.

    Let's say I have a topic full of hundreds of messages, and I want to consume by batches of 20 messages. What happens : the consumer reads 20 messages, and then 1, then 20, then 1, then 20, 1, 20, 1, etc It would be better to try to have 20, 20, 20, 20, 20 etc (assuming we can get 20 before timeout of the batch).

    The problem is here : image

    As there is a return, the timer isn't reset if we reach batch max size and the timer continue, so maybe the next time the function will be called the timer will have timeout already ...

    Removing the return and add a elseif solves the problem : fix (Yes, it's the same code in both if... so it could be better, but you got the idea)

    Something to fix for next release ?

    bug 
    opened by jchambondynadmic 6
  • [BUG] Cannot declare class `App\Console\Kernel`, because the name is already in use

    [BUG] Cannot declare class `App\Console\Kernel`, because the name is already in use

    Hi, I'm using laravel-opcache and laravel-octane-dockerfile with Laravel Octane. Before starting the server and after running php artisan compile command to pre-compile application code (includes vendor directory), the following error will be thrown:

     Symfony\Component\ErrorHandler\Error\FatalError 
    
      Cannot declare class App\Console\Kernel, because the name is already in use
    
      at vendor/mateusjunges/laravel-kafka/dev/laravel-console-kernel.php:0
          1â–• <?php
          2â–• 
          3â–• namespace App\Console;
          4â–• 
          5â–• use Illuminate\Console\Scheduling\Schedule;
          6â–• use Illuminate\Foundation\Console\Kernel as ConsoleKernel;
          7â–• 
          8â–• class Kernel extends ConsoleKernel
          9â–• {
    
    
       Whoops\Exception\ErrorException 
    
      Cannot declare class App\Console\Kernel, because the name is already in use
    
      at vendor/mateusjunges/laravel-kafka/dev/laravel-console-kernel.php:0
          1â–• <?php
          2â–• 
          3â–• namespace App\Console;
          4â–• 
          5â–• use Illuminate\Console\Scheduling\Schedule;
          6â–• use Illuminate\Foundation\Console\Kernel as ConsoleKernel;
          7â–• 
          8â–• class Kernel extends ConsoleKernel
          9â–• {
    
          +1 vendor frames 
      2   [internal]:0
          Whoops\Run::handleShutdown()
    

    I tried to exclude vendor/mateusjunges/laravel-kafka/dev directory from pre-compiling in laravel-opcache package config, but the problem still exists.

    opened by smortexa 6
  • Reworked testing framework so that it properly checks what was dispatched

    Reworked testing framework so that it properly checks what was dispatched

    This PR originally was to fix incorrect testing code in src/Support/Testing/Fakes/KafkaFake.php

    Previously this code, the published method was running the callback against the $message variable that was actually passed in during the test definition. This made no sense as the filter ran on the same object everytime so wasn't actually performing a truthy check on the messages that were actually dispatched.

    private function published(KafkaProducerMessage $message = null, $callback = null): Collection
    {
        if (! $this->hasPublished()) {
            return collect();
        }
    
        $callback = $callback ?: function () {
            return true;
        };
    
    
        return collect($this->getPublishedMessages())->filter(function ($_, $topic) use ($message, $callback) {
            return $callback($message, $topic);
        });
    }
    

    I reworked this to perform the truthy test against the published messages and reworked the $message variable so that it can be used as an expected message. I also wrote a default callback to compare the data in the message. This allows you to write tests like

    Kafka::fake();
    Kafka::publishOn('topic')->withHeaders(['custom' => 'header'])->send();
    Kafka::assertPublished((new Message)->withHeaders(['custom' => 'header']);
    

    Without this change all tests pass regardless of what you pass in. I refactored all test assert methods to use this new style.


    One thing i cannot figure out is why the publishedMessages is stored on the ProducerFake object. This object is recreated everytime a new message is published.

    This means that if you run the following test always fails because it can only ever be as high as 1.

    Kafka::fake();
    Kafka::publishOn('topic')->withHeaders(['custom' => 'header'])->send();
    Kafka::publishOn('topic')->withHeaders(['custom' => 'header2'])->send();
    Kafka::assertPublishedOnTimes('topic', 2)
    
    opened by thetomcake 6
  • [QUESTION] Milliseconds retry parameter not being set when calling flush.

    [QUESTION] Milliseconds retry parameter not being set when calling flush.

    Hello,

    First of all, thanks for the hard work with a very simple-to-use package! I've had a bit of an issue where some Kafka messages were getting lost. I was using the send method in your ProducerBuilder class:

    public function send(): bool
    {
        $producer = $this->build();
    
        return $producer->produce($this->message);
    }
    

    Since I was producing lots of messages, I changed my code to use the sendBatch method, which fixed my issue and everything worked smoothly after.

    After that, I dug into the problem a bit more and I realized the flush method in the Producer class is using the Laravel retry logic:

    private function flush(): mixed
    {
        return retry(10, function () {
            $result = $this->producer->flush(1000);
    
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                return true;
            }
    
            $message = rd_kafka_err2str($result);
    
            throw CouldNotPublishMessage::withMessage($message, $result);
        });
    }
    

    I was surprised since it seemed to me not as many messages should've been lost for me, even by sending them individually (with the send method I shared above) to Kafka with this nice retry logic when flushing. Then I realized, for the flush retry, the sleepMilliseconds parameter is not being set. So the code doesn't seem to be sleeping for any amount of time before retrying. I was just curious about why you took that decision. I was thinking it could be better to sleep for some milliseconds and maybe you could even retry less than 10 times. Being that this method is used for both, the send and sendBatch methods, I think it would be worth it.

    Also, curious about why you are using the retry Larvel helper here, but not the retry method in the Retryable class you created. Wondering if the same retry method should be used everywhere in the package.

    Thanks again for your work!

    opened by joanorrit 5
  • [BUG] Exception handling

    [BUG] Exception handling

    Describe the bug On throwing custom exceptions, the report() method of the exception is not called.

    To Reproduce Steps to reproduce the behavior: 1- Have a fresh Larvel installation. 2- Make a custom exception using php artisan make:exception TestException, then do something in your report() method of the exception, for example, Log::info('some info'). 3- Create a consumer, then within the handler() of the consumer, throw the TestException.

    Expected behavior By default, if a custom exception is thrown in Laravel, if it has a report() method, it's gonna get called. this works just fine outside of the consumer handle() method, and the report() method gets called.

    Desktop (please complete the following information):

    • OS: Ubuntu 21.04
    • mateusjunges/laravel-kafka v1.10
    • PHP 8.1
    opened by behzadev 0
  • Package v2

    Package v2

    New major version

    We now follows semantic versioning

    Changes

    • The \Junges\Kafka\Contracts\CanProduceMessages contract was renamed to \Junges\Kafka\Contracts\MessageProducer

    • The \Junges\Kafka\Contracts\CanPublishMessagesToKafka contract was renamed to \Junges\Kafka\Contracts\MessagePublisher

    • The \Junges\Kafka\Contracts\KafkaProducerMessage contract was renamed to \Junges\Kafka\Contracts\ProducerMessage

    • The \Junges\Kafka\Contracts\CanConsumeMessages was renamed to \Junges\Kafka\Contracts\MessageConsumer

    • The \Junges\Kafka\Contracts\KafkaConsumerMessage was renamed to \Junges\Kafka\Contracts\ConsumerMessage

    • The \Junges\Kafka\Contracts\CanConsumeMessagesFromKafka was renamed to \Junges\Kafka\Contracts\ConsumeMessagesFromKafka

    • The \Junges\Kafka\Contracts\CanConsumeBatchMessages contract was renamed to \Junges\Kafka\Contracts\BatchMessageConsumer

    • The \Junges\Kafka\Contracts\CanConsumeMessages contract was renamed to \Junges\Kafka\Contracts\MessageConsumer

    • Using strict types across the codebase

    • Added a upgrade guide

    • Upgrade the code to use PHP 8.1 features

    • The withSasl method now accepts all SASL parameters instead of a Sasl object.

    public function withSasl(string $username, string $password, string $mechanisms, string $securityProtocol = 'SASL_PLAINTEXT');
    

    WIP

    opened by mateusjunges 0
  • [QUESTION] Consumer connecting to a Broker that is not available - exception/error

    [QUESTION] Consumer connecting to a Broker that is not available - exception/error

    From what I could find whenever the broker is down, consumer will log the following messages:

    %3|1665746426.651|FAIL|rdkafka#producer-2| [thrd:ssl://broker:9092/bootstrap]: ssl://broker:9092/bootstrap: Failed to resolve 'broker:9092': No address associated with hostname (after 3ms in state CONNECT) %3|1665746426.651|ERROR|rdkafka#producer-2| [thrd:ssl://broker:9092/bootstrap]: 1/1 brokers are down

    Is there currently, somehow, a way to report or handle this error?

    Thanks.

    opened by joaoccmatos 1
  • [QUESTION] Health check when the consumer can start consuming

    [QUESTION] Health check when the consumer can start consuming

    Hello!

    I am trying to figure out if a healthcheck can be configured or added.

    When I start the consumer in our kubernetes cluster it takes a solid 30 seconds for the consumer to start consuming and I want to see if I can add a healthcheck so I can keep the old consumers running until the new one can actually start consuming. This would prevent having message lag when updating our consumers

    Is there an easy way to implement a healthcheck for this?

    feature request enhancement 
    opened by TigoCoinmerce 5
  • [QUESTION] Consumer not working with stopAfterLastMessage enabled

    [QUESTION] Consumer not working with stopAfterLastMessage enabled

    I have this code in a Laravel Command:

    Kafka::createConsumer()
        ->subscribe($this->topic)
        ->withConsumerGroupId(config('kafka.consumer_group_id'))
        ->withHandler(fn (KafkaConsumerMessage $message) => $this->handler($handler, $message))
        ->stopAfterLastMessage()
        ->build()
        ->consume();
    

    This command is executed in a cronjob every minute.

    The problem is that never read any message from kafka, the pending queue offset is always in same position.

    If I remove the stopAfterLastMessage option, the command will be executed without stop and then they can read the messages without problem.

    Kafka::createConsumer()
        ->subscribe($this->topic)
        ->withConsumerGroupId(config('kafka.consumer_group_id'))
        ->withHandler(fn (KafkaConsumerMessage $message) => $this->handler($handler, $message))
        ->build()
        ->consume();
    

    My config is:

    <?php
    
    return [
        /*
         | Your kafka brokers url.
         */
        'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),
    
        /*
         | Kafka consumers belonging to the same consumer group share a group id.
         | The consumers in a group then divides the topic partitions as fairly amongst themselves as possible by
         | establishing that each partition is only consumed by a single consumer from the group.
         | This config defines the consumer group id you want to use for your project.
         */
        'consumer_group_id' => env('KAFKA_CONSUMER_GROUP_ID', 'group'),
    
        /*
         | After the consumer receives its assignment from the coordinator,
         | it must determine the initial position for each assigned partition.
         | When the group is first created, before any messages have been consumed, the position is set according to a configurable
         | offset reset policy (auto.offset.reset). Typically, consumption starts either at the earliest offset or the latest offset.
         | You can choose between "latest", "earliest" or "none".
         */
        'offset_reset' => env('KAFKA_OFFSET_RESET', 'earliest'),
    
        /*
         | If you set enable.auto.commit (which is the default), then the consumer will automatically commit offsets periodically at the
         | interval set by auto.commit.interval.ms.
         */
        'auto_commit' => env('KAFKA_AUTO_COMMIT', true),
    
        'sleep_on_error' => env('KAFKA_ERROR_SLEEP', 5),
    
        'partition' => env('KAFKA_PARTITION', -1),
    
        /*
         | Kafka supports 4 compression codecs: none , gzip , lz4 and snappy
         */
        'compression' => env('KAFKA_COMPRESSION_TYPE', 'snappy'),
    
        /*
         | Choose if debug is enabled or not.
         */
        'debug' => env('KAFKA_DEBUG', false),
    
        /*
         | Repository for batching messages together
         | Implement BatchRepositoryInterface to save batches in different storage
         */
        'batch_repository' => env('KAFKA_BATCH_REPOSITORY', \Junges\Kafka\BatchRepositories\InMemoryBatchRepository::class),
    ];
    

    Any idea why can not read the pending queue messages when stopAfterLastMessage is enabled?

    I can read the pending queue messages using the kafka command ./bin/kafka-console-consumer.sh --topic topic --bootstrap-server localhost:9092

    Thanks!

    opened by eusonlito 0
Releases(v1.10.0)
  • v1.10.0(Dec 18, 2022)

    What's Changed

    • Drop support for laravel 8 and rdkafka 4.0 by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/158
    • Add support for php8.2 and drop support for php8.0 by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/159

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.9.3...v1.10.0

    Source code(tar.gz)
    Source code(zip)
  • v1.9.3(Nov 29, 2022)

  • v1.9.2(Nov 29, 2022)

    What's Changed

    • Allow to configure sleep timeout on flush retry by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/156

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.9.1...v1.9.2

    Source code(tar.gz)
    Source code(zip)
  • v1.9.1(Nov 2, 2022)

    What's Changed

    • Filter config options for consumer and producers by @mateusjunges in #151

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.9.0...v1.9.1

    Source code(tar.gz)
    Source code(zip)
  • v1.9.0(Oct 28, 2022)

    What's Changed

    • Make Kafka class usable through interface injection by @mosharaf13 in https://github.com/mateusjunges/laravel-kafka/pull/150

    New Contributors

    • @mosharaf13 made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/150

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.8.9...v1.9.0

    Source code(tar.gz)
    Source code(zip)
  • v1.8.9(Oct 4, 2022)

    What's Changed

    • Fix batch message processing by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/143

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.8.8...v1.8.9

    Source code(tar.gz)
    Source code(zip)
  • v1.8.8(Sep 12, 2022)

    What's Changed

    • [v1.8.x] Fix docs by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/137
    • [v1.8.x] Add methods to configure config callbacks by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/136
    • [v1.8.x] Customize Deserializer in kafka:consume Command by @cragonnyunt in https://github.com/mateusjunges/laravel-kafka/pull/140

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.8.7...v1.8.8

    Source code(tar.gz)
    Source code(zip)
  • v1.8.7(Aug 22, 2022)

    What's Changed

    • Make consumer timeout configurable (fixes #132) by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/134

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.8.6...v1.8.7

    Source code(tar.gz)
    Source code(zip)
  • v1.8.6(Aug 17, 2022)

    What's Changed

    • [v1.8.x] Use stub files by @smortexa in https://github.com/mateusjunges/laravel-kafka/pull/129

    New Contributors

    • @smortexa made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/129

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.8.5...v1.8.6

    Source code(tar.gz)
    Source code(zip)
  • v1.8.5(Aug 16, 2022)

    What's Changed

    • [v1.8.x] Add restart command by @gajosu in https://github.com/mateusjunges/laravel-kafka/pull/119
    • [v1.8.x] Use dummy world instead of laravel namespace in dev classes by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/128

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.8.4...v1.8.5

    Source code(tar.gz)
    Source code(zip)
  • v1.8.4(Aug 2, 2022)

    What's Changed

    • Fix compression type to be read from config file by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/123

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.8.3...v1.8.4

    Source code(tar.gz)
    Source code(zip)
  • v1.8.3(Aug 2, 2022)

    What's Changed

    • Fixes #120 by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/122

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.8.2...v1.8.3

    Source code(tar.gz)
    Source code(zip)
  • v1.8.2(Jul 22, 2022)

    What's Changed

    • [v1.8.x] Added missing return type + missing docblocks by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/115
    • [v1.8.x] Resolve consumer via service container by @cragonnyunt in https://github.com/mateusjunges/laravel-kafka/pull/118

    New Contributors

    • @cragonnyunt made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/118

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.8.1...v1.8.2

    Source code(tar.gz)
    Source code(zip)
  • v1.8.1(Jun 14, 2022)

    What's Changed

    • [v1.8.x] Better error handling when flush returns error by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/112

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.8.0...v1.8.1

    Source code(tar.gz)
    Source code(zip)
  • v1.8.0(Jun 13, 2022)

    What's Changed

    • [v1.7.x] Message consumer Mock by @gajosu in https://github.com/mateusjunges/laravel-kafka/pull/107
    • [v1.7.x] Fix docker file by @gajosu in https://github.com/mateusjunges/laravel-kafka/pull/108
    • [v1.7.x] Add Batch Support for Message consumer Mock by @gajosu in https://github.com/mateusjunges/laravel-kafka/pull/109
    • [v1.7.x] Added consumer contracts by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/110
    • [v1.7.x] Add docs for mocking consumers by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/111
    • [v1.7.x] Additional option to stop consumer after last message on topic by @StounhandJ in https://github.com/mateusjunges/laravel-kafka/pull/103

    New Contributors

    • @StounhandJ made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/103

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.7.7...v1.8.0

    Source code(tar.gz)
    Source code(zip)
  • v1.7.7(Jun 3, 2022)

    What's Changed

    • [v1.7.x] Added missing comas in the class params by @shanginn in https://github.com/mateusjunges/laravel-kafka/pull/101
    • [v1.7.x] Link to the current version documentation fixed by @Elnadrion in https://github.com/mateusjunges/laravel-kafka/pull/104
    • [v1.7.x] Fix securityProtocol sasl config in consumer by @gajosu in https://github.com/mateusjunges/laravel-kafka/pull/106

    New Contributors

    • @shanginn made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/101
    • @gajosu made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/106

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.7.6...v1.7.7

    Source code(tar.gz)
    Source code(zip)
  • v1.7.6(May 18, 2022)

    What's Changed

    • [v1.7.x] Deprecation warning fix by @Elnadrion in https://github.com/mateusjunges/laravel-kafka/pull/100

    New Contributors

    • @Elnadrion made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/100

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.7.5...v1.7.6

    Source code(tar.gz)
    Source code(zip)
  • v1.7.5(May 2, 2022)

    What's Changed

    • [v1.7.x] Added SASL Security Protocol to getSasl method by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/97

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.7.4...v1.7.5

    Source code(tar.gz)
    Source code(zip)
  • v1.7.4(Apr 25, 2022)

    What's Changed

    • [v1.7.x] Do not allow setting dlq without subscribing to topics by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/95

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.7.3...v1.7.4

    Source code(tar.gz)
    Source code(zip)
  • v1.7.3(Apr 19, 2022)

    What's Changed

    • Fix facade docblock by @nmfzone in https://github.com/mateusjunges/laravel-kafka/pull/93

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.7.2...v1.7.3

    Source code(tar.gz)
    Source code(zip)
  • v1.6.6(Apr 19, 2022)

    What's Changed

    • Fix facade docblock by @nmfzone in https://github.com/mateusjunges/laravel-kafka/pull/93

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.6.5...v1.6.6

    Source code(tar.gz)
    Source code(zip)
  • v1.7.2(Apr 7, 2022)

    What's Changed

    • [1.6.x] Fix serialize by @lukecurtis93 in https://github.com/mateusjunges/laravel-kafka/pull/92

    New Contributors

    • @lukecurtis93 made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/92

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.7.1...v1.7.2

    Source code(tar.gz)
    Source code(zip)
  • v1.6.5(Apr 7, 2022)

    What's Changed

    • [1.6.x] If callback is provided, filter of published messages should use it by @nmfzone in https://github.com/mateusjunges/laravel-kafka/pull/87
    • [1.6.x] Fix serialize by @lukecurtis93 in https://github.com/mateusjunges/laravel-kafka/pull/92

    New Contributors

    • @nmfzone made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/87
    • @lukecurtis93 made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/92

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.6.4...v1.6.5

    Source code(tar.gz)
    Source code(zip)
  • v1.7.1(Mar 23, 2022)

    What's Changed

    • [1.7.x] Added support for batch producing and handling batch of messages by @vsvp21 in https://github.com/mateusjunges/laravel-kafka/pull/86

    New Contributors

    • @vsvp21 made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/86

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.7.0...v1.7.1

    Source code(tar.gz)
    Source code(zip)
  • v1.7.0(Mar 21, 2022)

    What's Changed

    • [1.6.x] If callback is provided, filter of published messages should use it by @nmfzone in https://github.com/mateusjunges/laravel-kafka/pull/87
    • [1.6.x] Add support for laravel 9 by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/88

    New Contributors

    • @nmfzone made their first contribution in https://github.com/mateusjunges/laravel-kafka/pull/87

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.6.4...v1.7.0

    Source code(tar.gz)
    Source code(zip)
  • v1.6.4(Feb 28, 2022)

    What's Changed

    • Use correct consumer group id config key by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/82

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.6.3...v1.6.4

    Source code(tar.gz)
    Source code(zip)
  • v1.6.3(Feb 17, 2022)

    What's Changed

    • [1.6.x] Add auto.offset.reset as consumer only config option by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/79

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.6.2...v1.6.3

    Source code(tar.gz)
    Source code(zip)
  • v1.6.2(Feb 16, 2022)

    What's Changed

    • Move v1.6x docs to junges.dev by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/72
    • Remove producer config property from getConsumerOptions method by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/76
    • [v1.6.x] Fix sasl authentication by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/78

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.5.4...v1.6.2

    Source code(tar.gz)
    Source code(zip)
  • v1.5.4(Jan 21, 2022)

    What's Changed

    • [1.3.x] Allow using sasl with lowercase config keys by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/71

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.4.6...v1.5.4

    Source code(tar.gz)
    Source code(zip)
  • v1.6.1(Jan 20, 2022)

    What's Changed

    • [1.6.x] Fixes #69 - Allow to use SASL with lowercase config keys by @mateusjunges in https://github.com/mateusjunges/laravel-kafka/pull/70

    Full Changelog: https://github.com/mateusjunges/laravel-kafka/compare/v1.6.0...v1.6.1

    Source code(tar.gz)
    Source code(zip)
Owner
Mateus Junges
BA in Computer Engineering, studying for a Master's degree in Computational Security at UFPR. Working as Backend Engineer at Paylivre and open source lover.
Mateus Junges
Laravel-model-mapper - Map your model attributes to class properties with ease.

Laravel Model-Property Mapper This package provides functionality to map your model attributes to local class properties with the same names. The pack

Michael Rubel 15 Oct 29, 2022
Cache chunks of your Blade markup with ease. 🔪

Blade Cache Directive Cache chunks of your Blade markup with ease. Installation You can install the package via Composer: composer require ryangjchand

Ryan Chandler 155 Dec 10, 2022
Cast your Eloquent model attributes to Value Objects with ease.

Laravel Value Objects Cast your Eloquent model attributes to value objects with ease! Requirements This package requires PHP >= 5.4. Using the latest

Red Crystal Code 23 Dec 30, 2022
Create inline partials in your Blade templates with ease

Create inline partials in your Blade templates with ease. This package introduces a new @capture directive that allows you to capture small parts of y

Ryan Chandler 27 Dec 8, 2022
Laravel User Activity Log - a package for Laravel 8.x that provides easy to use features to log the activities of the users of your Laravel app

Laravel User Activity Log - a package for Laravel 8.x that provides easy to use features to log the activities of the users of your Laravel app

null 9 Dec 14, 2022
A simple blog app where a user can signup , login, like a post , delete a post , edit a post. The app is built using laravel , tailwind css and postgres

About Laravel Laravel is a web application framework with expressive, elegant syntax. We believe development must be an enjoyable and creative experie

Nahom_zd 1 Mar 6, 2022
Laravel-OvalFi helps you Set up, test, and manage your OvalFi integration directly in your Laravel App.

OvalFi Laravel Package Laravel-OvalFi helps you Set up, test, and manage your OvalFi integration directly in your Laravel App. Installation You can in

Paul Adams 2 Sep 8, 2022
Podcastwala - Your very own Podcast web app built with Laravel. Manage and listen to your favorite podcasts

Podcastwala Your very own Podcast web app built with Laravel 5. This web app enables you to manage RSS feeds for your favorite podcasts and listen to

null 142 Sep 14, 2022
A web app for detecting backend technologies used in a web app, Based on wappalyzer node module

About Techdetector This a web fingerprinting application, it detects back end technologies of a given domain by using the node module wappalyzer. And

Shobi 17 Dec 30, 2022
CV-Resumes-App is helped us to build resume .. you can help me to improve this app...

About Laravel Laravel is a web application framework with expressive, elegant syntax. We believe development must be an enjoyable and creative experie

Eng Hasan Hajjar 2 Sep 30, 2022
Stop duplicating your Eloquent query scopes and constraints in PHP. This package lets you re-use your query scopes and constraints by adding them as a subquery.

Laravel Eloquent Scope as Select Stop duplicating your Eloquent query scopes and constraints in PHP. This package lets you re-use your query scopes an

Protone Media 75 Dec 7, 2022
Laravel Podcast is Laravel 5.5 web app that enables you to manage RSS feeds for your favorite podcasts and listen to the episodes in a seamless UI and User Authentication.

Laravel Podcast is Laravel 5.5 web app that enables you to manage RSS feeds for your favorite podcasts and listen to the episodes in a seamless UI and

Jeremy Kenedy 35 Dec 19, 2022
A super simple package allowing for use MySQL 'USE INDEX' and 'FORCE INDEX' statements.

Laravel MySQL Use Index Scope A super simple package allowing for use MySQL USE INDEX and FORCE INDEX statements. Requirements PHP ^7.4 | ^8.0 Laravel

Vasyl 29 Dec 27, 2022
A simple to use query builder for the jQuery QueryBuilder plugin for use with Laravel.

QueryBuilderParser Status Label Status Value Build Insights Code Climate Test Coverage QueryBuilderParser is designed mainly to be used inside Laravel

Tim Groeneveld 149 Nov 11, 2022
A Laravel package that allows you to use multiple ".env" files in a precedent manner. Use ".env" files per domain (multi-tentant)!

Laravel Multi ENVs Use multiple .envs files and have a chain of precedence for the environment variables in these different .envs files. Use the .env

Allyson Silva 48 Dec 29, 2022
Create and manage A Domain Driven Design (DDD) in your Laravel app, simply and efficiently.

Create and manage A Domain Driven Design (DDD) in your Laravel app, simply and efficiently.

Lucas Nepomuceno 4 Jun 11, 2022
Add tags and taggable behaviour to your Laravel app

Add tags and taggable behaviour to a Laravel app This package offers taggable behaviour for your models. After the package is installed the only thing

Spatie 1.4k Dec 29, 2022
Easily capture every incoming request and the corresponding outgoing response in your Laravel app.

Easily capture every incoming request and the corresponding outgoing response in your Laravel app. This package is designed to work only with the Lara

Mark Townsend 22 Nov 15, 2022
This package enables you to create and run a fully functioning WebSocket server in your Laravel app.

This package enables you to create and run a fully functioning WebSocket server in your Laravel app. It can optionally receive messages broadcast over ZeroMQ.

Asked.io 181 Oct 6, 2022