基于 swoole 的多进程队列系统,低延时(最低毫秒级)、低资源占用, 支持一键化协程、超时控制、失败重试。可与 laravel thinkphp 等框架配合使用

Overview

multi-process-queue

基于swoole的多进程队列系统,manage进程管理子进程,master进程监听队列分发任务,worker进程执行任务, 多进程、低延时(最低毫秒级)、低资源占用。可与 laravel thinkphp 等框架配合使用

版本要求:

  • php>=7.1
  • swoole >= 4.4

当前支持的驱动

  • redis (redis版本需大于3.0.2)

特性

  • 最低毫秒级任务延时
  • 自定义重试次数和错误回调
  • 自定义超时时间和超时回调
  • 自定义启动预执行函数,方便初始化其他框架与其他项目配合使用
  • master协程监听队列降低延时
  • 多worker进程消费任务
  • worker进程支持一键化协程协程
  • 支持后台守护运行,无需其余进程管理工具
  • 支持分布式部署

进程结构图

配置说明

配置项 类型 是否必填 默认值 说明
basics array 基础配置项
basics.name string 当前队列服务名称,多个服务同时启动时需要分别设置名字
basics.pid_path string /tmp 主进程pid文件存放路径
basics.driver string 队列驱动必须是MPQueue\Queue\Driver\DriverInterface的实现类
worker_start_handle callable 空字符串 worker进程启动后会调用(当前服务所有队列有效)
log array 日志配置
log.path string /tmp 日志存放路径
log.level int Monolog\Logger::INFO Monolog\Logger::DEBUG/Monolog\Logger::INFO
log.dirver string/class RotatingFileLogDriver 日志驱动,必须是MPQueue\Log\Driver\LogDriverInterface的实现
queue 二维数组 队列配置
queue[0].name string 队列名称
queue[0].worker_number int 3 工作进程数量
queue[0].memory_limit int 128 工作进程最大使用内存数(单位mb)(0无限制)
queue[0].sleep_seconds floot 1 监视进程休眠时间(秒,最小到0.001)
queue[0].timeout int 120 超时时间(s)以投递任务方为准
queue[0].fail_number int 3 最大失败次数以投递任务方为准
queue[0].fail_expire int 3 失败延时投递时间(s)以投递任务方为准
queue[0].timeout_handle callable 任务超时执行函数
queue[0].fail_handle callable 任务失败执行函数
queue[0].worker_start_handle callable worker进程启动加载函数(当前队列有效)

timeout_handle 会传入一个参数 $info 任务详细信息

fail_handle 会传入两个参数 $info 任务详细信息、$e出错的异常类

注意:任务超时后会触发timeout_handle会直接记录为失败,不会根据fail_number进行失败重试,也不会触发fail_handle。

配置示例

[
    'basics'=>[
        'name'=>'mp-queue-1',//多个服务器同时启动时需要分别设置名字
        'driver'=> new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
    ],
    'queue' => [
        [
            'name' => 'test',//队列名称
            'timeout_handle'=>function(){
                var_dump('超时了');
             },//超时后触发函数
            'fail_handle'=>function(){
                var_dump('失败了');
            },//失败执行函数
        ],
        [
            'name' => 'test2',//队列名称
            'worker_number' => 4,//当前队列工作进程数量
            'memory_limit' => 0, //当前队列工作进程的最大使用内存,超出则重启。单位 MB
        ]
    ]
]

快速上手

1.安装

  • composer 安装
 composer require yuntian001/multi-process-queue
  • 或者直接下载压缩包 然后执行composer install

2. 启动队列

  • 新建 main.php
<?php
define('MP_QUEUE_CLI', true);
use MPQueue\Config\Config;
require_once __DIR__ . '/vendor/autoload.php';
$config = [
   'basics' => [
       'name' => 'mp-queue-1',//多个服务器同时启动时需要分别设置名字
       'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
   ],
   'queue' => [
       [
           'name' => 'test',//队列名称
           'timeout_handle' => function () {
               var_dump('超时了');
           },//超时后触发函数
           'fail_handle' => function () {
               var_dump('失败了');
           },//失败执行函数
       ],
       [
           'name' => 'test2',//队列名称
           'worker_number' => 4,//当前队列工作进程数量
           'memory_limit' => 0, //当前队列工作进程的最大使用内存,超出则重启。单位 MB
       ]
   ]
];
Config::set($config);
(new \MPQueue\Console\Application())->run();
  • 执行命令 worker:start 启动队列
   php master.php worker:start
  • 或者后台运行
  php master.php worker:start -d
  • 支持的命令(list命令可查看)
 php master.php list
  queue:clean 清空队列内容 --queue test 清空指定队列:test
  queue:status 查看队列信息 --queue test 指定队列:test
  queue:failed 打印失败信息详情 必须指定队列 --queue test 指定队列:test
  worker:start 启动 携带参数-d 后台启动
  worker:stop 停止
  worker:restart 重启 携带参数-d 后台启动
  worker:reload 平滑重启
  worker:status 查看进程运行状态

3. 投递任务

  • 执行以下代码投递任务(可web调用,调用进程无需加载swoole扩展)
    require_once __DIR__ . '/vendor/autoload.php';//require composer 加载文件 如果已加载则无需重复require
    $config = [
        'basics' => [
            'name' => 'mp-queue-1',//多个服务器同时启动时需要分别设置名字
            'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
        ],
        'queue' => [
            [
                'name' => 'test',//队列名称
                'timeout_handle' => function () {
                    var_dump('超时了');
                },//超时后触发函数
                'fail_handle' => function () {
                    var_dump('失败了');
                },//失败执行函数
            ],
            [
                'name' => 'test2',//队列名称
                'worker_number' => 4,//当前队列工作进程数量
                'memory_limit' => 0, //当前队列工作进程的最大使用内存,超出则重启。单位 MB
            ]
        ]
    ];
     Config::set($config);
     $job = function(){
        var_dump('hello wordQ!');
     }
     MPQueue\Queue\Queue::push('test', $job,10);

MPQueue\Queue\Queue::push 接收三个参数:

  • $queue 队列名称

  • $job 投递的任务

    $job 允许的类型为callable 或 \MPQueue\Job的子类 具体测试可参考test文件夹

    $job为匿名函数时队列执行进程 和 投递任务进程无需再同一项目下。

    $job为静态方法/\MPQueue\Job的实现类/函数时 队列执行进程需要含有对应静态方法/\MPQueue\Job的实现类/函数。

    强烈建议投递$job使用\MPQueue\Job子类,子类中可自定义 超时时间、允许失败次数、延时重试时间、超时执行函数、失败执行函数。具体参数说明请进入src/Job.php进行查看

  • $delay 延时投递时间(s) 默认为0(立即投递)

任务投递示例,请参考test和test/Job文件夹下的normal.php、failed.php、timeout.php、pressure.php

注意事项

  • worker:reload会让worker进程执行完当前任务后进程重启,worker:restart会暴力重启manage、master、worker进程,可能会造成任务执行一半被断掉。
  • 因进程是在启动后直接加载到内存中的,更改代码后不会立即生效 需要执行worker:reload 或者 worker:start使其生效。
  • worker:reload 只会重启worker进程,不会重新加载配置文件,更改配置文件后需要worker:restart后才有效。

在laravel中使用

  • 安装
  composer require yuntian001/multi-process-queue
  • 在config文件夹中建立配置文件mp-queue.php
<?php
//当前仅为示例,具体配置可按文档自定义
return [
    'basics'=>[
        'name'=>'mp-queue-1',//多个服务器同时启动时需要分别设置名字
        'driver'=> new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
        'worker_start_handle'=>function(){
            //加载laravel核心程序
            defined('LARAVEL_START') || define('LARAVEL_START', microtime(true));
            $app = require_once __DIR__.'/../bootstrap/app.php';
            $app->make(Illuminate\Contracts\Console\Kernel::class)->bootstrap();
        }
    ],
    'queue' => [
        [
            'name' => 'test',//队列名称
            'timeout_handle'=>function($info){
                //自定义逻辑如存储到mysql
             },//超时后触发函数
            'fail_handle'=>function($info,$e){
                //自定义逻辑如存储到mysql
            },//失败执行函数
        ],
        [
            'name' => 'test2',//队列名称
            'worker_number' => 4,//当前队列工作进程数量
            'memory_limit' => 0, //当前队列工作进程的最大使用内存,超出则重启。单位 MB
        ]
    ],
    'log' => [
        'path' => __DIR__.'/../storage/logs',//日志存放目录需要可写权限
    ]
];

  • 在项目根目录建立启动文件 mp-queue
#!/usr/bin/env php
<?php
define('MP_QUEUE_CLI', true);
use MPQueue\Config\Config;

require __DIR__.'/vendor/autoload.php';
Config::set(include(__DIR__.'/config/mp-queue.php'));
(new \MPQueue\Console\Application())->run();
  • 创建job类 App\Job\HelloWord
<?php
//位置位于app/Job/HelloWord.php
namespace App\Job;

use Illuminate\Support\Facades\Log;
use MPQueue\Job;

Class HelloWord extends Job{
    //handle内可调用laravel方法和函数,但handle函数不支持依赖注入传参
    public function handle()
    {
        var_dump('hello word!');
        Log::info('hello word!');
    }
}

  • 后台启动队列(或不加-d直接启动)
 php mp-queue worker:start -d
  • 投递任务 在任意位置(如控制器)加入以下代码进行投递
    \MPQueue\Config\Config::set(config('mp-queue'));//配置项设置也可放在app容器中AppServiceProvider boot() 统一加载
    \MPQueue\Queue\Queue::push('test',\App\Job\HelloWord::class);

在thinkphp5.0中使用

  • 安装
  composer require yuntian001/multi-process-queue
  • 在application/extra文件夹中建立配置文件mp-queue.php
<?php
//当前仅为示例,具体配置可按文档自定义
return [
    'basics'=>[
        'name'=>'mp-queue-1',//多个服务器同时启动时需要分别设置名字
        'driver'=> new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
        'worker_start_handle'=>function(){
            //加载thinkphp核心应用
            \think\App::initCommon();
        }
    ],
    'queue' => [
        [
            'name' => 'test',//队列名称
            'timeout_handle'=>function($info){
                //自定义逻辑如存储到mysql
            },//超时后触发函数
            'fail_handle'=>function($info,$e){
                //自定义逻辑如存储到mysql
            },//失败执行函数
        ],
        [
            'name' => 'test2',//队列名称
            'worker_number' => 4,//当前队列工作进程数量
            'memory_limit' => 0, //当前队列工作进程的最大使用内存,超出则重启。单位 MB
        ]
    ],
    'log' => [
        'path' => __DIR__.'/../../runtime',//日志存放目录需要可写权限
    ]
];

  • 在项目根目录建立启动文件mp-queue
#!/usr/bin/env php
<?php
define('MP_QUEUE_CLI', true);
define('APP_PATH', __DIR__ . '/application/');
// ThinkPHP 基础文件
require __DIR__ . '/thinkphp/base.php';
\MPQueue\Config\Config::set(include(__DIR__.'/application/extra/mp-queue.php'));
(new \MPQueue\Console\Application())->run();
  • 创建job类 app\job\HelloWord
<?php
//application/job/HelloWord.php
namespace app\job;
use MPQueue\Job;
use think\Log;

Class HelloWord extends Job{
    //handle内可调用thinkphp方法和函数,但handle函数不支持依赖注入传参
    public function handle()
    {
        var_dump('hello word!');
        Log::info('hello word!');
    }
}
  • 后台启动队列(或不加-d直接启动)
 php mp-queue worker:start -d
  • 投递任务 在任意位置(如控制器)加入以下代码进行投递
    \MPQueue\Config\Config::set(config('mp-queue'));//配置项设置也可注册在初始化化行为(app_init)中 统一加载
    \MPQueue\Queue\Queue::push('test',\app\job\HelloWord::class);
You might also like...
🚀 Developing Rocketseat's Next Level Week (NLW#05) Application using PHP/Swoole + Hyperf

Inmana PHP 🚀 Developing Rocketseat 's Next Level Week (NLW#05) Application using Swoole + Hyperf. This is the app of the Elixir track. I know PHP/Swo

Simple live support server with PHP Swoole Websocket and Telegram API

Telgraf Simple live support server with PHP Swoole Websocket and Telegram API. Usage Server Setup Clone repository with following command. git clone h

Library for Open Swoole extension

Open Swoole Library This library works with Open Swoole since release version v4.7.1. WIP Table of Contents How to Contribute Code Requirements Develo

Hyperf instant messaging program based on swoole framework
Hyperf instant messaging program based on swoole framework

Hyperf instant messaging program based on swoole framework

Redis watcher for PHP-Casbin in Swoole.

Redis watcher for PHP-Casbin in Swoole Redis watcher for PHP-Casbin in Swoole , Casbin is a powerful and efficient open-source access control library.

Socks5 proxy server written in Swoole PHP

php-socks This is a Socks5 proxy server implementation built with PHP & Swoole. To start the proxy server, clone this repo, run composer install to in

DoraRPC is an RPC For the PHP MicroService by The Swoole

Dora RPC 简介(Introduction) Dora RPC 是一款基础于Swoole定长包头通讯协议的最精简的RPC, 用于复杂项目前后端分离,分离后项目都通过API工作可更好的跟踪、升级、维护及管理。 问题提交: Issue For complex projects separation

Hprose asynchronous client & standalone server based on swoole
Hprose asynchronous client & standalone server based on swoole

Hprose for Swoole Introduction Hprose is a High Performance Remote Object Service Engine. It is a modern, lightweight, cross-language, cross-platform,

PHP Kafka client is used in PHP-FPM and Swoole. PHP Kafka client supports 50 APIs, which might be one that supports the most message types ever.

longlang/phpkafka Introduction English | 简体中文 PHP Kafka client is used in PHP-FPM and Swoole. The communication protocol is based on the JSON file in

