This commit is contained in:
wangjinlei
2020-11-12 17:15:37 +08:00
parent 824380664c
commit 1abf99316f
893 changed files with 278997 additions and 0 deletions

View File

@@ -0,0 +1,171 @@
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\connector;
use think\Db;
use think\queue\Connector;
use think\queue\job\Database as DatabaseJob;
class Database extends Connector
{
protected $db;
protected $options = [
'expire' => 60,
'default' => 'default',
'table' => 'jobs',
'dsn' => []
];
public function __construct($options)
{
if (!empty($options)) {
$this->options = array_merge($this->options, $options);
}
$this->db = Db::connect($this->options['dsn']);
}
public function push($job, $data = '', $queue = null)
{
return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data));
}
public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
}
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
if (!is_null($this->options['expire'])) {
$this->releaseJobsThatHaveBeenReservedTooLong($queue);
}
if ($job = $this->getNextAvailableJob($queue)) {
$this->markJobAsReserved($job->id);
$this->db->commit();
return new DatabaseJob($this, $job, $queue);
}
$this->db->commit();
}
/**
* 重新发布任务
* @param string $queue
* @param \StdClass $job
* @param int $delay
* @return mixed
*/
public function release($queue, $job, $delay)
{
return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts);
}
/**
* Push a raw payload to the database with a given delay.
*
* @param \DateTime|int $delay
* @param string|null $queue
* @param string $payload
* @param int $attempts
* @return mixed
*/
protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
{
return $this->db->name($this->options['table'])->insert([
'queue' => $this->getQueue($queue),
'payload' => $payload,
'attempts' => $attempts,
'reserved' => 0,
'reserved_at' => null,
'available_at' => time() + $delay,
'created_at' => time()
]);
}
/**
* 获取下个有效任务
*
* @param string|null $queue
* @return \StdClass|null
*/
protected function getNextAvailableJob($queue)
{
$this->db->startTrans();
$job = $this->db->name($this->options['table'])
->lock(true)
->where('queue', $this->getQueue($queue))
->where('reserved', 0)
->where('available_at', '<=', time())
->order('id', 'asc')
->find();
return $job ? (object) $job : null;
}
/**
* 标记任务正在执行.
*
* @param string $id
* @return void
*/
protected function markJobAsReserved($id)
{
$this->db->name($this->options['table'])->where('id', $id)->update([
'reserved' => 1,
'reserved_at' => time()
]);
}
/**
* 重新发布超时的任务
*
* @param string $queue
* @return void
*/
protected function releaseJobsThatHaveBeenReservedTooLong($queue)
{
$expired = time() - $this->options['expire'];
$this->db->name($this->options['table'])
->where('queue', $this->getQueue($queue))
->where('reserved', 1)
->where('reserved_at', '<=', $expired)
->update([
'reserved' => 0,
'reserved_at' => null,
'attempts' => ['inc', 1]
]);
}
/**
* 删除任务
* @param string $id
* @return void
*/
public function deleteReserved($id)
{
$this->db->name($this->options['table'])->delete($id);
}
protected function getQueue($queue)
{
return $queue ?: $this->options['default'];
}
}

View File

@@ -0,0 +1,236 @@
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\connector;
use Exception;
use think\helper\Str;
use think\queue\Connector;
use think\queue\job\Redis as RedisJob;
class Redis extends Connector
{
/** @var \Redis */
protected $redis;
protected $options = [
'expire' => 60,
'default' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false
];
public function __construct($options)
{
if (!extension_loaded('redis')) {
throw new Exception('redis扩展未安装');
}
if (!empty($options)) {
$this->options = array_merge($this->options, $options);
}
$func = $this->options['persistent'] ? 'pconnect' : 'connect';
$this->redis = new \Redis;
$this->redis->$func($this->options['host'], $this->options['port'], $this->options['timeout']);
if ('' != $this->options['password']) {
$this->redis->auth($this->options['password']);
}
if (0 != $this->options['select']) {
$this->redis->select($this->options['select']);
}
}
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload);
}
public function pop($queue = null)
{
$original = $queue ?: $this->options['default'];
$queue = $this->getQueue($queue);
$this->migrateExpiredJobs($queue . ':delayed', $queue, false);
if (!is_null($this->options['expire'])) {
$this->migrateExpiredJobs($queue . ':reserved', $queue);
}
$job = $this->redis->lPop($queue);
if ($job !== false) {
$this->redis->zAdd($queue . ':reserved', time() + $this->options['expire'], $job);
return new RedisJob($this, $job, $original);
}
}
/**
* 重新发布任务
*
* @param string $queue
* @param string $payload
* @param int $delay
* @param int $attempts
* @return void
*/
public function release($queue, $payload, $delay, $attempts)
{
$payload = $this->setMeta($payload, 'attempts', $attempts);
$this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload);
}
public function pushRaw($payload, $queue = null)
{
$this->redis->rPush($this->getQueue($queue), $payload);
return json_decode($payload, true)['id'];
}
protected function createPayload($job, $data = '', $queue = null)
{
$payload = $this->setMeta(
parent::createPayload($job, $data), 'id', $this->getRandomId()
);
return $this->setMeta($payload, 'attempts', 1);
}
/**
* 删除任务
*
* @param string $queue
* @param string $job
* @return void
*/
public function deleteReserved($queue, $job)
{
$this->redis->zRem($this->getQueue($queue) . ':reserved', $job);
}
/**
* 移动延迟任务
*
* @param string $from
* @param string $to
* @param bool $attempt
*/
public function migrateExpiredJobs($from, $to, $attempt = true)
{
$this->redis->watch($from);
$jobs = $this->getExpiredJobs(
$from, $time = time()
);
if (count($jobs) > 0) {
$this->transaction(function () use ($from, $to, $time, $jobs, $attempt) {
$this->removeExpiredJobs($from, $time);
$this->pushExpiredJobsOntoNewQueue($to, $jobs, $attempt);
});
}
$this->redis->unwatch();
}
/**
* redis事务
* @param \Closure $closure
*/
protected function transaction(\Closure $closure)
{
$this->redis->multi();
try {
call_user_func($closure);
if (!$this->redis->exec()) {
$this->redis->discard();
}
} catch (Exception $e) {
$this->redis->discard();
}
}
/**
* 获取所有到期任务
*
* @param string $from
* @param int $time
* @return array
*/
protected function getExpiredJobs($from, $time)
{
return $this->redis->zRangeByScore($from, '-inf', $time);
}
/**
* 删除过期任务
*
* @param string $from
* @param int $time
* @return void
*/
protected function removeExpiredJobs($from, $time)
{
$this->redis->zRemRangeByScore($from, '-inf', $time);
}
/**
* 重新发布到期任务
*
* @param string $to
* @param array $jobs
* @param boolean $attempt
*/
protected function pushExpiredJobsOntoNewQueue($to, $jobs, $attempt = true)
{
if ($attempt) {
foreach ($jobs as &$job) {
$attempts = json_decode($job, true)['attempts'];
$job = $this->setMeta($job, 'attempts', $attempts + 1);
}
}
call_user_func_array([$this->redis, 'rPush'], array_merge([$to], $jobs));
}
/**
* 随机id
*
* @return string
*/
protected function getRandomId()
{
return Str::random(32);
}
/**
* 获取队列名
*
* @param string|null $queue
* @return string
*/
protected function getQueue($queue)
{
return 'queues:' . ($queue ?: $this->options['default']);
}
}

View File

@@ -0,0 +1,57 @@
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\connector;
use Exception;
use think\queue\Connector;
use think\queue\job\Sync as SyncJob;
use Throwable;
class Sync extends Connector
{
public function push($job, $data = '', $queue = null)
{
$queueJob = $this->resolveJob($this->createPayload($job, $data, $queue));
try {
set_time_limit(0);
$queueJob->fire();
} catch (Exception $e) {
$queueJob->failed();
throw $e;
} catch (Throwable $e) {
$queueJob->failed();
throw $e;
}
return 0;
}
public function later($delay, $job, $data = '', $queue = null)
{
return $this->push($job, $data, $queue);
}
public function pop($queue = null)
{
}
protected function resolveJob($payload)
{
return new SyncJob($payload);
}
}

View File

@@ -0,0 +1,225 @@
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\connector;
use think\exception\HttpException;
use think\queue\Connector;
use think\Request;
use think\queue\job\Topthink as TopthinkJob;
use think\Response;
class Topthink extends Connector
{
protected $options = [
'token' => '',
'project_id' => '',
'protocol' => 'https',
'host' => 'qns.topthink.com',
'port' => 443,
'api_version' => 1,
'max_retries' => 3,
'default' => 'default'
];
/** @var Request */
protected $request;
protected $url;
protected $curl = null;
protected $last_status;
protected $headers = [];
public function __construct($options)
{
if (!empty($options)) {
$this->options = array_merge($this->options, $options);
}
$this->url = "{$this->options['protocol']}://{$this->options['host']}:{$this->options['port']}/v{$this->options['api_version']}/";
$this->headers['Authorization'] = "Bearer {$this->options['token']}";
$this->request = Request::instance();
}
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw(0, $queue, $this->createPayload($job, $data));
}
public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushRaw($delay, $queue, $this->createPayload($job, $data));
}
public function release($queue, $job, $delay)
{
return $this->pushRaw($delay, $queue, $job->payload, $job->attempts);
}
public function marshal()
{
$job = new TopthinkJob($this, $this->marshalPushedJob(), $this->request->header('topthink-message-queue'));
if ($this->request->header('topthink-message-status') == 'success') {
$job->fire();
} else {
$job->failed();
}
return new Response('OK');
}
public function pushRaw($delay, $queue, $payload, $attempts = 0)
{
$queue_name = $this->getQueue($queue);
$queue = rawurlencode($queue_name);
$url = "project/{$this->options['project_id']}/queue/{$queue}/message";
$message = [
'payload' => $payload,
'attempts' => $attempts,
'delay' => $delay
];
return $this->apiCall('POST', $url, $message)->id;
}
public function deleteMessage($queue, $id)
{
$queue = rawurlencode($queue);
$url = "project/{$this->options['project_id']}/queue/{$queue}/message/{$id}";
return $this->apiCall('DELETE', $url);
}
protected function apiCall($type, $url, $params = [])
{
$url = "{$this->url}$url";
if ($this->curl == null) {
$this->curl = curl_init();
}
switch ($type = strtoupper($type)) {
case 'DELETE':
curl_setopt($this->curl, CURLOPT_URL, $url);
curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params));
break;
case 'PUT':
curl_setopt($this->curl, CURLOPT_URL, $url);
curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params));
break;
case 'POST':
curl_setopt($this->curl, CURLOPT_URL, $url);
curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
curl_setopt($this->curl, CURLOPT_POST, true);
curl_setopt($this->curl, CURLOPT_POSTFIELDS, $params);
break;
case 'GET':
curl_setopt($this->curl, CURLOPT_POSTFIELDS, null);
curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
curl_setopt($this->curl, CURLOPT_HTTPGET, true);
$url .= '?' . http_build_query($params);
curl_setopt($this->curl, CURLOPT_URL, $url);
break;
}
curl_setopt($this->curl, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($this->curl, CURLOPT_RETURNTRANSFER, true);
$headers = [];
foreach ($this->headers as $k => $v) {
if ($k == 'Connection') {
$v = 'Close';
}
$headers[] = "$k: $v";
}
curl_setopt($this->curl, CURLOPT_HTTPHEADER, $headers);
curl_setopt($this->curl, CURLOPT_CONNECTTIMEOUT, 10);
return $this->callWithRetries();
}
protected function callWithRetries()
{
for ($retry = 0; $retry < $this->options['max_retries']; $retry++) {
$out = curl_exec($this->curl);
if ($out === false) {
$this->reportHttpError(0, curl_error($this->curl));
}
$this->last_status = curl_getinfo($this->curl, CURLINFO_HTTP_CODE);
if ($this->last_status >= 200 && $this->last_status < 300) {
return self::jsonDecode($out);
} elseif ($this->last_status >= 500) {
self::waitRandomInterval($retry);
} else {
$this->reportHttpError($this->last_status, $out);
}
}
$this->reportHttpError($this->last_status, "Service unavailable");
return;
}
protected static function jsonDecode($response)
{
$data = json_decode($response);
$json_error = json_last_error();
if ($json_error != JSON_ERROR_NONE) {
throw new \RuntimeException($json_error);
}
return $data;
}
protected static function waitRandomInterval($retry)
{
$max_delay = pow(4, $retry) * 100 * 1000;
usleep(rand(0, $max_delay));
}
protected function reportHttpError($status, $text)
{
throw new HttpException($status, "http error: {$status} | {$text}");
}
/**
* Marshal out the pushed job and payload.
*
* @return object
*/
protected function marshalPushedJob()
{
return (object) [
'id' => $this->request->header('topthink-message-id'),
'payload' => $this->request->getContent(),
'attempts' => $this->request->header('topthink-message-attempts')
];
}
public function __destruct()
{
if ($this->curl != null) {
curl_close($this->curl);
$this->curl = null;
}
}
public function pop($queue = null)
{
throw new \RuntimeException('pop queues not support for this type');
}
}