PHP library for Disque, an in-memory, distributed job queue

Overview

disque-php

Latest Version Software License Build Status Coverage Status Quality Score Total Downloads

A PHP library for the very promising disque distributed job queue. Features:

  • Support for both PHP (5.5+) and HHVM
  • No dependencies: Fast connection to Disque out-of-the-box
  • High level API to easily push jobs to a queue, and retrieve jobs from queues
  • Easily schedule jobs for execution at a certain DateTime
  • Use the built in Job class, or implement your own
  • Smart node connection support based on number of jobs produced by nodes
  • Connect to Disque with the built-in connection, or reutilize your existing Redis client (such as predis)
  • Supporting all current Disque commands, and allows you to easily implement custom commands
  • Fully unit tested

Installation

$ composer require mariano/disque-php --no-dev

If you want to run its tests remove the --no-dev argument.

Usage

This library provides a Queue API for easy job pushing/pulling, and direct access to all Disque commands via its Client API.

Create the client:

use Disque\Connection\Credentials;
use Disque\Client;

$nodes = [
    new Credentials('127.0.0.1', 7711),
    new Credentials('127.0.0.1', 7712, 'password'),
];

$disque = new Client($nodes);

Queue a job:

$job = new \Disque\Queue\Job(['name' => 'Claudia']);
$disque->queue('my_queue')->push($job);

Schedule job to be processed at a certain time:

$job = new \Disque\Queue\Job(['name' => 'Mariano']);
$disque->queue('my_queue')->schedule($job, new \DateTime('+2 hours'));

Fetch queued jobs, mark them as processed, and keep waiting on jobs:

$queue = $disque->queue('my_queue');
while ($job = $queue->pull()) {
    echo "GOT JOB!";
    var_dump($job->getBody());
    $queue->processed($job);
}

For more information on the APIs provided, read the full documentation.

Testing

$ phpunit

Contributing

Please see CONTRIBUTING for details.

Support

If you need some help or even better want to collaborate, feel free to hit me on twitter: @mgiglesias

Security

If you discover any security related issues, please contact @mgiglesias instead of using the issue tracker.

Acknowledgments

First and foremost, Salvatore Sanfilippo for writing what looks to be the definite solution for job queues (thanks for all the fish Gearman).

Other disque client libraries for the inspiration.

The PHP League for an awesome README.md skeleton, and tips about packaging PHP components.

A special acknolewdgment and appreciation for our amazing contributors!

License

The MIT License (MIT). Please see License File for more information.

