A PHP implementation of RabbitMQ Client for webman plugin.

Overview

workbunny

workbunny/webman-rabbitmq

🐇 A PHP implementation of RabbitMQ Client for webman plugin. 🐇

A PHP implementation of RabbitMQ Client for webman plugin

Latest Stable Version Total Downloads Latest Unstable Version License PHP Version Require

常见问题

  1. 什么时候使用消息队列?

    当你需要对系统进行解耦、削峰、异步的时候;如发送短信验证码、秒杀活动、资产的异步分账清算等。

  2. RabbitMQ和Redis的区别?

    Redis中的Stream的特性同样适用于消息队列,并且也包含了比较完善的ACK机制,但在一些点上与RabbitMQ存在不同:

    • Redis Stream没有完善的后台管理;RabbitMQ拥有较为完善的后台管理及Api;
    • Redis的持久化策略取舍:默认的RDB策略极端情况下存在丢失数据,AOF策略则需要牺牲一些性能;RabbitMQ持久化方案更多,可对消息持久化也可对队列持久化;
    • RabbitMQ拥有更多的插件可以提供更完善的协议支持及功能支持;
  3. 什么时候使用Redis?什么时候使用RabbitMQ?

    当你的队列使用比较单一或者比较轻量的时候,请选用 Redis Stream;当你需要一个比较完整的消息队列体系,包括需要利用交换机来绑定不同队列做一些比较复杂的消息任务的时候,请选择RabbitMQ;

    当然,如果你的队列使用也比较单一,但你需要用到一些管理后台相关系统化的功能的时候,又不想花费太多时间去开发的时候,也可以使用RabbitMQ;因为RabbitMQ提供了一整套后台管理的体系及 HTTP API 供开发者兼容到自己的管理后台中,不需要再消耗多余的时间去开发功能;

    注:这里的 轻量 指的是 无须将应用中的队列服务独立化,该队列服务是该应用独享的

简介

RabbitMQ的webman客户端插件;

异步无阻塞消费、异步无阻塞生产、同步阻塞生产;

简单易用高效,可以轻易的实现master/worker的队列模式(一个队列多个消费者);

支持延迟队列;

安装

composer require workbunny/webman-rabbitmq

配置

<?php
return [
    'enable' => true,

    'host'               => '127.0.0.1',
    'vhost'              => '/',
    'port'               => 5672,
    'username'           => 'guest',
    'password'           => 'guest',
    'mechanism'          => 'AMQPLAIN', # 阿里云等云服务使用 PLAIN
    'timeout'            => 10,
    'heartbeat'          => 50,
    'heartbeat_callback' => function(){ # 心跳回调
    },
    'error_callback'     => function(Throwable $throwable){ # 异常回调
    }
];

使用

创建Builder

  • 创建一个消费者进程数量为1的普通队列:(在项目根目录执行)
./webman workbunny:rabbitmq-builder test 1
  • 创建一个消费者进程数量为1的延迟队列:(在项目根目录执行)
./webman workbunny:rabbitmq-builder test 1 -d
	
#
	
./webman workbunny:rabbitmq-builder test 1 --delayed
  • 命令支持二级菜单
# 在 process/workbunny/rabbitmq/project 目录下创建 TestBuilder.php
./webman workbunny:rabbitmq-builder project/test 1

# 延迟同理

注:延迟队列需要为 rabbitMQ 安装 rabbitmq_delayed_message_exchange 插件

  1. 进入 rabbitMQ 的 plugins 目录下执行命令下载插件(以rabbitMQ 3.8.x举例):
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
  1. 执行安装命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

说明:

  • Builder 可以理解为类似 ORMModel,创建一个 Builder 就对应了一个队列;使用该 Builder 对象进行 publish() 时,会向该队列投放消息;创建多少个 Builder 就相当于创建了多少条队列;

  • 命令结构:

workbunny:rabbitmq-builder [-d|--delayed] [--] <name> <count>

# 【必填】 name:Builder名称
# 【必填】count:启动的消费者进程数量
# 【选填】-d/--delayed:是否是延迟队列
  • 在项目根目录下命令会在 process/workbunny/rabbitmq 路径下创建一个Builder,并且将该Builder自动加入 config/plugin/workbunny/webman-rabbitmq/process.php 配置中作为自定义进程启动;(如不需要自动加载消费者进程,请自行注释该配置)

  • 消费是异步的,不会阻塞当前进程,不会影响 webman/workermanstatus

  • Builder文件结构入下,可自行调整类属性:

<?php
declare(strict_types=1);

namespace process\workbunny\rabbitmq;

use Bunny\Channel as BunnyChannel;
use Bunny\Async\Client as BunnyClient;
use Bunny\Message as BunnyMessage;
use Workbunny\WebmanRabbitMQ\Constants;
use Workbunny\WebmanRabbitMQ\FastBuilder;

class TestBuilder extends FastBuilder
{
   	// QOS 大小
   	protected int $prefetch_size = 0;
   	// QOS 数量
   	protected int $prefetch_count = 0;
   	// QOS 是否全局
   	protected bool $is_global = false;
   	// 是否延迟队列
   	protected bool $delayed = false;
   	// 消费回调
   	public function handler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client): string
   	{
       	// TODO 消费需要的回调逻辑
       	var_dump('请重写 TestBuilderDelayed::handler() ');
       	return Constants::ACK;
       	# Constants::NACK
       	# Constants::REQUEUE
   	}
}

移除Builder

  • 移除名为 test 的普通队列:(在项目根目录执行)
./webman workbunny:rabbitmq-remove test
  • 移除名为 test 的延迟队列:(在项目根目录执行)
./webman workbunny:rabbitmq-remove test -d
#
./webman workbunny:rabbitmq-remove test --delayed
  • 仅关闭名为 test 的普通队列:(在项目根目录执行)
./webman workbunny:rabbitmq-remove test -c
#
./webman workbunny:rabbitmq-remove test --close

查看Builder

./webman workbunny:rabbitmq-list

注:当 Builder 未启动时,handler 与 count 显示为 --

+----------+---------------------------------------------------------------------------+-------------------------------------------------+-------+
| name     | file                                                                      | handler                                         | count |
+----------+---------------------------------------------------------------------------+-------------------------------------------------+-------+
| test     | /var/www/your-project/process/workbunny/rabbitmq/TestBuilder.php          | process\workbunny\rabbitmq\TestBuilder          | 1     |
| test -d  | /var/www/your-project/process/workbunny/rabbitmq/TestBuilderDelayed.php   | process\workbunny\rabbitmq\TestBuilderDelayed   | 1     |
+----------+---------------------------------------------------------------------------+-------------------------------------------------+-------+

生产

  • 每个builder各包含一个连接,使用多个builder会创建多个连接

  • 生产消息默认不关闭当前连接

  • 异步生产的连接与消费者共用

1. 同步发布消息

该方法会阻塞等待至消息生产成功,返回bool

  • 发布普通消息

注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQException 异常

use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\TestBuilder;

sync_publish(TestBuilder::instance(), 'abc'); # return bool
  • 发布延迟消息

注:向普通队列发布延迟消息会抛出一个 WebmanRabbitMQException 异常

use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\TestBuilder;

sync_publish(TestBuilder::instance(), 'abc', [
	'x-delay' => 10000, # 延迟10秒
]); # return bool

2. 异步发布消息

该方法不会阻塞等待,立即返回 React\Promise, 可以利用 React\Promise 进行 wait; 也可以纯异步不等待,React\Promise 项目地址

  • 发布普通消息

注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQException 异常

use function Workbunny\WebmanRabbitMQ\async_publish;
use process\workbunny\rabbitmq\TestBuilder;

async_publish(TestBuilder::instance(), 'abc'); # return PromiseInterface|bool
  • 发布延迟消息

注:向普通队列发布延迟消息会抛出一个 WebmanRabbitMQException 异常

use function Workbunny\WebmanRabbitMQ\async_publish;
use process\workbunny\rabbitmq\TestBuilder;

async_publish(TestBuilder::instance(), 'abc', [
	'x-delay' => 10000, # 延迟10秒
]); # return PromiseInterface|bool

