Commit 8605bf82 authored by zazaname's avatar zazaname

聚合服务端后台V1.0

parent 9afd96bc
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
return [
'default' => 'sync',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
<?php
// This file is automatically generated at:2023-04-26 12:21:40
// This file is automatically generated at:2023-05-05 16:08:46
declare (strict_types = 1);
return array (
0 => 'think\\captcha\\CaptchaService',
1 => 'think\\app\\Service',
2 => 'think\\queue\\Service',
);
\ No newline at end of file
/vendor/
/.idea/
/composer.lock
/thinkphp/
This diff is collapsed.
# think-queue for ThinkPHP6
## 安装
> composer require topthink/think-queue
## 配置
> 配置文件位于 `config/queue.php`
### 公共配置
```bash
[
'default'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名
]
```
## 创建任务类
> 推荐使用 `app\job` 作为任务类的命名空间
> 也可以放在任意可以自动加载到的地方
任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个`fire`方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别
每个方法会传入两个参数 `think\queue\Job $job`(当前的任务对象) 和 `$data`(发布任务时自定义的数据)
还有个可选的任务失败执行的方法 `failed` 传入的参数为`$data`(发布任务时自定义的数据)
### 下面写两个例子
```php
namespace app\job;
use think\queue\Job;
class Job1{
public function fire(Job $job, $data){
//....这里执行具体的任务
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
}
//如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
$job->delete();
// 也可以重新发布这个任务
$job->release($delay); //$delay为延迟时间
}
public function failed($data){
// ...任务达到最大重试次数后,失败了
}
}
```
```php
namespace app\lib\job;
use think\queue\Job;
class Job2{
public function task1(Job $job, $data){
}
public function task2(Job $job, $data){
}
public function failed($data){
}
}
```
## 发布任务
> `think\facade\Queue::push($job, $data = '', $queue = null)` 和 `think\facade\Queue::later($delay, $job, $data = '', $queue = null)` 两个方法,前者是立即执行,后者是在`$delay`秒后执行
`$job` 是任务名
命名空间是`app\job`的,比如上面的例子一,写`Job1`类名即可
其他的需要些完整的类名,比如上面的例子二,需要写完整的类名`app\lib\job\Job2`
如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名`app\lib\job\Job2@task1``app\lib\job\Job2@task2`
`$data` 是你要传到任务里的参数
`$queue` 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填
## 监听任务并执行
```bash
&> php think queue:listen
&> php think queue:work
```
两种,具体的可选参数可以输入命令加 `--help` 查看
> 可配合supervisor使用,保证进程常驻
{
"name": "topthink/think-queue",
"description": "The ThinkPHP6 Queue Package",
"authors": [
{
"name": "yunwuxin",
"email": "448901948@qq.com"
}
],
"license": "Apache-2.0",
"autoload": {
"psr-4": {
"think\\": "src"
},
"files": [
"src/common.php"
]
},
"autoload-dev": {
"psr-4": {
"think\\test\\queue\\": "tests"
}
},
"minimum-stability": "dev",
"require": {
"ext-json": "*",
"topthink/framework": "^6.0",
"symfony/process": "^4.2",
"nesbot/carbon": "^2.16"
},
"extra": {
"think": {
"services": [
"think\\queue\\Service"
],
"config": {
"queue": "src/config.php"
}
}
},
"require-dev": {
"phpunit/phpunit": "^6.2",
"mockery/mockery": "^1.2",
"topthink/think-migration": "^3.0.0"
}
}
<?xml version="1.0" encoding="UTF-8"?>
<phpunit backupGlobals="false"
backupStaticAttributes="false"
beStrictAboutTestsThatDoNotTestAnything="false"
bootstrap="tests/bootstrap.php"
colors="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnError="false"
stopOnFailure="false"
verbose="true"
>
<testsuites>
<testsuite name="Think Queue Test Suite">
<directory suffix="Test.php">./tests</directory>
</testsuite>
</testsuites>
<filter>
<whitelist processUncoveredFilesFromWhitelist="true">
<directory suffix=".php">./src</directory>
<exclude>
<file>./src/queue/Service.php</file>
<file>./src/common.php</file>
<file>./src/config.php</file>
</exclude>
</whitelist>
</filter>
</phpunit>
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think;
use think\queue\Connector;
use think\queue\connector\Database;
use think\queue\connector\Redis;
/**
* Class Queue
* @package think\queue
*
* @mixin Database
* @mixin Redis
*/
class Queue extends Manager
{
protected $namespace = '\\think\\queue\\connector\\';
protected function resolveType(string $name)
{
return $this->app->config->get("queue.connections.{$name}.type", 'sync');
}
protected function resolveConfig(string $name)
{
return $this->app->config->get("queue.connections.{$name}");
}
protected function createDriver(string $name)
{
/** @var Connector $driver */
$driver = parent::createDriver($name);
return $driver->setApp($this->app)
->setConnection($name);
}
/**
* @param null|string $name
* @return Connector
*/
public function connection($name = null)
{
return $this->driver($name);
}
/**
* 默认驱动
* @return string
*/
public function getDefaultDriver()
{
return $this->app->config->get('queue.default');
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
use think\facade\Queue;
if (!function_exists('queue')) {
/**
* 添加到队列
* @param $job
* @param string $data
* @param int $delay
* @param null $queue
*/
function queue($job, $data = '', $delay = 0, $queue = null)
{
if ($delay > 0) {
Queue::later($delay, $job, $data, $queue);
} else {
Queue::push($job, $data, $queue);
}
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
return [
'default' => 'sync',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
<?php
namespace think\facade;
use think\Facade;
/**
* Class Queue
* @package think\facade
* @mixin \think\Queue
*/
class Queue extends Facade
{
protected static function getFacadeClass()
{
return 'queue';
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue;
use think\App;
class CallQueuedHandler
{
protected $app;
public function __construct(App $app)
{
$this->app = $app;
}
public function call(Job $job, array $data)
{
$command = unserialize($data['command']);
$this->app->invoke([$command, 'handle']);
if (!$job->isDeletedOrReleased()) {
$job->delete();
}
}
public function failed(array $data)
{
$command = unserialize($data['command']);
if (method_exists($command, 'failed')) {
$command->failed();
}
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue;
use DateTimeInterface;
use InvalidArgumentException;
use think\App;
abstract class Connector
{
/** @var App */
protected $app;
/**
* The connector name for the queue.
*
* @var string
*/
protected $connection;
protected $options = [];
abstract public function size($queue = null);
abstract public function push($job, $data = '', $queue = null);
public function pushOn($queue, $job, $data = '')
{
return $this->push($job, $data, $queue);
}
abstract public function pushRaw($payload, $queue = null, array $options = []);
abstract public function later($delay, $job, $data = '', $queue = null);
public function laterOn($queue, $delay, $job, $data = '')
{
return $this->later($delay, $job, $data, $queue);
}
public function bulk($jobs, $data = '', $queue = null)
{
foreach ((array) $jobs as $job) {
$this->push($job, $data, $queue);
}
}
abstract public function pop($queue = null);
protected function createPayload($job, $data = '')
{
$payload = $this->createPayloadArray($job, $data);
$payload = json_encode($payload);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
}
return $payload;
}
protected function createPayloadArray($job, $data = '')
{
return is_object($job)
? $this->createObjectPayload($job)
: $this->createPlainPayload($job, $data);
}
protected function createPlainPayload($job, $data)
{
return [
'job' => $job,
'maxTries' => null,
'timeout' => null,
'data' => $data,
];
}
protected function createObjectPayload($job)
{
return [
'job' => 'think\queue\CallQueuedHandler@call',
'maxTries' => $job->tries ?? null,
'timeout' => $job->timeout ?? null,
'timeoutAt' => $this->getJobExpiration($job),
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
];
}
public function getJobExpiration($job)
{
if (!method_exists($job, 'retryUntil') && !isset($job->timeoutAt)) {
return;
}
$expiration = $job->timeoutAt ?? $job->retryUntil();
return $expiration instanceof DateTimeInterface
? $expiration->getTimestamp() : $expiration;
}
protected function setMeta($payload, $key, $value)
{
$payload = json_decode($payload, true);
$payload[$key] = $value;
$payload = json_encode($payload);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
}
return $payload;
}
public function setApp(App $app)
{
$this->app = $app;
return $this;
}
/**
* Get the connector name for the queue.
*
* @return string
*/
public function getConnection()
{
return $this->connection;
}
/**
* Set the connector name for the queue.
*
* @param string $name
* @return $this
*/
public function setConnection($name)
{
$this->connection = $name;
return $this;
}
}
<?php
namespace think\queue;
abstract class FailedJob
{
/**
* Log a failed job into storage.
*
* @param string $connection
* @param string $queue
* @param string $payload
* @param \Exception $exception
* @return int|null
*/
abstract public function log($connection, $queue, $payload, $exception);
/**
* Get a list of all of the failed jobs.
*
* @return array
*/
abstract public function all();
/**
* Get a single failed job.
*
* @param mixed $id
* @return object|null
*/
abstract public function find($id);
/**
* Delete a single failed job from storage.
*
* @param mixed $id
* @return bool
*/
abstract public function forget($id);
/**
* Flush all of the failed jobs from storage.
*
* @return void
*/
abstract public function flush();
}
<?php
namespace think\queue;
use Carbon\Carbon;
use DateInterval;
use DateTimeInterface;
trait InteractsWithTime
{
/**
* Get the number of seconds until the given DateTime.
*
* @param DateTimeInterface|DateInterval|int $delay
* @return int
*/
protected function secondsUntil($delay)
{
$delay = $this->parseDateInterval($delay);
return $delay instanceof DateTimeInterface
? max(0, $delay->getTimestamp() - $this->currentTime())
: (int) $delay;
}
/**
* Get the "available at" UNIX timestamp.
*
* @param DateTimeInterface|DateInterval|int $delay
* @return int
*/
protected function availableAt($delay = 0)
{
$delay = $this->parseDateInterval($delay);
return $delay instanceof DateTimeInterface
? $delay->getTimestamp()
: Carbon::now()->addRealSeconds($delay)->getTimestamp();
}
/**
* If the given value is an interval, convert it to a DateTime instance.
*
* @param DateTimeInterface|DateInterval|int $delay
* @return DateTimeInterface|int
*/
protected function parseDateInterval($delay)
{
if ($delay instanceof DateInterval) {
$delay = Carbon::now()->add($delay);
}
return $delay;
}
/**
* Get the current system time as a UNIX timestamp.
*
* @return int
*/
protected function currentTime()
{
return Carbon::now()->getTimestamp();
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue;
use Exception;
use think\App;
use think\helper\Arr;
use think\helper\Str;
abstract class Job
{
/**
* The job handler instance.
* @var object
*/
private $instance;
/**
* The JSON decoded version of "$job".
* @var array
*/
private $payload;
/**
* @var App
*/
protected $app;
/**
* The name of the queue the job belongs to.
* @var string
*/
protected $queue;
/**
* The name of the connection the job belongs to.
*/
protected $connection;
/**
* Indicates if the job has been deleted.
* @var bool
*/
protected $deleted = false;
/**
* Indicates if the job has been released.
* @var bool
*/
protected $released = false;
/**
* Indicates if the job has failed.
*
* @var bool
*/
protected $failed = false;
/**
* Get the decoded body of the job.
*
* @return mixed
*/
public function payload($name = null, $default = null)
{
if (empty($this->payload)) {
$this->payload = json_decode($this->getRawBody(), true);
}
if (empty($name)) {
return $this->payload;
}
return Arr::get($this->payload, $name, $default);
}
/**
* Fire the job.
* @return void
*/
public function fire()
{
$instance = $this->getResolvedJob();
[, $method] = $this->getParsedJob();
$instance->{$method}($this, $this->payload('data'));
}
/**
* Process an exception that caused the job to fail.
*
* @param Exception $e
* @return void
*/
public function failed($e)
{
$instance = $this->getResolvedJob();
if (method_exists($instance, 'failed')) {
$instance->failed($this->payload('data'), $e);
}
}
/**
* Delete the job from the queue.
* @return void
*/
public function delete()
{
$this->deleted = true;
}
/**
* Determine if the job has been deleted.
* @return bool
*/
public function isDeleted()
{
return $this->deleted;
}
/**
* Release the job back into the queue.
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
$this->released = true;
}
/**
* Determine if the job was released back into the queue.
* @return bool
*/
public function isReleased()
{
return $this->released;
}
/**
* Determine if the job has been deleted or released.
* @return bool
*/
public function isDeletedOrReleased()
{
return $this->isDeleted() || $this->isReleased();
}
/**
* Get the job identifier.
*
* @return string
*/
abstract public function getJobId();
/**
* Get the number of times the job has been attempted.
* @return int
*/
abstract public function attempts();
/**
* Get the raw body string for the job.
* @return string
*/
abstract public function getRawBody();
/**
* Parse the job declaration into class and method.
* @return array
*/
protected function getParsedJob()
{
$job = $this->payload('job');
$segments = explode('@', $job);
return count($segments) > 1 ? $segments : [$segments[0], 'fire'];
}
/**
* Resolve the given job handler.
* @param string $name
* @return mixed
*/
protected function resolve($name, $param)
{
$namespace = $this->app->getNamespace() . '\\job\\';
$class = false !== strpos($name, '\\') ? $name : $namespace . Str::studly($name);
return $this->app->make($class, [$param], true);
}
public function getResolvedJob()
{
if (empty($this->instance)) {
[$class] = $this->getParsedJob();
$this->instance = $this->resolve($class, $this->payload('data'));
}
return $this->instance;
}
/**
* Determine if the job has been marked as a failure.
*
* @return bool
*/
public function hasFailed()
{
return $this->failed;
}
/**
* Mark the job as "failed".
*
* @return void
*/
public function markAsFailed()
{
$this->failed = true;
}
/**
* Get the number of times to attempt a job.
*
* @return int|null
*/
public function maxTries()
{
return $this->payload('maxTries');
}
/**
* Get the number of seconds the job can run.
*
* @return int|null
*/
public function timeout()
{
return $this->payload('timeout');
}
/**
* Get the timestamp indicating when the job should timeout.
*
* @return int|null
*/
public function timeoutAt()
{
return $this->payload('timeoutAt');
}
/**
* Get the name of the queued job class.
*
* @return string
*/
public function getName()
{
return $this->payload('job');
}
/**
* Get the name of the connection the job belongs to.
*
* @return string
*/
public function getConnection()
{
return $this->connection;
}
/**
* Get the name of the queue the job belongs to.
* @return string
*/
public function getQueue()
{
return $this->queue;
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue;
use Closure;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
use think\App;
class Listener
{
/**
* @var string
*/
protected $commandPath;
/**
* @var string
*/
protected $workerCommand;
/**
* @var \Closure|null
*/
protected $outputHandler;
/**
* @param string $commandPath
*/
public function __construct($commandPath)
{
$this->commandPath = $commandPath;
}
public static function __make(App $app)
{
return new self($app->getRootPath());
}
/**
* Get the PHP binary.
*
* @return string
*/
protected function phpBinary()
{
return (new PhpExecutableFinder)->find(false);
}
/**
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $sleep
* @param int $maxTries
* @param int $memory
* @param int $timeout
* @return void
*/
public function listen($connection, $queue, $delay = 0, $sleep = 3, $maxTries = 0, $memory = 128, $timeout = 60)
{
$process = $this->makeProcess($connection, $queue, $delay, $sleep, $maxTries, $memory, $timeout);
while (true) {
$this->runProcess($process, $memory);
}
}
/**
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $sleep
* @param int $maxTries
* @param int $memory
* @param int $timeout
* @return Process
*/
public function makeProcess($connection, $queue, $delay, $sleep, $maxTries, $memory, $timeout)
{
$command = array_filter([
$this->phpBinary(),
'think',
'queue:work',
$connection,
'--once',
"--queue={$queue}",
"--delay={$delay}",
"--memory={$memory}",
"--sleep={$sleep}",
"--tries={$maxTries}",
], function ($value) {
return !is_null($value);
});
return new Process($command, $this->commandPath, null, null, $timeout);
}
/**
* @param Process $process
* @param int $memory
*/
public function runProcess(Process $process, $memory)
{
$process->run(function ($type, $line) {
$this->handleWorkerOutput($type, $line);
});
if ($this->memoryExceeded($memory)) {
$this->stop();
}
}
/**
* @param int $type
* @param string $line
* @return void
*/
protected function handleWorkerOutput($type, $line)
{
if (isset($this->outputHandler)) {
call_user_func($this->outputHandler, $type, $line);
}
}
/**
* @param int $memoryLimit
* @return bool
*/
public function memoryExceeded($memoryLimit)
{
return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
}
/**
* @return void
*/
public function stop()
{
die;
}
/**
* @param \Closure $outputHandler
* @return void
*/
public function setOutputHandler(Closure $outputHandler)
{
$this->outputHandler = $outputHandler;
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue;
trait Queueable
{
/** @var string 连接 */
public $connection;
/** @var string 队列名称 */
public $queue;
/** @var integer 延迟时间 */
public $delay;
/**
* 设置连接名
* @param $connection
* @return $this
*/
public function onConnection($connection)
{
$this->connection = $connection;
return $this;
}
/**
* 设置队列名
* @param $queue
* @return $this
*/
public function onQueue($queue)
{
$this->queue = $queue;
return $this;
}
/**
* 设置延迟时间
* @param $delay
* @return $this
*/
public function delay($delay)
{
$this->delay = $delay;
return $this;
}
}
<?php
namespace think\queue;
use think\helper\Arr;
use think\helper\Str;
use think\Queue;
use think\queue\command\FailedTable;
use think\queue\command\FlushFailed;
use think\queue\command\ForgetFailed;
use think\queue\command\Listen;
use think\queue\command\ListFailed;
use think\queue\command\Restart;
use think\queue\command\Retry;
use think\queue\command\Table;
use think\queue\command\Work;
class Service extends \think\Service
{
public function register()
{
$this->app->bind('queue', Queue::class);
$this->app->bind('queue.failer', function () {
$config = $this->app->config->get('queue.failed', []);
$type = Arr::pull($config, 'type', 'none');
$class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\failed\\' . Str::studly($type);
return $this->app->invokeClass($class, [$config]);
});
}
public function boot()
{
$this->commands([
FailedJob::class,
Table::class,
FlushFailed::class,
ForgetFailed::class,
ListFailed::class,
Retry::class,
Work::class,
Restart::class,
Listen::class,
FailedTable::class,
]);
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue;
interface ShouldQueue
{
}
This diff is collapsed.
<?php
namespace think\queue\command;
use think\console\Command;
use think\helper\Str;
use think\migration\Creator;
class FailedTable extends Command
{
protected function configure()
{
$this->setName('queue:failed-table')
->setDescription('Create a migration for the failed queue jobs database table');
}
public function handle()
{
if (!$this->app->has('migration.creator')) {
$this->output->error('Install think-migration first please');
return;
}
$table = $this->app->config->get('queue.failed.table');
$className = Str::studly("create_{$table}_table");
/** @var Creator $creator */
$creator = $this->app->get('migration.creator');
$path = $creator->create($className);
// Load the alternative template if it is defined.
$contents = file_get_contents(__DIR__ . '/stubs/failed_jobs.stub');
// inject the class names appropriate to this migration
$contents = strtr($contents, [
'CreateFailedJobsTable' => $className,
'{{table}}' => $table,
]);
file_put_contents($path, $contents);
$this->output->info('Migration created successfully!');
}
}
<?php
namespace think\queue\command;
use think\console\Command;
class FlushFailed extends Command
{
protected function configure()
{
$this->setName('queue:flush')
->setDescription('Flush all of the failed queue jobs');
}
public function handle()
{
$this->app->get('queue.failer')->flush();
$this->output->info('All failed jobs deleted successfully!');
}
}
<?php
namespace think\queue\command;
use think\console\Command;
use think\console\input\Argument;
class ForgetFailed extends Command
{
protected function configure()
{
$this->setName('queue:forget')
->addArgument('id', Argument::REQUIRED, 'The ID of the failed job')
->setDescription('Delete a failed queue job');
}
public function handle()
{
if ($this->app['queue.failer']->forget($this->input->getArgument('id'))) {
$this->output->info('Failed job deleted successfully!');
} else {
$this->output->error('No failed job matches the given ID.');
}
}
}
<?php
namespace think\queue\command;
use think\console\Command;
use think\console\Table;
use think\helper\Arr;
class ListFailed extends Command
{
/**
* The table headers for the command.
*
* @var array
*/
protected $headers = ['ID', 'Connection', 'Queue', 'Class', 'Fail Time'];
protected function configure()
{
$this->setName('queue:failed')
->setDescription('List all of the failed queue jobs');
}
public function handle()
{
if (count($jobs = $this->getFailedJobs()) === 0) {
$this->output->info('No failed jobs!');
return;
}
$this->displayFailedJobs($jobs);
}
/**
* Display the failed jobs in the console.
*
* @param array $jobs
* @return void
*/
protected function displayFailedJobs(array $jobs)
{
$table = new Table();
$table->setHeader($this->headers);
$table->setRows($jobs);
$this->table($table);
}
/**
* Compile the failed jobs into a displayable format.
*
* @return array
*/
protected function getFailedJobs()
{
$failed = $this->app['queue.failer']->all();
return collect($failed)->map(function ($failed) {
return $this->parseFailedJob((array) $failed);
})->filter()->all();
}
/**
* Parse the failed job row.
*
* @param array $failed
* @return array
*/
protected function parseFailedJob(array $failed)
{
$row = array_values(Arr::except($failed, ['payload', 'exception']));
array_splice($row, 3, 0, $this->extractJobName($failed['payload']));
return $row;
}
/**
* Extract the failed job name from payload.
*
* @param string $payload
* @return string|null
*/
private function extractJobName($payload)
{
$payload = json_decode($payload, true);
if ($payload && (!isset($payload['data']['command']))) {
return $payload['job'] ?? null;
} elseif ($payload && isset($payload['data']['command'])) {
return $this->matchJobName($payload);
}
}
/**
* Match the job name from the payload.
*
* @param array $payload
* @return string
*/
protected function matchJobName($payload)
{
preg_match('/"([^"]+)"/', $payload['data']['command'], $matches);
if (isset($matches[1])) {
return $matches[1];
}
return $payload['job'] ?? null;
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\command;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\queue\Listener;
class Listen extends Command
{
/** @var Listener */
protected $listener;
public function __construct(Listener $listener)
{
parent::__construct();
$this->listener = $listener;
$this->listener->setOutputHandler(function ($type, $line) {
$this->output->write($line);
});
}
protected function configure()
{
$this->setName('queue:listen')
->addArgument('connection', Argument::OPTIONAL, 'The name of the queue connection to work', null)
->addOption('queue', null, Option::VALUE_OPTIONAL, 'The queue to listen on', null)
->addOption('delay', null, Option::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0)
->addOption('memory', null, Option::VALUE_OPTIONAL, 'The memory limit in megabytes', 128)
->addOption('timeout', null, Option::VALUE_OPTIONAL, 'Seconds a job may run before timing out', 60)
->addOption('sleep', null, Option::VALUE_OPTIONAL, 'Seconds to wait before checking queue for jobs', 3)
->addOption('tries', null, Option::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0)
->setDescription('Listen to a given queue');
}
public function execute(Input $input, Output $output)
{
$connection = $input->getArgument('connection') ?: $this->app->config->get('queue.default');
$queue = $input->getOption('queue') ?: $this->app->config->get("queue.connections.{$connection}.queue", 'default');
$delay = $input->getOption('delay');
$memory = $input->getOption('memory');
$timeout = $input->getOption('timeout');
$sleep = $input->getOption('sleep');
$tries = $input->getOption('tries');
$this->listener->listen($connection, $queue, $delay, $sleep, $tries, $memory, $timeout);
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\command;
use think\Cache;
use think\console\Command;
use think\queue\InteractsWithTime;
class Restart extends Command
{
use InteractsWithTime;
protected function configure()
{
$this->setName('queue:restart')
->setDescription('Restart queue worker daemons after their current job');
}
public function handle(Cache $cache)
{
$cache->set('think:queue:restart', $this->currentTime());
$this->output->info("Broadcasting queue restart signal.");
}
}
<?php
namespace think\queue\command;
use stdClass;
use think\console\Command;
use think\console\input\Argument;
use think\helper\Arr;
class Retry extends Command
{
protected function configure()
{
$this->setName('queue:retry')
->addArgument('id', Argument::IS_ARRAY | Argument::REQUIRED, 'The ID of the failed job or "all" to retry all jobs')
->setDescription('Retry a failed queue job');
}
public function handle()
{
foreach ($this->getJobIds() as $id) {
$job = $this->app['queue.failer']->find($id);
if (is_null($job)) {
$this->output->error("Unable to find failed job with ID [{$id}].");
} else {
$this->retryJob($job);
$this->output->info("The failed job [{$id}] has been pushed back onto the queue!");
$this->app['queue.failer']->forget($id);
}
}
}
/**
* Retry the queue job.
*
* @param stdClass $job
* @return void
*/
protected function retryJob($job)
{
$this->app['queue']->connection($job['connection'])->pushRaw(
$this->resetAttempts($job['payload']),
$job['queue']
);
}
/**
* Reset the payload attempts.
*
* Applicable to Redis jobs which store attempts in their payload.
*
* @param string $payload
* @return string
*/
protected function resetAttempts($payload)
{
$payload = json_decode($payload, true);
if (isset($payload['attempts'])) {
$payload['attempts'] = 0;
}
return json_encode($payload);
}
/**
* Get the job IDs to be retried.
*
* @return array
*/
protected function getJobIds()
{
$ids = (array) $this->input->getArgument('id');
if (count($ids) === 1 && $ids[0] === 'all') {
$ids = Arr::pluck($this->app['queue.failer']->all(), 'id');
}
return $ids;
}
}
<?php
namespace think\queue\command;
use think\console\Command;
use think\helper\Str;
use think\migration\Creator;
class Table extends Command
{
protected function configure()
{
$this->setName('queue:table')
->setDescription('Create a migration for the queue jobs database table');
}
public function handle()
{
if (!$this->app->has('migration.creator')) {
$this->output->error('Install think-migration first please');
return;
}
$table = $this->app->config->get('queue.connections.database.table');
$className = Str::studly("create_{$table}_table");
/** @var Creator $creator */
$creator = $this->app->get('migration.creator');
$path = $creator->create($className);
// Load the alternative template if it is defined.
$contents = file_get_contents(__DIR__ . '/stubs/jobs.stub');
// inject the class names appropriate to this migration
$contents = strtr($contents, [
'CreateJobsTable' => $className,
'{{table}}' => $table,
]);
file_put_contents($path, $contents);
$this->output->info('Migration created successfully!');
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\command;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\queue\event\JobFailed;
use think\queue\event\JobProcessed;
use think\queue\event\JobProcessing;
use think\queue\Job;
use think\queue\Worker;
class Work extends Command
{
/**
* The queue worker instance.
* @var Worker
*/
protected $worker;
public function __construct(Worker $worker)
{
parent::__construct();
$this->worker = $worker;
}
protected function configure()
{
$this->setName('queue:work')
->addArgument('connection', Argument::OPTIONAL, 'The name of the queue connection to work', null)
->addOption('queue', null, Option::VALUE_OPTIONAL, 'The queue to listen on')
->addOption('once', null, Option::VALUE_NONE, 'Only process the next job on the queue')
->addOption('delay', null, Option::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0)
->addOption('force', null, Option::VALUE_NONE, 'Force the worker to run even in maintenance mode')
->addOption('memory', null, Option::VALUE_OPTIONAL, 'The memory limit in megabytes', 128)
->addOption('timeout', null, Option::VALUE_OPTIONAL, 'The number of seconds a child process can run', 60)
->addOption('sleep', null, Option::VALUE_OPTIONAL, 'Number of seconds to sleep when no job is available', 3)
->addOption('tries', null, Option::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0)
->setDescription('Process the next job on a queue');
}
/**
* Execute the console command.
* @param Input $input
* @param Output $output
* @return int|null|void
*/
public function execute(Input $input, Output $output)
{
$connection = $input->getArgument('connection') ?: $this->app->config->get('queue.default');
$queue = $input->getOption('queue') ?: $this->app->config->get("queue.connections.{$connection}.queue", 'default');
$delay = $input->getOption('delay');
$sleep = $input->getOption('sleep');
$tries = $input->getOption('tries');
$this->listenForEvents();
if ($input->getOption('once')) {
$this->worker->runNextJob($connection, $queue, $delay, $sleep, $tries);
} else {
$memory = $input->getOption('memory');
$timeout = $input->getOption('timeout');
$this->worker->daemon($connection, $queue, $delay, $sleep, $tries, $memory, $timeout);
}
}
/**
* 注册事件
*/
protected function listenForEvents()
{
$this->app->event->listen(JobProcessing::class, function (JobProcessing $event) {
$this->writeOutput($event->job, 'starting');
});
$this->app->event->listen(JobProcessed::class, function (JobProcessed $event) {
$this->writeOutput($event->job, 'success');
});
$this->app->event->listen(JobFailed::class, function (JobFailed $event) {
$this->writeOutput($event->job, 'failed');
$this->logFailedJob($event);
});
}
/**
* Write the status output for the queue worker.
*
* @param Job $job
* @param $status
*/
protected function writeOutput(Job $job, $status)
{
switch ($status) {
case 'starting':
$this->writeStatus($job, 'Processing', 'comment');
break;
case 'success':
$this->writeStatus($job, 'Processed', 'info');
break;
case 'failed':
$this->writeStatus($job, 'Failed', 'error');
break;
}
}
/**
* Format the status output for the queue worker.
*
* @param Job $job
* @param string $status
* @param string $type
* @return void
*/
protected function writeStatus(Job $job, $status, $type)
{
$this->output->writeln(sprintf(
"<{$type}>[%s][%s] %s</{$type}> %s",
date('Y-m-d H:i:s'),
$job->getJobId(),
str_pad("{$status}:", 11),
$job->getName()
));
}
/**
* 记录失败任务
* @param JobFailed $event
*/
protected function logFailedJob(JobFailed $event)
{
$this->app['queue.failer']->log(
$event->connection,
$event->job->getQueue(),
$event->job->getRawBody(),
$event->exception
);
}
}
<?php
use think\migration\db\Column;
use think\migration\Migrator;
class CreateFailedJobsTable extends Migrator
{
public function change()
{
$this->table('{{table}}')
->addColumn(Column::text('connection'))
->addColumn(Column::text('queue'))
->addColumn(Column::longText('payload'))
->addColumn(Column::longText('exception'))
->addColumn(Column::timestamp('fail_time')->setDefault('CURRENT_TIMESTAMP'))
->create();
}
}
<?php
use think\migration\db\Column;
use think\migration\Migrator;
class CreateJobsTable extends Migrator
{
public function change()
{
$this->table('{{table}}')
->addColumn(Column::string('queue'))
->addColumn(Column::longText('payload'))
->addColumn(Column::tinyInteger('attempts')->setUnsigned())
->addColumn(Column::unsignedInteger('reserve_time')->setNullable())
->addColumn(Column::unsignedInteger('available_time'))
->addColumn(Column::unsignedInteger('create_time'))
->addIndex('queue')
->create();
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\connector;
use Carbon\Carbon;
use stdClass;
use think\Db;
use think\db\ConnectionInterface;
use think\db\Query;
use think\queue\Connector;
use think\queue\InteractsWithTime;
use think\queue\job\Database as DatabaseJob;
class Database extends Connector
{
use InteractsWithTime;
protected $db;
/**
* The database table that holds the jobs.
*
* @var string
*/
protected $table;
/**
* The name of the default queue.
*
* @var string
*/
protected $default;
/**
* The expiration time of a job.
*
* @var int|null
*/
protected $retryAfter = 60;
public function __construct(ConnectionInterface $db, $table, $default = 'default', $retryAfter = 60)
{
$this->db = $db;
$this->table = $table;
$this->default = $default;
$this->retryAfter = $retryAfter;
}
public static function __make(Db $db, $config)
{
$connection = $db->connect($config['connection'] ?? null);
return new self($connection, $config['table'], $config['queue'], $config['retry_after'] ?? 60);
}
public function size($queue = null)
{
return $this->db
->name($this->table)
->where('queue', $this->getQueue($queue))
->count();
}
public function push($job, $data = '', $queue = null)
{
return $this->pushToDatabase($queue, $this->createPayload($job, $data));
}
public function pushRaw($payload, $queue = null, array $options = [])
{
return $this->pushToDatabase($queue, $payload);
}
public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushToDatabase($queue, $this->createPayload($job, $data), $delay);
}
public function bulk($jobs, $data = '', $queue = null)
{
$queue = $this->getQueue($queue);
$availableAt = $this->availableAt();
return $this->db->name($this->table)->insertAll(collect((array) $jobs)->map(
function ($job) use ($queue, $data, $availableAt) {
return [
'queue' => $queue,
'attempts' => 0,
'reserve_time' => null,
'available_time' => $availableAt,
'create_time' => $this->currentTime(),
'payload' => $this->createPayload($job, $data),
];
}
)->all());
}
/**
* 重新发布任务
*
* @param string $queue
* @param StdClass $job
* @param int $delay
* @return mixed
*/
public function release($queue, $job, $delay)
{
return $this->pushToDatabase($queue, $job->payload, $delay, $job->attempts);
}
/**
* Push a raw payload to the database with a given delay.
*
* @param \DateTime|int $delay
* @param string|null $queue
* @param string $payload
* @param int $attempts
* @return mixed
*/
protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
{
return $this->db->name($this->table)->insertGetId([
'queue' => $this->getQueue($queue),
'attempts' => $attempts,
'reserve_time' => null,
'available_time' => $this->availableAt($delay),
'create_time' => $this->currentTime(),
'payload' => $payload,
]);
}
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
return $this->db->transaction(function () use ($queue) {
if ($job = $this->getNextAvailableJob($queue)) {
$job = $this->markJobAsReserved($job);
return new DatabaseJob($this->app, $this, $job, $this->connection, $queue);
}
});
}
/**
* 获取下个有效任务
*
* @param string|null $queue
* @return StdClass|null
*/
protected function getNextAvailableJob($queue)
{
$job = $this->db
->name($this->table)
->lock(true)
->where('queue', $this->getQueue($queue))
->where(function (Query $query) {
$query->where(function (Query $query) {
$query->whereNull('reserve_time')
->where('available_time', '<=', $this->currentTime());
});
//超时任务重试
$expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();
$query->whereOr(function (Query $query) use ($expiration) {
$query->where('reserve_time', '<=', $expiration);
});
})
->order('id', 'asc')
->find();
return $job ? (object) $job : null;
}
/**
* 标记任务正在执行.
*
* @param stdClass $job
* @return stdClass
*/
protected function markJobAsReserved($job)
{
$this->db
->name($this->table)
->where('id', $job->id)
->update([
'reserve_time' => $job->reserve_time = $this->currentTime(),
'attempts' => ++$job->attempts,
]);
return $job;
}
/**
* 删除任务
*
* @param string $id
* @return void
*/
public function deleteReserved($id)
{
$this->db->transaction(function () use ($id) {
if ($this->db->name($this->table)->lock(true)->find($id)) {
$this->db->name($this->table)->where('id', $id)->delete();
}
});
}
protected function getQueue($queue)
{
return $queue ?: $this->default;
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\connector;
use Closure;
use Exception;
use RedisException;
use think\helper\Str;
use think\queue\Connector;
use think\queue\InteractsWithTime;
use think\queue\job\Redis as RedisJob;
class Redis extends Connector
{
use InteractsWithTime;
/** @var \Redis */
protected $redis;
/**
* The name of the default queue.
*
* @var string
*/
protected $default;
/**
* The expiration time of a job.
*
* @var int|null
*/
protected $retryAfter = 60;
/**
* The maximum number of seconds to block for a job.
*
* @var int|null
*/
protected $blockFor = null;
public function __construct($redis, $default = 'default', $retryAfter = 60, $blockFor = null)
{
$this->redis = $redis;
$this->default = $default;
$this->retryAfter = $retryAfter;
$this->blockFor = $blockFor;
}
public static function __make($config)
{
if (!extension_loaded('redis')) {
throw new Exception('redis扩展未安装');
}
$redis = new class($config) {
protected $config;
protected $client;
public function __construct($config)
{
$this->config = $config;
$this->client = $this->createClient();
}
protected function createClient()
{
$config = $this->config;
$func = $config['persistent'] ? 'pconnect' : 'connect';
$client = new \Redis;
$client->$func($config['host'], $config['port'], $config['timeout']);
if ('' != $config['password']) {
$client->auth($config['password']);
}
if (0 != $config['select']) {
$client->select($config['select']);
}
return $client;
}
public function __call($name, $arguments)
{
try {
return call_user_func_array([$this->client, $name], $arguments);
} catch (RedisException $e) {
if (Str::contains($e->getMessage(), 'went away')) {
$this->client = $this->createClient();
}
throw $e;
}
}
};
return new self($redis, $config['queue'], $config['retry_after'] ?? 60, $config['block_for'] ?? null);
}
public function size($queue = null)
{
$queue = $this->getQueue($queue);
return $this->redis->lLen($queue) + $this->redis->zCard("{$queue}:delayed") + $this->redis->zCard("{$queue}:reserved");
}
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}
public function pushRaw($payload, $queue = null, array $options = [])
{
if ($this->redis->rPush($this->getQueue($queue), $payload)) {
return json_decode($payload, true)['id'] ?? null;
}
}
public function later($delay, $job, $data = '', $queue = null)
{
return $this->laterRaw($delay, $this->createPayload($job, $data), $queue);
}
protected function laterRaw($delay, $payload, $queue = null)
{
if ($this->redis->zadd(
$this->getQueue($queue) . ':delayed',
$this->availableAt($delay),
$payload
)) {
return json_decode($payload, true)['id'] ?? null;
}
}
public function pop($queue = null)
{
$this->migrate($prefixed = $this->getQueue($queue));
if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
return;
}
[$job, $reserved] = $nextJob;
if ($reserved) {
return new RedisJob($this->app, $this, $job, $reserved, $this->connection, $queue);
}
}
/**
* Migrate any delayed or expired jobs onto the primary queue.
*
* @param string $queue
* @return void
*/
protected function migrate($queue)
{
$this->migrateExpiredJobs($queue . ':delayed', $queue);
if (!is_null($this->retryAfter)) {
$this->migrateExpiredJobs($queue . ':reserved', $queue);
}
}
/**
* 移动延迟任务
*
* @param string $from
* @param string $to
* @param bool $attempt
*/
public function migrateExpiredJobs($from, $to, $attempt = true)
{
$this->redis->watch($from);
$jobs = $this->redis->zRangeByScore($from, '-inf', $this->currentTime());
if (!empty($jobs)) {
$this->transaction(function () use ($from, $to, $jobs, $attempt) {
$this->redis->zRemRangeByRank($from, 0, count($jobs) - 1);
for ($i = 0; $i < count($jobs); $i += 100) {
$values = array_slice($jobs, $i, 100);
$this->redis->rPush($to, ...$values);
}
});
}
$this->redis->unwatch();
}
/**
* Retrieve the next job from the queue.
*
* @param string $queue
* @return array
*/
protected function retrieveNextJob($queue)
{
if (!is_null($this->blockFor)) {
return $this->blockingPop($queue);
}
$job = $this->redis->lpop($queue);
$reserved = false;
if ($job) {
$reserved = json_decode($job, true);
$reserved['attempts']++;
$reserved = json_encode($reserved);
$this->redis->zAdd($queue . ':reserved', $this->availableAt($this->retryAfter), $reserved);
}
return [$job, $reserved];
}
/**
* Retrieve the next job by blocking-pop.
*
* @param string $queue
* @return array
*/
protected function blockingPop($queue)
{
$rawBody = $this->redis->blpop($queue, $this->blockFor);
if (!empty($rawBody)) {
$payload = json_decode($rawBody[1], true);
$payload['attempts']++;
$reserved = json_encode($payload);
$this->redis->zadd($queue . ':reserved', $this->availableAt($this->retryAfter), $reserved);
return [$rawBody[1], $reserved];
}
return [null, null];
}
/**
* 删除任务
*
* @param string $queue
* @param RedisJob $job
* @return void
*/
public function deleteReserved($queue, $job)
{
$this->redis->zRem($this->getQueue($queue) . ':reserved', $job->getReservedJob());
}
/**
* Delete a reserved job from the reserved queue and release it.
*
* @param string $queue
* @param RedisJob $job
* @param int $delay
* @return void
*/
public function deleteAndRelease($queue, $job, $delay)
{
$queue = $this->getQueue($queue);
$reserved = $job->getReservedJob();
$this->redis->zRem($queue . ':reserved', $reserved);
$this->redis->zAdd($queue . ':delayed', $this->availableAt($delay), $reserved);
}
/**
* redis事务
* @param Closure $closure
*/
protected function transaction(Closure $closure)
{
$this->redis->multi();
try {
call_user_func($closure);
if (!$this->redis->exec()) {
$this->redis->discard();
}
} catch (Exception $e) {
$this->redis->discard();
}
}
protected function createPayloadArray($job, $data = '')
{
return array_merge(parent::createPayloadArray($job, $data), [
'id' => $this->getRandomId(),
'attempts' => 0,
]);
}
/**
* 随机id
*
* @return string
*/
protected function getRandomId()
{
return Str::random(32);
}
/**
* 获取队列名
*
* @param string|null $queue
* @return string
*/
protected function getQueue($queue)
{
$queue = $queue ?: $this->default;
return "{queues:{$queue}}";
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\connector;
use Exception;
use think\queue\Connector;
use think\queue\event\JobFailed;
use think\queue\event\JobProcessed;
use think\queue\event\JobProcessing;
use think\queue\job\Sync as SyncJob;
use Throwable;
class Sync extends Connector
{
public function size($queue = null)
{
return 0;
}
public function push($job, $data = '', $queue = null)
{
$queueJob = $this->resolveJob($this->createPayload($job, $data), $queue);
try {
$this->triggerEvent(new JobProcessing($this->connection, $job));
$queueJob->fire();
$this->triggerEvent(new JobProcessed($this->connection, $job));
} catch (Exception | Throwable $e) {
$this->triggerEvent(new JobFailed($this->connection, $job, $e));
throw $e;
}
return 0;
}
protected function triggerEvent($event)
{
$this->app->event->trigger($event);
}
public function pop($queue = null)
{
}
protected function resolveJob($payload, $queue)
{
return new SyncJob($this->app, $payload, $this->connection, $queue);
}
public function pushRaw($payload, $queue = null, array $options = [])
{
}
public function later($delay, $job, $data = '', $queue = null)
{
return $this->push($job, $data, $queue);
}
}
<?php
namespace think\queue\event;
use Exception;
use think\queue\Job;
class JobExceptionOccurred
{
/**
* The connection name.
*
* @var string
*/
public $connectionName;
/**
* The job instance.
*
* @var Job
*/
public $job;
/**
* The exception instance.
*
* @var Exception
*/
public $exception;
/**
* Create a new event instance.
*
* @param string $connectionName
* @param Job $job
* @param Exception $exception
* @return void
*/
public function __construct($connectionName, $job, $exception)
{
$this->job = $job;
$this->exception = $exception;
$this->connectionName = $connectionName;
}
}
<?php
namespace think\queue\event;
use think\queue\Job;
class JobFailed
{
/** @var string */
public $connection;
/** @var Job */
public $job;
/** @var \Exception */
public $exception;
public function __construct($connection, $job, $exception)
{
$this->connection = $connection;
$this->job = $job;
$this->exception = $exception;
}
}
<?php
namespace think\queue\event;
use think\queue\Job;
class JobProcessed
{
/** @var string */
public $connection;
/** @var Job */
public $job;
public function __construct($connection, $job)
{
$this->connection = $connection;
$this->job = $job;
}
}
<?php
namespace think\queue\event;
use think\queue\Job;
class JobProcessing
{
/** @var string */
public $connection;
/** @var Job */
public $job;
public function __construct($connection, $job)
{
$this->connection = $connection;
$this->job = $job;
}
}
<?php
namespace think\queue\event;
class WorkerStopping
{
/**
* The exit status.
*
* @var int
*/
public $status;
/**
* Create a new event instance.
*
* @param int $status
* @return void
*/
public function __construct($status = 0)
{
$this->status = $status;
}
}
<?php
namespace think\queue\exception;
use RuntimeException;
class MaxAttemptsExceededException extends RuntimeException
{
}
<?php
namespace think\queue\failed;
use Carbon\Carbon;
use think\Db;
use think\queue\FailedJob;
class Database extends FailedJob
{
/** @var Db */
protected $db;
/**
* The database table.
*
* @var string
*/
protected $table;
public function __construct(Db $db, $table)
{
$this->db = $db;
$this->table = $table;
}
public static function __make(Db $db, $config)
{
return new self($db, $config['table']);
}
/**
* Log a failed job into storage.
*
* @param string $connection
* @param string $queue
* @param string $payload
* @param \Exception $exception
* @return int|null
*/
public function log($connection, $queue, $payload, $exception)
{
$fail_time = Carbon::now()->toDateTimeString();
$exception = (string) $exception;
return $this->getTable()->insertGetId(compact(
'connection',
'queue',
'payload',
'exception',
'fail_time'
));
}
/**
* Get a list of all of the failed jobs.
*
* @return array
*/
public function all()
{
return collect($this->getTable()->order('id', 'desc')->select())->all();
}
/**
* Get a single failed job.
*
* @param mixed $id
* @return object|null
*/
public function find($id)
{
return $this->getTable()->find($id);
}
/**
* Delete a single failed job from storage.
*
* @param mixed $id
* @return bool
*/
public function forget($id)
{
return $this->getTable()->where('id', $id)->delete() > 0;
}
/**
* Flush all of the failed jobs from storage.
*
* @return void
*/
public function flush()
{
$this->getTable()->delete(true);
}
protected function getTable()
{
return $this->db->name($this->table);
}
}
<?php
namespace think\queue\failed;
use think\queue\FailedJob;
class None extends FailedJob
{
/**
* Log a failed job into storage.
*
* @param string $connection
* @param string $queue
* @param string $payload
* @param \Exception $exception
*/
public function log($connection, $queue, $payload, $exception)
{
}
/**
* Get a list of all of the failed jobs.
*
* @return array
*/
public function all()
{
return [];
}
/**
* Get a single failed job.
*
* @param mixed $id
*/
public function find($id)
{
}
/**
* Delete a single failed job from storage.
*
* @param mixed $id
* @return bool
*/
public function forget($id)
{
return true;
}
/**
* Flush all of the failed jobs from storage.
*
* @return void
*/
public function flush()
{
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\job;
use think\App;
use think\queue\connector\Database as DatabaseQueue;
use think\queue\Job;
class Database extends Job
{
/**
* The database queue instance.
* @var DatabaseQueue
*/
protected $database;
/**
* The database job payload.
* @var Object
*/
protected $job;
public function __construct(App $app, DatabaseQueue $database, $job, $connection, $queue)
{
$this->app = $app;
$this->job = $job;
$this->queue = $queue;
$this->database = $database;
$this->connection = $connection;
}
/**
* 删除任务
* @return void
*/
public function delete()
{
parent::delete();
$this->database->deleteReserved($this->job->id);
}
/**
* 重新发布任务
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);
$this->delete();
$this->database->release($this->queue, $this->job, $delay);
}
/**
* 获取当前任务尝试次数
* @return int
*/
public function attempts()
{
return (int) $this->job->attempts;
}
/**
* Get the raw body string for the job.
* @return string
*/
public function getRawBody()
{
return $this->job->payload;
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
return $this->job->id;
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\job;
use think\App;
use think\queue\connector\Redis as RedisQueue;
use think\queue\Job;
class Redis extends Job
{
/**
* The redis queue instance.
* @var RedisQueue
*/
protected $redis;
/**
* The database job payload.
* @var Object
*/
protected $job;
/**
* The Redis job payload inside the reserved queue.
*
* @var string
*/
protected $reserved;
public function __construct(App $app, RedisQueue $redis, $job, $reserved, $connection, $queue)
{
$this->app = $app;
$this->job = $job;
$this->queue = $queue;
$this->connection = $connection;
$this->redis = $redis;
$this->reserved = $reserved;
}
/**
* Get the number of times the job has been attempted.
* @return int
*/
public function attempts()
{
return $this->payload('attempts') + 1;
}
/**
* Get the raw body string for the job.
* @return string
*/
public function getRawBody()
{
return $this->job;
}
/**
* 删除任务
*
* @return void
*/
public function delete()
{
parent::delete();
$this->redis->deleteReserved($this->queue, $this);
}
/**
* 重新发布任务
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);
$this->redis->deleteAndRelease($this->queue, $this, $delay);
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
return $this->payload('id');
}
/**
* Get the underlying reserved Redis job.
*
* @return string
*/
public function getReservedJob()
{
return $this->reserved;
}
}
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\job;
use think\App;
use think\queue\Job;
class Sync extends Job
{
/**
* The queue message data.
*
* @var string
*/
protected $job;
public function __construct(App $app, $job, $connection, $queue)
{
$this->app = $app;
$this->connection = $connection;
$this->queue = $queue;
$this->job = $job;
}
/**
* Get the number of times the job has been attempted.
* @return int
*/
public function attempts()
{
return 1;
}
/**
* Get the raw body string for the job.
* @return string
*/
public function getRawBody()
{
return $this->job;
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
return '';
}
public function getQueue()
{
return 'sync';
}
}
<?php
namespace think\test\queue;
use Carbon\Carbon;
use Mockery as m;
use Mockery\MockInterface;
use ReflectionClass;
use stdClass;
use think\Db;
use think\queue\Connector;
use think\queue\connector\Database;
class DatabaseConnectorTest extends TestCase
{
/** @var Database|MockInterface */
protected $connector;
/** @var Db|MockInterface */
protected $db;
protected function setUp()
{
parent::setUp();
$this->db = m::mock(Db::class);
$this->connector = new Database($this->db, 'table', 'default');
}
public function testPushProperlyPushesJobOntoDatabase()
{
$this->db->shouldReceive('name')->with('table')->andReturn($query = m::mock(stdClass::class));
$query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) {
$this->assertEquals('default', $array['queue']);
$this->assertEquals(json_encode(['job' => 'foo', 'maxTries' => null, 'timeout' => null, 'data' => ['data']]), $array['payload']);
$this->assertEquals(0, $array['attempts']);
$this->assertNull($array['reserved_at']);
$this->assertInternalType('int', $array['available_at']);
});
$this->connector->push('foo', ['data']);
}
public function testDelayedPushProperlyPushesJobOntoDatabase()
{
$this->db->shouldReceive('name')->with('table')->andReturn($query = m::mock(stdClass::class));
$query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) {
$this->assertEquals('default', $array['queue']);
$this->assertEquals(json_encode(['job' => 'foo', 'maxTries' => null, 'timeout' => null, 'data' => ['data']]), $array['payload']);
$this->assertEquals(0, $array['attempts']);
$this->assertNull($array['reserved_at']);
$this->assertInternalType('int', $array['available_at']);
});
$this->connector->later(10, 'foo', ['data']);
}
public function testFailureToCreatePayloadFromObject()
{
$this->expectException('InvalidArgumentException');
$job = new stdClass;
$job->invalid = "\xc3\x28";
$queue = $this->getMockForAbstractClass(Connector::class);
$class = new ReflectionClass(Connector::class);
$createPayload = $class->getMethod('createPayload');
$createPayload->setAccessible(true);
$createPayload->invokeArgs($queue, [
$job,
'queue-name',
]);
}
public function testFailureToCreatePayloadFromArray()
{
$this->expectException('InvalidArgumentException');
$queue = $this->getMockForAbstractClass(Connector::class);
$class = new ReflectionClass(Connector::class);
$createPayload = $class->getMethod('createPayload');
$createPayload->setAccessible(true);
$createPayload->invokeArgs($queue, [
["\xc3\x28"],
'queue-name',
]);
}
public function testBulkBatchPushesOntoDatabase()
{
$this->db->shouldReceive('name')->with('table')->andReturn($query = m::mock(stdClass::class));
Carbon::setTestNow(
$now = Carbon::now()->addSeconds()
);
$query->shouldReceive('insertAll')->once()->andReturnUsing(function ($records) use ($now) {
$this->assertEquals([
[
'queue' => 'queue',
'payload' => json_encode(['job' => 'foo', 'maxTries' => null, 'timeout' => null, 'data' => ['data']]),
'attempts' => 0,
'reserved_at' => null,
'available_at' => $now->getTimestamp(),
'created_at' => $now->getTimestamp(),
], [
'queue' => 'queue',
'payload' => json_encode(['job' => 'bar', 'maxTries' => null, 'timeout' => null, 'data' => ['data']]),
'attempts' => 0,
'reserved_at' => null,
'available_at' => $now->getTimestamp(),
'created_at' => $now->getTimestamp(),
],
], $records);
});
$this->connector->bulk(['foo', 'bar'], ['data'], 'queue');
}
}
<?php
namespace think\test\queue;
use Mockery as m;
use Mockery\MockInterface;
use Symfony\Component\Process\Process;
use think\queue\Listener;
class ListenerTest extends TestCase
{
/** @var Process|MockInterface */
protected $process;
/** @var Listener|MockInterface */
protected $listener;
public function testRunProcessCallsProcess()
{
/** @var Process|MockInterface $process */
$process = m::mock(Process::class)->makePartial();
$process->shouldReceive('run')->once();
/** @var Listener|MockInterface $listener */
$listener = m::mock(Listener::class)->makePartial();
$listener->shouldReceive('memoryExceeded')->once()->with(1)->andReturn(false);
$listener->runProcess($process, 1);
}
public function testListenerStopsWhenMemoryIsExceeded()
{
/** @var Process|MockInterface $process */
$process = m::mock(Process::class)->makePartial();
$process->shouldReceive('run')->once();
/** @var Listener|MockInterface $listener */
$listener = m::mock(Listener::class)->makePartial();
$listener->shouldReceive('memoryExceeded')->once()->with(1)->andReturn(true);
$listener->shouldReceive('stop')->once();
$listener->runProcess($process, 1);
}
public function testMakeProcessCorrectlyFormatsCommandLine()
{
$listener = new Listener(__DIR__);
$process = $listener->makeProcess('connection', 'queue', 1, 3, 0, 2, 3);
$escape = '\\' === DIRECTORY_SEPARATOR ? '"' : '\'';
$this->assertInstanceOf(Process::class, $process);
$this->assertEquals(__DIR__, $process->getWorkingDirectory());
$this->assertEquals(3, $process->getTimeout());
$this->assertEquals($escape . PHP_BINARY . $escape . " {$escape}think{$escape} {$escape}queue:work{$escape} {$escape}connection{$escape} {$escape}--once{$escape} {$escape}--queue=queue{$escape} {$escape}--delay=1{$escape} {$escape}--memory=2{$escape} {$escape}--sleep=3{$escape} {$escape}--tries=0{$escape}", $process->getCommandLine());
}
public function testMakeProcessCorrectlyFormatsCommandLineWithAnEnvironmentSpecified()
{
$listener = new Listener(__DIR__);
$process = $listener->makeProcess('connection', 'queue', 1, 3, 0, 2, 3);
$escape = '\\' === DIRECTORY_SEPARATOR ? '"' : '\'';
$this->assertInstanceOf(Process::class, $process);
$this->assertEquals(__DIR__, $process->getWorkingDirectory());
$this->assertEquals(3, $process->getTimeout());
$this->assertEquals($escape . PHP_BINARY . $escape . " {$escape}think{$escape} {$escape}queue:work{$escape} {$escape}connection{$escape} {$escape}--once{$escape} {$escape}--queue=queue{$escape} {$escape}--delay=1{$escape} {$escape}--memory=2{$escape} {$escape}--sleep=3{$escape} {$escape}--tries=0{$escape}", $process->getCommandLine());
}
public function testMakeProcessCorrectlyFormatsCommandLineWhenTheConnectionIsNotSpecified()
{
$listener = new Listener(__DIR__);
$process = $listener->makeProcess(null, 'queue', 1, 3, 0, 2, 3);
$escape = '\\' === DIRECTORY_SEPARATOR ? '"' : '\'';
$this->assertInstanceOf(Process::class, $process);
$this->assertEquals(__DIR__, $process->getWorkingDirectory());
$this->assertEquals(3, $process->getTimeout());
$this->assertEquals($escape . PHP_BINARY . $escape . " {$escape}think{$escape} {$escape}queue:work{$escape} {$escape}--once{$escape} {$escape}--queue=queue{$escape} {$escape}--delay=1{$escape} {$escape}--memory=2{$escape} {$escape}--sleep=3{$escape} {$escape}--tries=0{$escape}", $process->getCommandLine());
}
}
<?php
namespace think\test\queue;
use InvalidArgumentException;
use Mockery as m;
use think\Config;
use think\Queue;
use think\queue\connector\Sync;
class QueueTest extends TestCase
{
/** @var Queue */
protected $queue;
protected function setUp()
{
parent::setUp();
$this->queue = new Queue($this->app);
}
public function testDefaultConnectionCanBeResolved()
{
$sync = new Sync();
$this->app->shouldReceive('invokeClass')->once()->with('\think\queue\connector\Sync', [['driver' => 'sync']])->andReturn($sync);
$config = m::mock(Config::class);
$config->shouldReceive('get')->twice()->with('queue.connectors.sync', ['driver' => 'sync'])->andReturn(['driver' => 'sync']);
$config->shouldReceive('get')->once()->with('queue.default', 'sync')->andReturn('sync');
$this->app->shouldReceive('get')->times(3)->with('config')->andReturn($config);
$this->assertSame($sync, $this->queue->driver('sync'));
$this->assertSame($sync, $this->queue->driver());
}
public function testNotSupportDriver()
{
$config = m::mock(Config::class);
$config->shouldReceive('get')->once()->with('queue.connectors.hello', ['driver' => 'sync'])->andReturn(['driver' => 'hello']);
$this->app->shouldReceive('get')->once()->with('config')->andReturn($config);
$this->expectException(InvalidArgumentException::class);
$this->queue->driver('hello');
}
}
<?php
namespace think\test\queue;
use Mockery as m;
use Mockery\MockInterface;
use think\App;
abstract class TestCase extends \PHPUnit\Framework\TestCase
{
/** @var App|MockInterface */
protected $app;
public function tearDown()
{
m::close();
}
protected function setUp()
{
$this->app = m::mock(App::class)->makePartial();
}
}
This diff is collapsed.
<?php
include __DIR__.'/../vendor/autoload.php';
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment