Laravel Kafka
Do you use Kafka in your Laravel projects? Every package I've seen so far, including some I built myself, either lacks a nice usage syntax or, if it has one, makes testing very painful.
This package provides a nice way of producing and consuming Kafka messages in your Laravel projects.
Follow these docs to install this package and start using Kafka with ease.
Installation
To install this package, you must have the PHP RdKafka extension installed. You can follow the steps here to install rdkafka on your system.
With RdKafka installed, require this package with composer:
composer require mateusjunges/laravel-kafka
Usage
After installing the package, you can start producing and consuming Kafka messages.
Producing Kafka Messages
To publish your messages to Kafka, you can use the publishOn method of the Junges\Kafka\Facades\Kafka class:
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('broker', 'topic');
This method returns a Junges\Kafka\Producers\ProducerBuilder::class instance, which you can use to configure your message.
The ProducerBuilder class contains a few methods to configure your Kafka producer. The following sections describe these methods.
ProducerBuilder configuration methods
The withConfigOption method sets a \RdKafka\Conf::class option. You can check all available options here. This method sets one option per call; to set several at once, use withConfigOptions, passing an array that maps each config name to its value. Here's an example:
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('broker', 'topic')
->withConfigOption('property-name', 'property-value')
->withConfigOptions([
'property-name' => 'property-value'
]);
While you are developing your application, you can enable debug mode with the withDebugEnabled method. To disable debug mode, you can use either ->withDebugEnabled(false) or the withDebugDisabled method.
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('broker', 'topic')
->withConfigOption('property-name', 'property-value')
->withConfigOptions([
'property-name' => 'property-value'
])
->withDebugEnabled() // To enable debug mode
->withDebugDisabled() // To disable debug mode
->withDebugEnabled(false); // Also to disable debug mode
Configuring the Kafka message payload
In Kafka, a payload consists of a message, message headers and a message key. All of these can be configured through the ProducerBuilder class.
Configuring message headers
To configure the message headers, use the withHeaders method:
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('broker', 'topic')
->withHeaders([
'header-key' => 'header-value'
]);
Configuring the message payload
You can configure the message with the withMessage or withMessageKey methods.
The withMessage method sets the entire message, and it accepts a Junges\Kafka\Message::class instance as its argument.
This is how you should use it:
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message;
$message = new Message(
headers: ['header-key' => 'header-value'],
message: ['key' => 'value'],
key: 'kafka key here'
);
Kafka::publishOn('broker', 'topic')->withMessage($message);
The withMessageKey method sets only a single key, with its value, in your message.
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('broker', 'topic')->withMessageKey('key', 'value');
Using Kafka Keys
In Kafka, keys are used to determine the partition within a log to which a message gets appended. If you want to use a key in your message, you should use the withKafkaKey method:
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('broker', 'topic')->withKafkaKey('your-kafka-key');
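To see why a key keeps related messages on the same partition, here is a minimal sketch in plain PHP. It assumes a simple hash-modulo scheme; partitionFor is a hypothetical helper and is not part of this package, and the partitioner your Kafka client actually uses is configurable and may behave differently.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: maps a message key to a partition index.
// Assumption: hash-modulo partitioning; real partitioners may differ.
function partitionFor(string $key, int $partitionCount): int
{
    if ($partitionCount < 1) {
        throw new InvalidArgumentException('Partition count must be at least 1.');
    }

    // Mask the checksum so the result is non-negative on every platform.
    return (crc32($key) & 0x7fffffff) % $partitionCount;
}

// The same key always resolves to the same partition.
$first = partitionFor('order-42', 6);
$second = partitionFor('order-42', 6);
```

Because the mapping depends only on the key and the partition count, every message for order-42 lands on one partition and keeps its relative order.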
Sending the message to Kafka
After configuring all your message options, you must use the send method to send the message to Kafka.
use Junges\Kafka\Facades\Kafka;
/** @var \Junges\Kafka\Producers\ProducerBuilder $producer */
$producer = Kafka::publishOn('broker', 'topic')
->withConfigOptions(['key' => 'value'])
->withKafkaKey('your-kafka-key')
->withHeaders(['header-key' => 'header-value']);
$producer->send();
Consuming Kafka Messages
If your application needs to read messages from a Kafka topic, you must create a consumer object, subscribe to the appropriate topic and start receiving messages.
To create a consumer using this package, you can use the createConsumer method on the Kafka facade:
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer('broker');
This method returns a Junges\Kafka\Consumers\ConsumerBuilder::class instance, and you can use it to configure your consumer.
Subscribing to a topic
With a consumer created, you can subscribe to a Kafka topic using the subscribe method:
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer('broker')->subscribe('topic');
Of course, you can subscribe to more than one topic at once, either using an array of topics or specifying one by one:
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer('broker')->subscribe('topic-1', 'topic-2', 'topic-n');
// Or, using array:
$consumer = Kafka::createConsumer('broker')->subscribe([
'topic-1',
'topic-2',
'topic-n'
]);
Configuring consumer groups
Kafka consumers belonging to the same consumer group share a group id. The consumers in a group divide the topic partitions among themselves as fairly as possible, with each partition consumed by only a single consumer from the group.
To attach your consumer to a consumer group, you can use the withConsumerGroupId method to specify the consumer group id:
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer('broker')->withConsumerGroupId('foo');
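The way a group shares partitions can be pictured with a small plain PHP sketch. It assumes a round-robin split; assignPartitions is a hypothetical helper, and in a real cluster the assignment is negotiated by Kafka itself according to the configured strategy, not by your application code.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: spreads partitions across group members round-robin.
 * Assumption: round-robin; the real strategy is configurable in Kafka.
 *
 * @param string[] $consumers
 * @return array<string, int[]>
 */
function assignPartitions(array $consumers, int $partitionCount): array
{
    $consumers = array_values($consumers);
    if ($consumers === []) {
        return [];
    }

    $assignment = array_fill_keys($consumers, []);
    for ($partition = 0; $partition < $partitionCount; $partition++) {
        // Each partition gets exactly one owner within the group.
        $owner = $consumers[$partition % count($consumers)];
        $assignment[$owner][] = $partition;
    }

    return $assignment;
}

$assignment = assignPartitions(['consumer-a', 'consumer-b'], 5);
// consumer-a owns partitions 0, 2 and 4; consumer-b owns 1 and 3.
```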
Configuring message handlers
Now that you have created your Kafka consumer, you must create a handler for the messages this consumer receives. By default, a handler is any callable. You can use an invokable class or a simple callback. Use the withHandler method to specify your handler:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker');
// Using callback:
$consumer->withHandler(function(\RdKafka\Message $message) {
// Handle your message here
});
Or, using an invokable class:
class Handler
{
public function __invoke(\RdKafka\Message $message){
// Handle your message here
}
}
// Pass an instance: an object with __invoke is a callable, a bare class name is not.
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')->withHandler(new Handler());
Configuring max messages to be consumed
If you want to consume a limited number of messages, you can use the withMaxMessages method to set the maximum number of messages to be consumed by a Kafka consumer:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')->withMaxMessages(2);
Configuring a dead letter queue
In Kafka, a Dead Letter Queue (or DLQ) is a regular topic in the Kafka cluster that acts as the destination for messages that could not make it to their intended destination due to some error.
To create a DLQ in this package, you can use the withDlq method. If you don't specify the DLQ topic name, it will be derived from the topic you are consuming by adding the -dlq suffix to the topic name.
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')->withDlq();
//Or, specifying the dlq topic name:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')->withDlq('your-dlq-topic-name');
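The naming rule and the general idea of a DLQ can be sketched in plain PHP. Both functions below are hypothetical and are not part of this package; the sketch assumes that a message is routed to the DLQ when its handler throws, and it collects the failed messages in an array instead of publishing them to Kafka.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: resolves the DLQ topic name using the "-dlq" suffix rule.
function dlqTopicFor(string $topic, ?string $dlqTopic = null): string
{
    return $dlqTopic ?? $topic . '-dlq';
}

/**
 * Hypothetical helper: runs the handler and collects failed messages per DLQ topic.
 * Assumption: a thrown exception is what marks a message as failed.
 *
 * @param array<int, mixed> $messages
 * @return array<string, array<int, mixed>>
 */
function consumeWithDlq(array $messages, callable $handler, string $topic, ?string $dlqTopic = null): array
{
    $dlq = dlqTopicFor($topic, $dlqTopic);
    $deadLetters = [$dlq => []];

    foreach ($messages as $message) {
        try {
            $handler($message);
        } catch (Throwable $exception) {
            // In-memory stand-in for publishing to the DLQ topic.
            $deadLetters[$dlq][] = $message;
        }
    }

    return $deadLetters;
}

$deadLetters = consumeWithDlq(
    ['ok', 'broken', 'ok'],
    function (string $message): void {
        if ($message === 'broken') {
            throw new RuntimeException('Cannot process message.');
        }
    },
    'orders'
);
```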
Using SASL
SASL allows your producers and your consumers to authenticate to your Kafka cluster, which verifies their identity. It's also a secure way for your clients to endorse an identity. To provide SASL configuration, you can use the withSasl method, passing a Junges\Kafka\Config\Sasl instance as the argument:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')
->withSasl(new \Junges\Kafka\Config\Sasl(
password: 'password',
username: 'username',
mechanisms: 'authentication mechanism'
));
Using middlewares
Middlewares provide a convenient way to filter and inspect your Kafka messages. To add a middleware in this package, you can use the withMiddleware method. The middleware is a callable in which the first argument is the message itself and the second one is the next handler. The middlewares are executed in the order they are defined:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')
->withMiddleware(function($message, callable $next) {
// Perform some work here
return $next($message);
});
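The execution order can be illustrated with a minimal pipeline in plain PHP. buildPipeline is a hypothetical helper written for this example; it is not how this package wires middlewares internally, only a sketch of the same ($message, $next) contract.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: wraps a handler so that the first middleware
 * in the list is the first one to run.
 *
 * @param callable[] $middlewares
 */
function buildPipeline(array $middlewares, callable $handler): callable
{
    // Wrap from the inside out, so iterate over the list in reverse.
    return array_reduce(
        array_reverse($middlewares),
        fn (callable $next, callable $middleware): callable =>
            fn ($message) => $middleware($message, $next),
        $handler
    );
}

$log = [];

$pipeline = buildPipeline(
    [
        function ($message, callable $next) use (&$log) {
            $log[] = 'first';
            return $next($message);
        },
        function ($message, callable $next) use (&$log) {
            $log[] = 'second';
            return $next($message);
        },
    ],
    function ($message) use (&$log) {
        $log[] = 'handler';
        return $message;
    }
);

$result = $pipeline('payload');
// $log is now ['first', 'second', 'handler'].
```

A middleware that does not call $next stops the chain, which is how filtering works in this style of pipeline.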
Using auto commit
The auto-commit check runs on every poll: it checks whether the time elapsed since the last commit is greater than the configured interval and, if so, commits the consumed offsets. To enable auto commit, use the withAutoCommit method:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')->withAutoCommit();
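The elapsed-time check can be pictured with a small plain PHP sketch. AutoCommitTimer is a hypothetical class used only for illustration; the real check lives inside the Kafka client, and the timestamps here are passed in by hand so the behaviour is easy to follow.

```php
<?php
declare(strict_types=1);

// Hypothetical class: decides on each poll whether a commit is due.
final class AutoCommitTimer
{
    private float $lastCommitAt;

    public function __construct(private float $intervalSeconds, float $now)
    {
        $this->lastCommitAt = $now;
    }

    // Returns true, and restarts the interval, once more time than the interval has passed.
    public function shouldCommit(float $now): bool
    {
        if ($now - $this->lastCommitAt <= $this->intervalSeconds) {
            return false;
        }

        $this->lastCommitAt = $now;

        return true;
    }
}

$timer = new AutoCommitTimer(intervalSeconds: 5.0, now: 0.0);

$early = $timer->shouldCommit(3.0); // Only 3 seconds elapsed: no commit yet.
$due = $timer->shouldCommit(5.5);   // More than 5 seconds elapsed: commit.
$again = $timer->shouldCommit(6.0); // Interval restarted at 5.5: no commit yet.
```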
Setting Kafka configuration options
To set configuration options, you can use two methods: withOptions, passing an array that maps each option name to its value, or withOption, passing two arguments, the option name and the option value.
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')
->withOptions([
'option-name' => 'option-value'
]);
// Or:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')
->withOption('option-name', 'option-value');
Building the consumer
When you have finished configuring your consumer, you must call the build method, which returns a Junges\Kafka\Consumers\Consumer instance.
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')
// Configure your consumer here
->build();
Consuming the kafka messages
After building the consumer, you must call the consume method to consume the messages:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer('broker')->build();
$consumer->consume();
Using Kafka::fake()
When testing your application, you may wish to "mock" certain aspects of the app, so they are not actually executed during a given test. This package provides convenient helpers for mocking the Kafka producer out of the box. These helpers primarily provide a convenience layer over Mockery so you don't have to manually make complicated Mockery method calls.
The Kafka facade also provides methods to perform assertions over published messages, such as assertPublished, assertPublishedOn and assertNothingPublished.
use Junges\Kafka\Facades\Kafka;
use PHPUnit\Framework\TestCase;
class MyTest extends TestCase
{
public function testMyAwesomeApp()
{
Kafka::fake();
$producer = Kafka::publishOn('broker', 'topic')
->withHeaders(['key' => 'value'])
->withMessageKey('foo', 'bar');
$producer->send();
Kafka::assertPublished($producer->getMessage());
}
}
If you want to assert that a message was published to a specific Kafka topic, you can use the assertPublishedOn method:
use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;
class MyTest extends TestCase
{
public function testWithSpecificTopic()
{
Kafka::fake();
$producer = Kafka::publishOn('broker', 'some-kafka-topic')
->withHeaders(['key' => 'value'])
->withMessageKey('key', 'value');
$producer->send();
Kafka::assertPublishedOn('some-kafka-topic', $producer->getMessage());
}
}
You can also pass a callback to perform assertions on the message. The callback receives the published message itself as its argument.
use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message;
class MyTest extends TestCase
{
public function testWithSpecificTopic()
{
Kafka::fake();
$producer = Kafka::publishOn('broker', 'some-kafka-topic')
->withHeaders(['key' => 'value'])
->withMessageKey('key', 'value');
$producer->send();
Kafka::assertPublishedOn('some-kafka-topic', $producer->getMessage(), function(Message $message) {
return $message->getHeaders()['key'] === 'value';
});
}
}
You can also assert that nothing was published at all, using the assertNothingPublished method:
use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message;
class MyTest extends TestCase
{
public function testWithSpecificTopic()
{
Kafka::fake();
if (false) {
$producer = Kafka::publishOn('broker', 'some-kafka-topic')
->withHeaders(['key' => 'value'])
->withMessageKey('key', 'value');
$producer->send();
}
Kafka::assertNothingPublished();
}
}
Testing
Run composer test to test this package.
Contributing
Thank you for considering contributing to the Laravel Kafka package! The contribution guide can be found here.
Credits
License
The Laravel Kafka package is open-source software licensed under the MIT License. Please see the License File for more information.