Releases(2.1.3)
  • 2.1.3(Jan 3, 2023)

  • 2.1.2(Jan 3, 2023)

    • 修复任务中投递任务会更改worker进程所在队列bug
    • 修复redis异常监听bug
    • 修复master进程于worker进程异常断开bug
    Source code(tar.gz)
    Source code(zip)
  • 2.1.1(Nov 1, 2022)

  • 2.1.0(Apr 8, 2022)

    1. 加上redis连接异常后进行轮询重连
    2. 修复反序列化失败bug
    3. 修复超时失败任务调用job回调异常bug
    4. 修复错误回调执行后进程状态不会滚bug
    5. 失败重试次数优化
    6. 优化cpu资源占用到1%以下(基于2核4G)
    7. 进程退出加上 退出状态码,日志加上状态码返回
    Source code(tar.gz)
    Source code(zip)
  • 2.0.0(Apr 8, 2022)

    1.取消超时回调,超时后记录为失败,触发失败回调。 2.由原来的每次失败都触发失败回调改为,超时或达到失败重试次数后才触发。 3.增加抢占模式:此模式下worker进程会主动抢任务会增加数据库查询和连接数量(1.0版本的模式为分发模式) 4.增加高可用性,超时时间适用于失败回调,如果进程在失败回调执行期间挂掉,其余进程会在超时时间到达后再次执行失败回调。 5.优化分发模式性能,按当前空闲进程数一次获取多个任务进行分发,qps大大提高。

    Source code(tar.gz)
    Source code(zip)
  • v1.0.1(Apr 28, 2021)

  • v1.0.0(Apr 21, 2021)

Owner
yuntian
yuntian
Multi-process coroutine edition Swoole spider !! Learn about Swoole's network programming and the use of its related APIs

swoole_spider php bin/spider // Just do it !! Cache use Swoole\Table; use App\Table\Cache; $table = new Table(1<<20); // capacity size $table->column

null 3 Apr 22, 2021
💫 Vega is a CLI mode HTTP web framework written in PHP support Swoole, WorkerMan / Vega 是一个用 PHP 编写的 CLI 模式 HTTP 网络框架,支持 Swoole、WorkerMan

Mix Vega 中文 | English Vega is a CLI mode HTTP web framework written in PHP support Swoole, WorkerMan Vega 是一个用 PHP 编写的 CLI 模式 HTTP 网络框架,支持 Swoole、Work

Mix PHP 46 Apr 28, 2022
A easy way to install your basic yii projetc, we have encrypt database password in phpfile, my class with alot funtions to help you encrypt and decrypt and our swoole server install just run ./yii swoole/start and be happy!

Yii 2 Basic Project Template with swoole and Modules Yii 2 Basic Project Template is a skeleton Yii 2 application best for rapidly creating small proj

null 3 Apr 11, 2022
swoole,easyswoole,swoole framework

EasySwoole - A High Performance Swoole Framework EasySwoole is a distributed, persistent memory PHP framework based on the Swoole extension. It was cr

null 4.6k Jan 2, 2023
swoole and golang ipc, use goroutine complete swoole coroutine

swoole and golang ipc demo swoole process module exec go excutable file as sider car, use goroutine complete swoole coroutine hub.php <?php require '

null 2 Apr 17, 2022
LaravelS is an out-of-the-box adapter between Swoole and Laravel/Lumen.

?? LaravelS is an out-of-the-box adapter between Swoole and Laravel/Lumen.

Biao Xie 3.7k Dec 29, 2022
This package provides a high performance HTTP server to speed up your Laravel/Lumen application based on Swoole.

This package provides a high performance HTTP server to speed up your Laravel/Lumen application based on Swoole.

Swoole Taiwan 3.9k Jan 8, 2023
一个极简高性能php框架,支持[swoole | php-fpm ]环境

One - 一个极简高性能php框架,支持[swoole | php-fpm ]环境 快 - 即使在php-fpm下也能1ms内响应请求 简单 - 让你重点关心用one做什么,而不是怎么用one 灵活 - 各个组件松耦合,可以灵活搭配使用,使用方法保持一致 原生sql可以和模型关系with搭配使用,

vic 862 Jan 1, 2023
球球大作战(PHP+Swoole)

球球大作战 在线多人H5游戏(H5客户端, 服务端) W A S D 控制 技术栈: PHP7.4+ Swoole4.6(多进程, WebSocket, 共享内存) SpriteJS v3(2D Canvas渲染) 演示Demo 安装: 环境要求:Linux,PHP7.4+(启用Swoole拓展)

Vacant 15 May 9, 2022
swoole worker

SwooleWorker SwooleWorker is a distributed long connection development framework based on Swoole4. 【Github】 【HomePage】 Manual 【ENGLISH】 【简体中文】 Usage s

null 123 Jan 9, 2023