说明

  • 生产可用,欢迎 issue 和 PR
  • Message 可以理解为队列、交换机的配置信息;
  • 继承实现 AbstractMessage 可以自定义Message;
  • Builder 可通过 Builder->setMessage() 可设置自定义配置;
  • 可使用 SyncClientAsyncClient 自行实现一些自定义消费/自定义生产的功能;
You might also like...
Php-rpc-server - JSON RPC server implementation for PHP.

JSON RPC Server implementation for PHP. The json-rpc is a very simple protocol. You can see this by reading the protocol specification. This library i

A pure PHP implementation of the open Language Server Protocol. Provides static code analysis for PHP for any IDE.
A pure PHP implementation of the open Language Server Protocol. Provides static code analysis for PHP for any IDE.

A pure PHP implementation of the open Language Server Protocol. Provides static code analysis for PHP for any IDE.

PHP implementation of circuit breaker pattern.

What is php-circuit-breaker A component helping you gracefully handle outages and timeouts of external services (usually remote, 3rd party services).

Implementation of the Token Bucket algorithm in PHP.

Token Bucket This is a threadsafe implementation of the Token Bucket algorithm in PHP. You can use a token bucket to limit an usage rate for a resourc

A PHP implementation of the Unleash protocol aka Feature Flags in GitLab.

A PHP implementation of the Unleash protocol aka Feature Flags in GitLab. This implementation conforms to the official Unleash standards and implement

An implementation of the Minecraft: Bedrock Edition protocol in PHP

BedrockProtocol An implementation of the Minecraft: Bedrock Edition protocol in PHP This library implements all of the packets in the Minecraft: Bedro

PHP Implementation of PASERK

PASERK (PHP) Platform Agnostic SERialized Keys. Requires PHP 7.1 or newer. PASERK Specification The PASERK Specification can be found in this reposito

A minimalistic implementation of Promises for PHP

libPromise A minimalistic implementation of Promises for PHP. Installation via DEVirion Install the DEVirion plugin and start your server. This will c

PHP's Promse implementation depends on the Swoole module.

php-promise-swoole PHP's Promse implementation depends on the Swoole module. Promise::allsettled([ /** Timer 调用 */ /** Timer call */

Comments
  • 关于消费队列退出优雅关闭连接

    关于消费队列退出优雅关闭连接

    在本地启了一个docker运行rabbitmq,通过实时跟踪日志查看到,当webman退出时,调用消费队列进程的onWorkerStop方法时,虽然FasterBuilder中针对连接处理$this->_connection->close();,但发现rabbitmq这边有warning日志:

    2022-09-04 02:55:53.458 [warning] <0.20366.1> closing AMQP connection <0.20366.1> (172.19.0.1:60280 -> 172.19.0.2:5672, vhost: '/', user: 'rabbitmq'):
    client unexpectedly closed TCP connection
    

    同时发现如果在发布消息时,通过调用helpers的async_publish方法,设置close参数为true时,连接是正常关闭的:

    2022-09-04 03:02:36.367 [info] <0.20824.1> connection <0.20824.1> (172.19.0.1:60316 -> 172.19.0.2:5672): user 'rabbitmq' authenticated and granted access to vhost '/'
    ******
    2022-09-04 03:02:36.421 [info] <0.20824.1> closing AMQP connection <0.20824.1> (172.19.0.1:60316 -> 172.19.0.2:5672, vhost: '/', user: 'rabbitmq')
    
    opened by jeyfang123 3
  • 关于消息发送和接收的稳定性优化

    关于消息发送和接收的稳定性优化

    推送:sync_publish(Builder::instance(), $params, null, true); 说明文档中,最后的close 参数,最好默认是true。 如果该方法应用在 controller 部分,这个会因为没有固定的 心跳监听而造成大量的发布失败。 虽然每次都会重新创建连接,那也是稳定和高效的。

    process 里的消息接收。建议加入重连检查。有时候网络断开造成的消费者丢失没有重连。 或者搞个重连开关。

    ` public function onWorkerStart(Worker $worker): void { parent::onWorkerStart($worker);

        Timer::add(10, function () {
            $this->checkConnection();
        });
    
    }
    
    private function checkConnection() {
        try {
            if($this->connection()->client()->isConnected() == false) {
                $this->logger->debug('Reconnect');
                $this->connection()->consume($this->getMessage());
            }else {
                $this->logger->debug('Connection is OK');
            }
        } catch (\Exception $e) {
            $this->logger->error($e->getMessage());
        }
    }
    

    `

    opened by billlv 1
  • 关于helpers提供的消息发布方法

    关于helpers提供的消息发布方法

    最近在看rabbitmq相关的组件,在尝试使用这个组件时,关于helpers下的两个方法有个疑问,如下

     * 同步生产
     * @param FastBuilder $builder
     * @param string $body
     * @param array|null $headers
     * @param bool $close
     * @return bool
     */
    function sync_publish(FastBuilder $builder, string $body, ?array $headers = null, bool $close = false) : bool
    {
        $message = $builder->getMessage();
        if(
            ($message->getExchangeType() !== Constants::DELAYED and $headers['x-delay'] ?? 1) or
            ($message->getExchangeType() === Constants::DELAYED and !($headers['x-delay'] ?? 0))
        ){
            throw new WebmanRabbitMQException('Invalid publish. ');
        }
        $message->setBody($body);
        if($headers !== null){
            $message->setHeaders(array_merge($message->getHeaders(), $headers));
        }
        return $builder->syncConnection()->publish($message, $close);
    }
    

    若这里$builder是普通队列,则$message的ExchangeType是Constants::DIRECT,当$headers不传递时,默认为null,那这里不是始终抛WebmanRabbitMQException了?

    opened by jeyfang123 1