Comments
  • RFC: Built in CLI tool to run jobs

    RFC: Built in CLI tool to run jobs

    One of the things I wanted to add for 2.0 is a built-in CLI tool that would make it easy for people to run jobs without them having to build an actual CLI job runner. I've just pushed to the cli_tool branch my proposal for this.

    What's your opinion @Revisor?

    TO-DO

    enhancement 
    opened by mariano 9
  • Add NOHANG support to GETJOB

    Add NOHANG support to GETJOB

    Add support for NOHANG to GETJOB.

    In the latest version of Disque, GETJOB can be called with the NOHANG option, which allows it to return immediately if no jobs are available. This PR adds support for that workflow.

    opened by kaecyra 8
  • Add methods for working with the body, NACKs and additional deliveries to the JobInterface

    Add methods for working with the body, NACKs and additional deliveries to the JobInterface

    GETJOB returns at least the queue, the ID and the body:

    Jobs are returned as a three elements array containing the queue name, the Job ID, and the job body itself.

    What's the point of a job without a body?

    Case in point: Disque\Queue\Marshal\JobMarshaler::marshal() expects a JobInterface type, but it actually requires a Job type and even throws an exception, because it needs the Job::getBody() method.

    And subquestions:

    • Should we agree that there is no point of a Job without an ID and a body, shouldn't they be provided as soon as in the constructor so that the object can't be instantiated in an invalid state?
    • In case the ID and the body are provided in the constructor, wouldn't it be preferrable to remove their setters and make Job immutable? What would be the use case for changing the Job ID or the body on the fly, after it has been created?
    opened by Revisor 8
  • Reconnect, if the current node has lost its connection

    Reconnect, if the current node has lost its connection

    When switching nodes, the prioritizer can recommend to stay on the current node. The current node however could have lost its connection in the meantime. In that case we will try and reconnect, or if not possible, connect to the next best possible node.

    The check whether the current node has disconnected has been moved so that it is checked every time we execute any command, not just after the GETJOB command.

    I tried to write a test for this situation, but ended up with a brittle monster of a test depending on the order of method calls among several mocks. :-/

    opened by Revisor 7
  • We should reconnect if not connected

    We should reconnect if not connected

    I have no idea on how to test this. I've come to this problem where a one queue processor adds to another queue and thus a new disque client is made. to push a job to another queue. And somewhat the connection in original disque client is dropped.

    opened by mvrhov 6
  • Is there something else you would like to change for 2.0?

    Is there something else you would like to change for 2.0?

    In #12 you mentioned that moving the connection code out of the Manager was something you had wanted to do.

    Is there something else you would like to change and take advantage of the upcoming 2.0? If so, can you create issues for it?

    There is always something to improve, of course, but what are the high priorities in your opinion?

    Since I'm now immersed in the library, I would be glad to help you.

    opened by Revisor 5
  • Should Queue::pull() throw an exception if there is no job to do?

    Should Queue::pull() throw an exception if there is no job to do?

    This is admittedly a small issue that doesn't concern the functionality, but rather best practices.

    Should Queue::pull() really throw an exception if there is no job to do? From all I've read, exceptions should be used sparingly and only for truly exceptional cases. If there is another clean way to relay the negative result, it should be favored over throwing an exception.

    According to this exception typology, the exception thrown here would be an example of a vexing exception. It is not exceptional to not have a job ready.

    What if the method just returned null?

    What's your opinion about this in general? There might be more cases like this, but this is the place I'm working on right now and I don't want to nitpick needlessly.

    opened by Revisor 5
  • hello() and info() throw ConnectionException if called first

    hello() and info() throw ConnectionException if called first

    This script runs fine:

    $client = new \Disque\Client([
        new \Disque\Connection\Credentials('127.0.0.1', 7711),
    ]);
    
    $job = $client->queue('something')->pull(1);
    
    var_dump($job);
    
    // hello called after pull
    var_dump($client->hello());
    

    But if i try to call hello first, it throws a ConnectionException every time.

    $client = new \Disque\Client([
        new \Disque\Connection\Credentials('127.0.0.1', 7711),
    ]);
    
    // hello called right after client created, throws exception here
    var_dump($client->hello());
    
    $job = $client->queue('something')->pull(1);
    
    var_dump($job);
    
    

    Any ideas? I'm trying to write a wrapper and I'd like to be able to test the connection when I create it. I thought using hello or info would be a great way to do that, but both seem to have this issue.

    opened by dwolfhub 4
  • Refactor creating a connection from Connection\Manager into a ConnectionBuilder?

    Refactor creating a connection from Connection\Manager into a ConnectionBuilder?

    While working on #8 and refactoring the Disque\Connection\Manager, I realized that, no, we cannot just set a connection object instead of a class name, because we need to create a new connection for each node separately.

    We have in my opinion two options:

    1. Leave it as it is now - Manager gets a class name that conforms to the ConnectionInterface and creates its own connections as needed
    2. Or the Manager gets a ConnectionBuilder (name may change) and every time it needs a new connection, it asks the builder for one.

    The status quo has these advantages:

    1. It's already there and it works.
    2. It's simple - just set a class name and go.

    Refactoring into a ConnectionBuilder has these advantages:

    1. We can use type hinting instead of the "condition + exception" combo: Set a ConnectionBuilder instead of a class name, and further methods would require a ConnectionInterface
    2. The Manager is already doing too much. ConnectionBuilder could make the Manager simpler by taking over all the steps needed for the actual connection:
    • Manager::buildConnection() - the instantiation of a new connection object
    • Manager::doConnect() - the actual connection
    • Manager::getNodeConnection() - the initial HELLO

    (There are multiple steps, that's why I suggest the name Builder instead of Factory)

    The Manager's role would then be to manage node connections as a group, choose the best node, disconnect the inactive ones etc. That's a complicated enough task, even without having to handhold single connections.

    This is more an internal refactoring, not much would change for the library user (unless they want to use a different connection object).

    How does it sound to you?

    opened by Revisor 4
  • Update Packagist package

    Update Packagist package

    If installed with Composer, disque-php GetJob command fails due to changed Job ID prefix. It is fixed in current version on Github, so Packagist version needs to be updated.

    opened by nailk 3
  • 2.0 Release

    2.0 Release

    Hi! I've been looking into this library and there's quite a difference between the last published stable version on packagist (1.3) and the latest one here. Any feature missing so you can release 2.0 ? Are you waiting for disque to release 1.0 to release your version ?

    opened by xocasdashdash 3
  • Implement PING - optional argument

    Implement PING - optional argument

    The PING command is useful for checking for underlying timeouts, connection issues, dead server nodes, etc. PING appears to be implemented the same as Redis's PING (but I've asked upstream whether it's likely to be supported long-term.)

    It works like this:

    127.0.0.1:7711> PING
    PONG
    127.0.0.1:7711> PING something
    "something"
    

    This means that the PING command takes one optional string argument. It appears that neither ARGUMENTS_TYPE_STRING nor ARGUMENTS_TYPE_EMPTY quite covers this type of command.

    opened by dominics 0
  • Consider dropping HHVM support

    Consider dropping HHVM support

    Builds for HHVM are currently failing.

    Presumably should be a major version to drop support, so maybe consider just no longer testing on HHVM for now and officially dropping support later.

    Nobody is using it (happy to be surprised if that's not the case), and it's still not compatible with 7.0 features (let alone 7.1, and 7.2 is now in alpha)

    opened by dominics 0
  • Logic error in Socket::getType() results in

    Logic error in Socket::getType() results in "Don't know how to handle a response of type" exception

    The method Socket::getType() (https://github.com/mariano/disque-php/blob/39bc697/src/Connection/Socket.php#L192L207) is notated via PHPDoc as @return string A single char - that's not the case; it's actually ?string due to a small logic bug.

    The problem is: if $this->socket is at EOF, the initial while condition is already false, and fgetc is never called. This makes the value of $type strictly equal to null, not false or the empty string.

    From there, the check at the end of the method ($type === false || $type === '') will fail to match, and the correct exception is not thrown (should probably be "Nothing received while reading from client" - this usually happens during timeout).

    This results in the user getting a ResponseException (when there was no response!) instead of a ConnectionException. The message is confusing too: "Don't know how to handle a response of type".

    opened by dominics 1
  • Client does not cope with missing host in HELLO reponse

    Client does not cope with missing host in HELLO reponse

    It's not well documented, but the HELLO response is not expected to always reply with an IP address for every node.

    Specifically:

    • Shut down any running Disque instance
    • Clear any existing nodes.conf
    • Start a single-node cluster of Disque (must never have had CLUSTER MEET called on it)
    • Connect and call HELLO

    In this case, the Disque server does not have an IP address yet (even if you use the bind option), and the HELLO response looks like this:

    127.0.0.1:7711> hello
    1) (integer) 1
    2) "b27f65e00cb76bb4de71dab8148f3a09d93251cd"
    3) 1) "b27f65e00cb76bb4de71dab8148f3a09d93251cd"
       2) ""
       3) "7711"
       4) "1"
    

    In this situation, the client will still consider this a valid candidate node, and will load it into the connection managers ->nodes member during revealClusterFromHello()/revealNodeFromHello(). From there, it may be selected by the priorityStrategy and put into use (because even though it doesn't have a host, it does have a low priority), causing an error.

    Instead, we should filter out HELLO responses that don't have a host set (we wouldn't be able to connect to them anyway)

    opened by dominics 2
  • Roadmap for 3.0

    Roadmap for 3.0

    I'd like to release 3.0 soon. On the list of to-do I see:

    • [ ] Issue #25
    • [ ] Issue #28
    • [ ] Issue #43
    • [ ] Review and add official documentation for Disqontrol (see issue #35)

    I'd also like to make php7 the minimum requirement, and consequently move to full strict typing.

    What are your feelings retarding this roadmap and particularly the new minimum php version to be supported?

    opened by mariano 5
Releases(2.0.3)
  • 2.0.3(Feb 16, 2017)

    Added

    • Added nohang option to getJob(). Thanks @kaecyra
    • Added ability to specify job options when scheduling a job via schedule(). Thanks @aleksraiden

    Changed

    • Removed predis/predis from list of suggested packages to install
    Source code(tar.gz)
    Source code(zip)
  • 2.0.2(May 10, 2016)

  • 2.0.1(May 10, 2016)

    Changed

    • Job IDs should now always follow the Disque RC1 format.
    • Reconnect to node when node lost connection.

    Added

    • Added support for JSCAN
    • Added support for PAUSE
    Source code(tar.gz)
    Source code(zip)
  • 2.0-alpha(Nov 3, 2015)

    Changed

    • Exception Disque\Connection\ResponseException has been moved to Disque\Connection\Response\ResponseException
    • The Disque constructor has changed, so instead of receiving an array of IP addresses, it now receives an array of Credentials, where each Credentials instance refers to a specific Disque node, and allows the use of passworded nodes.
    • JobInterface has changed to add the following methods: getBody(), setBody(), getQueue(), setQueue(), getNacks(), setNacks(), getAdditionalDeliveries(), setAdditionalDeliveries()
    • The pull() method in Queue no longer throws a JobNotAvailableException if no job is available, but instead returns null.
    • The JobNotAvailableException has been removed, as no jobs being available is not actually an exception, but a possible acceptable outcome.
    • ManagerInterface no longer has the getConnectionClass() and setConnectionClass() methods. Instead it uses the new setConnectionFactory() method to allow one to specify a connection factory.

    Added

    • Added Node, which handles the connection to a specific node.
    • Added the failed() method to Queue which can be used to mark a job as failed, therefore increasing its NACK counter.
    • Added ConnectionFactoryInterface, used by ManagerInterface, to create a new connection to redis.
    • Added ConnectionFactoryInterface implementation classes PredisFactory and SocketFactory
    • Added NodePrioritizerInterface to allow customizing the way the client switches through nodes based on a specific strategy.
    • Added NodePrioritizerInterface implementation classes ConservativeJobCountPrioritizer, RandomPrioritizer and NullPrioritizer
    • Added option withcounters to the $options argument in getJob() which allows the returned job to include its NACK and additional deliveries counters.
    • Added support for NACK
    Source code(tar.gz)
    Source code(zip)
  • 1.3.0(May 18, 2015)

    Added

    • Added support for WORKING.
    • Added processing() method to Queue API.
    • Added $password option to addServer() in Disque\Client.

    Changed

    • By default when creating a new Disque\Client without arguments NO server is pre-loaded. You will have to manually add servers via addServer(), or specify them to the Disque\Client constructor.
    Source code(tar.gz)
    Source code(zip)
  • 1.2.1(May 14, 2015)

    Changed

    • QPEEK changed in upstream and now returns the job queue. Client API has been modified to reflect this.
    • CommandInterface has a new method: isBlocking(), which tells if the given command should block while waiting for a response to not be affected by timeouts.

    Added

    • Added support for QSCAN.

    Fixed

    • Fixed bug where if the connection would timeout while waiting for a response a ConnectionException would be thrown. This affected getJob() which should not be interrupted by a timeout. This required a change in the definition of CommandInterface by adding the method isBlocking()
    Source code(tar.gz)
    Source code(zip)
  • 1.2.0(May 12, 2015)

    Changed

    • JobInterface is now a simpler interface. Its load() and dump() methods have been moved to a MarshalInterface, effectively changing how custom Job classes work.
    • Disque\Queue\MarshalException has been moved to Disque\Queue\Marshal\MarshalException.
    • The setJobClass() method in Queue has been removed. Instead use setMarshaler(), which should be given an instance of Disque\Queue\Marshaler\MarshalerInterface.
    Source code(tar.gz)
    Source code(zip)
  • 1.1.0(May 10, 2015)

    Added

    • Refactoring of response parsing for greater flexibility.
    • Added new Disque\Connection\Manager class to manage connections to nodes.
    • GETJOB can now influence which node we are connected to. By means of $disque->getConnectionManager()->setMinimumJobsToChangeNode() we can specify that if a certain node produces that many jobs, then we should instead connect to the node producing those jobs (as suggested by Disque itself).
    • Added Disque\Queue\Queue and Disque\Queue\Job class to offer a higher level API that simplifies queueing and fetching jobs
    • Added method queue() to Disque\Client to create / fetch a queue which is an instance of Disque\Queue\Queue
    • Added schedule() method to Disque\Queue that allows to easily schedule jobs to be processed at a certain time.

    Changed

    • Disque\Connection\Connection is now named Disque\Connection\Socket.
    • The method setConnectionImplementation has been moved to Disque\Connection\Manager, and renamed to setConnectionClass. So from a disque instance you can change it via: $disque->getConnectionManager()->setConnectionClass($class)
    • Moved exceptions around:
      • Disque\Exception\InvalidCommandArgumentException to Disque\Command\Argument\InvalidArgumentException
      • Disque\Exception\InvalidCommandException to Disque\Command\InvalidCommandException
      • Disque\Exception\InvalidCommandOptionException to Disque\Command\Argument\InvalidOptionException
      • Disque\Exception\InvalidCommandResponseException to Disque\Command\Response\InvalidResponseException
      • Disque\Exception\Connection\Exception\ConnectionException to Disque\Connection\ConnectionException
      • Disque\Exception\Connection\Exception\ResponseException to Disque\Connection\ResponseException
      • Disque\Exception\DisqueException to Disque\DisqueException

    Fixed

    • Fixed issue where when a timeout was using when calling getJob(), and the call timed out because there were no jobs available, an non-array would be returned. No on that case it will return an empty array.
    • Fixed issue where if no options were provided to addJob(), yet all three parameters were specified, an InvalidCommandArgumentException was thrown.
    Source code(tar.gz)
    Source code(zip)
  • 1.0.0(May 4, 2015)

    • Added support for commands HELLO, INFO, SHOW, ADDJOB, DELJOB, GETJOB, ACKJOB, FASTACK, ENQUEUE, DEQUEUE, QLEN, QPEEK
    • Added built-in connection to Disque
    • Added support for Predis connections, and allowing adding new connection methods via ConnectionInterface
    Source code(tar.gz)
    Source code(zip)
Owner
Mariano Iglesias
Mariano Iglesias
laminas-memory manages data in an environment with limited memory

Memory objects (memory containers) are generated by the memory manager, and transparently swapped/loaded when required.

Laminas Project 5 Jul 26, 2022
zend-memory manages data in an environment with limited memory

Memory objects (memory containers) are generated by the memory manager, and transparently swapped/loaded when required.

Zend Framework 16 Aug 29, 2020
Ip2region is a offline IP location library with accuracy rate of 99.9% and 0.0x millseconds searching performance. DB file is ONLY a few megabytes with all IP address stored. binding for Java,PHP,C,Python,Nodejs,Golang,C#,lua. Binary,B-tree,Memory searching algorithm

Ip2region是什么? ip2region - 准确率99.9%的离线IP地址定位库,0.0x毫秒级查询,ip2region.db数据库只有数MB,提供了java,php,c,python,nodejs,golang,c#等查询绑定和Binary,B树,内存三种查询算法。 Ip2region特性

Lion 12.6k Dec 30, 2022
A library of powerful code snippets to help you get the job done with Gravity Forms and Gravity Perks.

Gravity Wiz Snippet Library Gravity Wiz is creating the most comprehensive library of snippets for Gravity Forms ever. We'll be consistently moving ou

Gravity Wiz 151 Dec 27, 2022
This example shows how to estimate pi, using generated random numbers that uniformly distributed.

php-estimatepi This example shows how to estimate pi, using generated random numbers that uniformly distributed. Every pair of numbers produced will b

Oğuzhan Cerit 1 Nov 26, 2021
A cross-language remote procedure call(RPC) framework for rapid development of high performance distributed services.

Motan Overview Motan is a cross-language remote procedure call(RPC) framework for rapid development of high performance distributed services. Related

Weibo R&D Open Source Projects 5.8k Dec 20, 2022
TiDB is an open source distributed HTAP database compatible with the MySQL protocol

What is TiDB? TiDB ("Ti" stands for Titanium) is an open-source NewSQL database that supports Hybrid Transactional and Analytical Processing (HTAP) wo

PingCAP 33.1k Jan 9, 2023
PHP Meminfo is a PHP extension that gives you insights on the PHP memory content

MEMINFO PHP Meminfo is a PHP extension that gives you insights on the PHP memory content. Its main goal is to help you understand memory leaks: by loo

Benoit Jacquemont 994 Dec 29, 2022
Get the system resources in PHP, as memory, number of CPU'S, Temperature of CPU or GPU, Operating System, Hard Disk usage, .... Works in Windows & Linux

system-resources. A class to get the hardware resources We can get CPU load, CPU/GPU temperature, free/used memory & Hard disk. Written in PHP It is a

Rafael Martin Soto 10 Oct 15, 2022
This PHP script optimizes the speed of your RAM memory

██████╗░██╗░░██╗██████╗░░█████╗░██╗░░░░░███████╗░█████╗░███╗░░██╗███████╗██████╗░ ██╔══██╗██║░░██║██╔══██╗██╔══██╗██║░░░░░██╔════╝██╔══██╗████╗░██║██╔

Érik Freitas 7 Feb 12, 2022
An autoscaling Bloom filter with ultra-low memory footprint for PHP

Ok Bloomer An autoscaling Bloom filter with ultra-low memory footprint for PHP. Ok Bloomer employs a novel layered filtering strategy that allows it t

Andrew DalPino 2 Sep 20, 2022
Bug bounty tools built in PHP to help penetration tester doing the job

BugBountyTools-PHP Bug bounty tools built in PHP to help penetration tester doing the job Website who using this script: KitaBantu Soon! 403 Bypasser

Muhammad Daffa 7 Aug 17, 2022
Iran decoration platform is an open source Php web application where you can find your job as a freelancer working in people home in decoration positions and others.

Iran-Decoration Platform Iran decoration platform is an open source Php web application where you can find your job as a freelancer working in people

AmirHossein Mohammadi 8 Dec 14, 2022
High-performance, low-memory-footprint, single-file embedded database for key/value storage

LDBA - a fast, pure PHP, key-value database. Information LDBA is a high-performance, low-memory-footprint, single-file embedded database for key/value

Simplito 12 Nov 13, 2022
Prisma is an app that strengthens the relationship between people with memory loss and the people close to them

Prisma is an app that strengthens the relationship between people with memory loss and the people close to them. It does this by providing a living, collaborative digital photo album that can be populated with content of interest to these people.

Soulcenter 45 Dec 8, 2021
JsonCollectionParser - Event-based parser for large JSON collections (consumes small amount of memory)

Event-based parser for large JSON collections (consumes small amount of memory). Built on top of JSON Streaming Parser This packa

Max Grigorian 113 Dec 6, 2022
Provides an object-oriented API to query in-memory collections in a SQL-style.

POQ - PHP Object Query Install composer require alexandre-daubois/poq 1.0.0-beta2 That's it, ready to go! ?? Usage Here is the set of data we're going

Alexandre Daubois 16 Nov 29, 2022
Kirby Janitor job for staging

Simple staging Janitor jobs Plugin for very simple staging setup for https://github.com/bnomei/kirby3-janitor/ (required). Beta quality - use at your

Florian Karsten 8 Nov 27, 2022
Use php-fpm as a simple built-in async queue

PHP-FPM Async Queue Use php-fpm as a simple built-in async queue. Based on interoperable queue interfaces Queue Interop. Usage composer makasim/php-fp

Max Kotliar 111 Nov 19, 2022