Releases(1.0.9)
Owner
workbunny
An open source group that has nothing to do.
workbunny
The most widely used PHP client for RabbitMQ

php-amqplib This library is a pure PHP implementation of the AMQP 0-9-1 protocol. It's been tested against RabbitMQ. The library was used for the PHP

php-amqplib 4.2k Jan 3, 2023
A lightweight queue based on Redis Stream for webman plugin.

workbunny/webman-rqueue ?? A lightweight queue based on Redis Stream for webman plugin. ?? A lightweight queue based on Redis Stream for webman plugin

workbunny 10 Dec 12, 2022
Webman quickly creates a verification code tool similar to Google verification code

webman-captcha-grid webman quickly creates a verification code tool similar to Google verification code webman 快速创建一个类似于 Google 点图验证码的本地验证码扩展 介绍 webma

听风吹雨 6 Dec 5, 2022
Very easy to use a current limiting component, the code is very simple, based on the webman framework.

Very easy to use a current limiting component, the code is very simple, based on the webman framework.

nsp-team 13 Dec 29, 2022
Laravel illuminate/filesystem for webman

webman-tech/laravel-filesystem Laravel illuminate/filesystem for webman 介绍 站在巨人(laravel)的肩膀上使文件存储使用更加可靠和便捷 所有方法和配置与 laravel 几乎一模一样,因此使用方式完全参考 Laravel文

null 5 Dec 15, 2022
PHP Library that implements several messaging patterns for RabbitMQ

Thumper Thumper is a PHP library that aims to abstract several messaging patterns that can be implemented over RabbitMQ. Inside the examples folder yo

php-amqplib 276 Nov 20, 2022
This composer plugin is a temporary implementation of using symbolic links to local packages as dependencies to allow a parallel work process

Composer symlinker A Composer plugin to install packages as local symbolic links. This plugin is a temporary implementation of using symbolic links to

Pierre Cassat 18 Nov 9, 2021
A plugin manager for PocketMine-MP downloads plugin from PocketMine-MP official plugin repository

oh-my-pmmp A plugin manager for PocketMine-MP Getting Started Prerequisites Your server MUST RUN the latest version of PocketMine. Installation From P

thebigcrafter 6 Jan 4, 2023
Integrates the ClassicPress Plugin Directory and any plugin stored in GitHub (tagged with classicpress-plugin) in the ClassicPress Admin

ClassicPress Plugin Directory Adds a new screen in a ClassicPress Install where you can browse, install, activate, deactivate, update, delete and pagi

TukuToi 3 Dec 27, 2022
A pure PHP implementation of the MessagePack serialization format / msgpack.org[PHP]

msgpack.php A pure PHP implementation of the MessagePack serialization format. Features Fully compliant with the latest MessagePack specification, inc

Eugene Leonovich 368 Dec 19, 2022