From 907132709e389a6a60b79329f7b6b913c5e68137 Mon Sep 17 00:00:00 2001 From: "DESKTOP-NH96BIF\\Administrator" <18722597281> Date: Wed, 30 Mar 2022 16:37:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=87=BA=E5=88=8A=E6=97=B6=E7=BB=99=E4=BD=9C?= =?UTF-8?q?=E8=80=85=E5=8F=91=E9=80=81=E9=82=AE=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/master/controller/Journal.php | 56 ++++- .../library/think/process/pipes/Windows.php | 3 +- vendor/topthink/think-queue/.gitignore | 4 + vendor/topthink/think-queue/LICENSE | 201 +++++++++++++++ vendor/topthink/think-queue/README.md | 132 ++++++++++ vendor/topthink/think-queue/composer.json | 29 +++ vendor/topthink/think-queue/src/Queue.php | 49 ++++ vendor/topthink/think-queue/src/common.php | 36 +++ vendor/topthink/think-queue/src/config.php | 14 ++ .../src/queue/CallQueuedHandler.php | 36 +++ .../think-queue/src/queue/Connector.php | 69 +++++ vendor/topthink/think-queue/src/queue/Job.php | 213 ++++++++++++++++ .../think-queue/src/queue/Listener.php | 164 ++++++++++++ .../think-queue/src/queue/Queueable.php | 46 ++++ .../think-queue/src/queue/ShouldQueue.php | 17 ++ .../topthink/think-queue/src/queue/Worker.php | 119 +++++++++ .../think-queue/src/queue/command/Listen.php | 60 +++++ .../think-queue/src/queue/command/Restart.php | 31 +++ .../src/queue/command/Subscribe.php | 46 ++++ .../think-queue/src/queue/command/Work.php | 210 ++++++++++++++++ .../src/queue/connector/Database.php | 171 +++++++++++++ .../think-queue/src/queue/connector/Redis.php | 236 ++++++++++++++++++ .../think-queue/src/queue/connector/Sync.php | 57 +++++ .../src/queue/connector/Topthink.php | 225 +++++++++++++++++ .../think-queue/src/queue/job/Database.php | 88 +++++++ .../think-queue/src/queue/job/Redis.php | 92 +++++++ .../think-queue/src/queue/job/Sync.php | 56 +++++ .../think-queue/src/queue/job/Topthink.php | 85 +++++++ 28 files changed, 2543 insertions(+), 2 deletions(-) create mode 100644 vendor/topthink/think-queue/.gitignore create mode 100644 vendor/topthink/think-queue/LICENSE create mode 100644 vendor/topthink/think-queue/README.md create mode 100644 vendor/topthink/think-queue/composer.json create mode 100644 vendor/topthink/think-queue/src/Queue.php create mode 100644 vendor/topthink/think-queue/src/common.php create mode 100644 vendor/topthink/think-queue/src/config.php create mode 100644 vendor/topthink/think-queue/src/queue/CallQueuedHandler.php create mode 100644 vendor/topthink/think-queue/src/queue/Connector.php create mode 100644 vendor/topthink/think-queue/src/queue/Job.php create mode 100644 vendor/topthink/think-queue/src/queue/Listener.php create mode 100644 vendor/topthink/think-queue/src/queue/Queueable.php create mode 100644 vendor/topthink/think-queue/src/queue/ShouldQueue.php create mode 100644 vendor/topthink/think-queue/src/queue/Worker.php create mode 100644 vendor/topthink/think-queue/src/queue/command/Listen.php create mode 100644 vendor/topthink/think-queue/src/queue/command/Restart.php create mode 100644 vendor/topthink/think-queue/src/queue/command/Subscribe.php create mode 100644 vendor/topthink/think-queue/src/queue/command/Work.php create mode 100644 vendor/topthink/think-queue/src/queue/connector/Database.php create mode 100644 vendor/topthink/think-queue/src/queue/connector/Redis.php create mode 100644 vendor/topthink/think-queue/src/queue/connector/Sync.php create mode 100644 vendor/topthink/think-queue/src/queue/connector/Topthink.php create mode 100644 vendor/topthink/think-queue/src/queue/job/Database.php create mode 100644 vendor/topthink/think-queue/src/queue/job/Redis.php create mode 100644 vendor/topthink/think-queue/src/queue/job/Sync.php create mode 100644 vendor/topthink/think-queue/src/queue/job/Topthink.php diff --git a/application/master/controller/Journal.php b/application/master/controller/Journal.php index 364bc18..27402d0 100644 --- a/application/master/controller/Journal.php +++ b/application/master/controller/Journal.php @@ -1115,6 +1115,8 @@ class Journal extends Controller { //是否提醒订阅者 if($old['is_publish']==0&&$data['is_publish']==1){ $this->msg_subscript_journal($data['journal_stage_id'],$old['journal_id']); + // 提醒作者 发送邮件 + $this->sendAuthor($data['journal_stage_id'],$old['journal_id']); } if($res){ return json(['code'=>0,'msg'=>'success']); @@ -1186,7 +1188,59 @@ class Journal extends Controller { } return $frag; } - + // 出刊给作者发送邮件 + private function sendAuthor($journal_stage_id,$journal_id){ + $stage_info = $this->journal_stage_obj->where('journal_stage_id', $journal_stage_id)->find(); + $journal_info = $this->journal_obj->where('journal_id',$journal_id)->find(); + // 根据journal_stage_id 去article 中 找article_id + $articles = $this->article_obj->where('journal_stage_id',$journal_stage_id)->where('state',0)->select(); + foreach ($articles as $v){ + // 根据article_id 去article_author 中author + $datas = $this->article_author_obj->where(['article_id'=>$v['article_id'],'email'=>array('neq','')])->select(); + foreach ($datas as $data){ + // 邮件内容 + $content = '
+
+
+ +
+

'; + $content .= 'Dear Author,

Congratulations!

'; + $content .= 'Thank you for choosing to publish with '.$journal_info['title'].'.
'; + $content .= 'Your article has been published on Vol. '.$stage_info['stage_vol'].' Issue '.$stage_info['stage_no'].'.

'; + + $content .= $v['type'].'
'; + $content .= ''.$v['title'].'
'; + $content .= $this->getAuthor($v).'
'; + $content .= $journal_info['title'].' '.$stage_info['stage_year'].' '.$stage_info['stage_vol']."(".$stage_info['stage_no']."). DOI:".$v['doi'].'
'; + $content .= 'Download pdf


'; + + $content .= 'If you want to be sure your research gets the attention it deserves, the following channels may be useful.

'; + $content .= 'https://www.researchgate.net
https://www.academia.edu
https://twitter.com
https://www.linkedin.com
https://www.facebook.com

We look forward to receiving manuscripts from you in future.'; + $content .= '
'; + + $maidata=[ + 'email'=>$data['email'], + 'title'=>'Congratulations on your published article', + 'content'=>$content, + 'tmail'=>'publicrelations@tmrjournals.com', + 'tpassword'=>'pRWU999999' + ]; + Queue::push('app\api\job\mail@fire', $maidata, "mail"); + + } + + } + + + } /** * @title 增加期刊消息 * @description 增加期刊消息 diff --git a/thinkphp/library/think/process/pipes/Windows.php b/thinkphp/library/think/process/pipes/Windows.php index bba7e9b..0c9eddf 100644 --- a/thinkphp/library/think/process/pipes/Windows.php +++ b/thinkphp/library/think/process/pipes/Windows.php @@ -180,7 +180,8 @@ class Windows extends Pipes $this->unblock(); - $r = null !== $this->input ? ['input' => $this->input] : null; + // $r = null !== $this->input ? ['input' => $this->input] : null; + $r = null !== $this->input ? ['input' => $this->input] : []; $w = isset($this->pipes[0]) ? [$this->pipes[0]] : null; $e = null; diff --git a/vendor/topthink/think-queue/.gitignore b/vendor/topthink/think-queue/.gitignore new file mode 100644 index 0000000..4aec782 --- /dev/null +++ b/vendor/topthink/think-queue/.gitignore @@ -0,0 +1,4 @@ +/vendor/ +/.idea/ +/composer.lock +/thinkphp/ diff --git a/vendor/topthink/think-queue/LICENSE b/vendor/topthink/think-queue/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/vendor/topthink/think-queue/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/topthink/think-queue/README.md b/vendor/topthink/think-queue/README.md new file mode 100644 index 0000000..df032c7 --- /dev/null +++ b/vendor/topthink/think-queue/README.md @@ -0,0 +1,132 @@ +# think-queue + +## 安装 +> composer require topthink/think-queue + +## 配置 +> 配置文件位于 `application/extra/queue.php` +### 公共配置 + +``` +[ + 'connector'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动 + //或其他自定义的完整的类名 +] +``` + +### 驱动配置 +> 各个驱动的具体可用配置项在`think\queue\connector`目录下各个驱动类里的`options`属性中,写在上面的`queue`配置里即可覆盖 + + +## 使用 Database +> 创建如下数据表 + +``` +CREATE TABLE `prefix_jobs` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `queue` varchar(255) NOT NULL, + `payload` longtext NOT NULL, + `attempts` tinyint(3) unsigned NOT NULL, + `reserved` tinyint(3) unsigned NOT NULL, + `reserved_at` int(10) unsigned DEFAULT NULL, + `available_at` int(10) unsigned NOT NULL, + `created_at` int(10) unsigned NOT NULL, + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; +``` + +## 创建任务类 +> 单模块项目推荐使用 `app\job` 作为任务类的命名空间 +> 多模块项目可用使用 `app\module\job` 作为任务类的命名空间 +> 也可以放在任意可以自动加载到的地方 + +任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个`fire`方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别 +每个方法会传入两个参数 `think\queue\Job $job`(当前的任务对象) 和 `$data`(发布任务时自定义的数据) + +还有个可选的任务失败执行的方法 `failed` 传入的参数为`$data`(发布任务时自定义的数据) + +### 下面写两个例子 + +``` +namespace app\job; + +use think\queue\Job; + +class Job1{ + + public function fire(Job $job, $data){ + + //....这里执行具体的任务 + + if ($job->attempts() > 3) { + //通过这个方法可以检查这个任务已经重试了几次了 + } + + + //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法 + $job->delete(); + + // 也可以重新发布这个任务 + $job->release($delay); //$delay为延迟时间 + + } + + public function failed($data){ + + // ...任务达到最大重试次数后,失败了 + } + +} + +``` + +``` + +namespace app\lib\job; + +use think\queue\Job; + +class Job2{ + + public function task1(Job $job, $data){ + + + } + + public function task2(Job $job, $data){ + + + } + + public function failed($data){ + + + } + +} + +``` + + +## 发布任务 +> `think\Queue::push($job, $data = '', $queue = null)` 和 `think\Queue::later($delay, $job, $data = '', $queue = null)` 两个方法,前者是立即执行,后者是在`$delay`秒后执行 + +`$job` 是任务名 +单模块的,且命名空间是`app\job`的,比如上面的例子一,写`Job1`类名即可 +多模块的,且命名空间是`app\module\job`的,写`model/Job1`即可 +其他的需要些完整的类名,比如上面的例子二,需要写完整的类名`app\lib\job\Job2` +如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名`app\lib\job\Job2@task1`、`app\lib\job\Job2@task2` + +`$data` 是你要传到任务里的参数 + +`$queue` 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填 + +## 监听任务并执行 + +> php think queue:listen + +> php think queue:work --daemon(不加--daemon为执行单个任务) + +两种,具体的可选参数可以输入命令加 --help 查看 + +>可配合supervisor使用,保证进程常驻 diff --git a/vendor/topthink/think-queue/composer.json b/vendor/topthink/think-queue/composer.json new file mode 100644 index 0000000..dd16bc3 --- /dev/null +++ b/vendor/topthink/think-queue/composer.json @@ -0,0 +1,29 @@ +{ + "name": "topthink/think-queue", + "description": "The ThinkPHP5 Queue Package", + "type": "think-extend", + "authors": [ + { + "name": "yunwuxin", + "email": "448901948@qq.com" + } + ], + "license": "Apache-2.0", + "autoload": { + "psr-4": { + "think\\": "src" + }, + "files": [ + "src/common.php" + ] + }, + "require": { + "topthink/think-helper": ">=1.0.4", + "topthink/think-installer": ">=1.0.10" + }, + "extra": { + "think-config": { + "queue": "src/config.php" + } + } +} diff --git a/vendor/topthink/think-queue/src/Queue.php b/vendor/topthink/think-queue/src/Queue.php new file mode 100644 index 0000000..42ee371 --- /dev/null +++ b/vendor/topthink/think-queue/src/Queue.php @@ -0,0 +1,49 @@ + +// +---------------------------------------------------------------------- + +namespace think; + +use think\helper\Str; +use think\queue\Connector; + +/** + * Class Queue + * @package think\queue + * + * @method static push($job, $data = '', $queue = null) + * @method static later($delay, $job, $data = '', $queue = null) + * @method static pop($queue = null) + * @method static marshal() + */ +class Queue +{ + /** @var Connector */ + protected static $connector; + + private static function buildConnector() + { + $options = Config::get('queue'); + $type = !empty($options['connector']) ? $options['connector'] : 'Sync'; + + if (!isset(self::$connector)) { + + $class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\connector\\' . Str::studly($type); + + self::$connector = new $class($options); + } + return self::$connector; + } + + public static function __callStatic($name, $arguments) + { + return call_user_func_array([self::buildConnector(), $name], $arguments); + } +} diff --git a/vendor/topthink/think-queue/src/common.php b/vendor/topthink/think-queue/src/common.php new file mode 100644 index 0000000..81b35d1 --- /dev/null +++ b/vendor/topthink/think-queue/src/common.php @@ -0,0 +1,36 @@ + +// +---------------------------------------------------------------------- + +\think\Console::addDefaultCommands([ + "think\\queue\\command\\Work", + "think\\queue\\command\\Restart", + "think\\queue\\command\\Listen", + "think\\queue\\command\\Subscribe" +]); + +if (!function_exists('queue')) { + + /** + * 添加到队列 + * @param $job + * @param string $data + * @param int $delay + * @param null $queue + */ + function queue($job, $data = '', $delay = 0, $queue = null) + { + if ($delay > 0) { + \think\Queue::later($delay, $job, $data, $queue); + } else { + \think\Queue::push($job, $data, $queue); + } + } +} diff --git a/vendor/topthink/think-queue/src/config.php b/vendor/topthink/think-queue/src/config.php new file mode 100644 index 0000000..9223ef6 --- /dev/null +++ b/vendor/topthink/think-queue/src/config.php @@ -0,0 +1,14 @@ + +// +---------------------------------------------------------------------- + +return [ + 'connector' => 'Sync' +]; diff --git a/vendor/topthink/think-queue/src/queue/CallQueuedHandler.php b/vendor/topthink/think-queue/src/queue/CallQueuedHandler.php new file mode 100644 index 0000000..0f1a627 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/CallQueuedHandler.php @@ -0,0 +1,36 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue; + +class CallQueuedHandler +{ + + public function call(Job $job, array $data) + { + $command = unserialize($data['command']); + + call_user_func([$command, 'handle']); + + if (!$job->isDeletedOrReleased()) { + $job->delete(); + } + } + + public function failed(array $data) + { + $command = unserialize($data['command']); + + if (method_exists($command, 'failed')) { + $command->failed(); + } + } +} diff --git a/vendor/topthink/think-queue/src/queue/Connector.php b/vendor/topthink/think-queue/src/queue/Connector.php new file mode 100644 index 0000000..b069437 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/Connector.php @@ -0,0 +1,69 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue; + +use InvalidArgumentException; + +abstract class Connector +{ + protected $options = []; + + abstract public function push($job, $data = '', $queue = null); + + abstract public function later($delay, $job, $data = '', $queue = null); + + abstract public function pop($queue = null); + + public function marshal() + { + throw new \RuntimeException('pop queues not support for this type'); + } + + protected function createPayload($job, $data = '', $queue = null) + { + if (is_object($job)) { + $payload = json_encode([ + 'job' => 'think\queue\CallQueuedHandler@call', + 'data' => [ + 'commandName' => get_class($job), + 'command' => serialize(clone $job), + ], + ]); + } else { + $payload = json_encode($this->createPlainPayload($job, $data)); + } + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg()); + } + + return $payload; + } + + protected function createPlainPayload($job, $data) + { + return ['job' => $job, 'data' => $data]; + } + + protected function setMeta($payload, $key, $value) + { + $payload = json_decode($payload, true); + $payload[$key] = $value; + $payload = json_encode($payload); + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg()); + } + + return $payload; + } +} diff --git a/vendor/topthink/think-queue/src/queue/Job.php b/vendor/topthink/think-queue/src/queue/Job.php new file mode 100644 index 0000000..cbca5b3 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/Job.php @@ -0,0 +1,213 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue; + +use DateTime; +use think\Config; + +abstract class Job +{ + + /** + * The job handler instance. + * @var mixed + */ + protected $instance; + + /** + * The name of the queue the job belongs to. + * @var string + */ + protected $queue; + + /** + * Indicates if the job has been deleted. + * @var bool + */ + protected $deleted = false; + + /** + * Indicates if the job has been released. + * @var bool + */ + protected $released = false; + + /** + * Fire the job. + * @return void + */ + abstract public function fire(); + + /** + * Delete the job from the queue. + * @return void + */ + public function delete() + { + $this->deleted = true; + } + + /** + * Determine if the job has been deleted. + * @return bool + */ + public function isDeleted() + { + return $this->deleted; + } + + /** + * Release the job back into the queue. + * @param int $delay + * @return void + */ + public function release($delay = 0) + { + $this->released = true; + } + + /** + * Determine if the job was released back into the queue. + * @return bool + */ + public function isReleased() + { + return $this->released; + } + + /** + * Determine if the job has been deleted or released. + * @return bool + */ + public function isDeletedOrReleased() + { + return $this->isDeleted() || $this->isReleased(); + } + + /** + * Get the number of times the job has been attempted. + * @return int + */ + abstract public function attempts(); + + /** + * Get the raw body string for the job. + * @return string + */ + abstract public function getRawBody(); + + /** + * Resolve and fire the job handler method. + * @param array $payload + * @return void + */ + protected function resolveAndFire(array $payload) + { + list($class, $method) = $this->parseJob($payload['job']); + + $this->instance = $this->resolve($class); + if ($this->instance) { + $this->instance->{$method}($this, $payload['data']); + } + } + + /** + * Parse the job declaration into class and method. + * @param string $job + * @return array + */ + protected function parseJob($job) + { + $segments = explode('@', $job); + + return count($segments) > 1 ? $segments : [$segments[0], 'fire']; + } + + /** + * Resolve the given job handler. + * @param string $name + * @return mixed + */ + protected function resolve($name) + { + if (strpos($name, '\\') === false) { + + if (strpos($name, '/') === false) { + $module = ''; + } else { + list($module, $name) = explode('/', $name, 2); + } + + $name = Config::get('app_namespace') . ($module ? '\\' . strtolower($module) : '') . '\\job\\' . $name; + } + if (class_exists($name)) { + return new $name(); + } + } + + /** + * Call the failed method on the job instance. + * @return void + */ + public function failed() + { + $payload = json_decode($this->getRawBody(), true); + + list($class, $method) = $this->parseJob($payload['job']); + + $this->instance = $this->resolve($class); + if ($this->instance && method_exists($this->instance, 'failed')) { + $this->instance->failed($payload['data']); + } + } + + /** + * Calculate the number of seconds with the given delay. + * @param \DateTime|int $delay + * @return int + */ + protected function getSeconds($delay) + { + if ($delay instanceof DateTime) { + return max(0, $delay->getTimestamp() - $this->getTime()); + } + + return (int) $delay; + } + + /** + * Get the current system time. + * @return int + */ + protected function getTime() + { + return time(); + } + + /** + * Get the name of the queued job class. + * @return string + */ + public function getName() + { + return json_decode($this->getRawBody(), true)['job']; + } + + /** + * Get the name of the queue the job belongs to. + * @return string + */ + public function getQueue() + { + return $this->queue; + } +} diff --git a/vendor/topthink/think-queue/src/queue/Listener.php b/vendor/topthink/think-queue/src/queue/Listener.php new file mode 100644 index 0000000..e8fcaa4 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/Listener.php @@ -0,0 +1,164 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue; + +use Closure; +use think\Process; + +class Listener +{ + + /** + * @var string + */ + protected $commandPath; + + /** + * @var int + */ + protected $sleep = 3; + + /** + * @var int + */ + protected $maxTries = 0; + + /** + * @var string + */ + protected $workerCommand; + + /** + * @var \Closure|null + */ + protected $outputHandler; + + /** + * @param string $commandPath + */ + public function __construct($commandPath) + { + $this->commandPath = $commandPath; + $this->workerCommand = + '"' . PHP_BINARY . '" think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s'; + } + + /** + * @param string $queue + * @param string $delay + * @param string $memory + * @param int $timeout + * @return void + */ + public function listen($queue, $delay, $memory, $timeout = 60) + { + $process = $this->makeProcess($queue, $delay, $memory, $timeout); + + while (true) { + $this->runProcess($process, $memory); + } + } + + /** + * @param \Think\Process $process + * @param int $memory + */ + public function runProcess(Process $process, $memory) + { + $process->run(function ($type, $line) { + $this->handleWorkerOutput($type, $line); + }); + + if ($this->memoryExceeded($memory)) { + $this->stop(); + } + } + + /** + * @param string $queue + * @param int $delay + * @param int $memory + * @param int $timeout + * @return \think\Process + */ + public function makeProcess($queue, $delay, $memory, $timeout) + { + $string = $this->workerCommand; + $command = sprintf($string, $queue, $delay, $memory, $this->sleep, $this->maxTries); + + return new Process($command, $this->commandPath, null, null, $timeout); + } + + /** + * @param int $type + * @param string $line + * @return void + */ + protected function handleWorkerOutput($type, $line) + { + if (isset($this->outputHandler)) { + call_user_func($this->outputHandler, $type, $line); + } + } + + /** + * @param int $memoryLimit + * @return bool + */ + public function memoryExceeded($memoryLimit) + { + return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; + } + + /** + * @return void + */ + public function stop() + { + die; + } + + /** + * @param \Closure $outputHandler + * @return void + */ + public function setOutputHandler(Closure $outputHandler) + { + $this->outputHandler = $outputHandler; + } + + /** + * @return int + */ + public function getSleep() + { + return $this->sleep; + } + + /** + * @param int $sleep + * @return void + */ + public function setSleep($sleep) + { + $this->sleep = $sleep; + } + + /** + * @param int $tries + * @return void + */ + public function setMaxTries($tries) + { + $this->maxTries = $tries; + } +} diff --git a/vendor/topthink/think-queue/src/queue/Queueable.php b/vendor/topthink/think-queue/src/queue/Queueable.php new file mode 100644 index 0000000..2a6ec4e --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/Queueable.php @@ -0,0 +1,46 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue; + +trait Queueable +{ + + /** @var string 队列名称 */ + public $queue; + + /** @var integer 延迟时间 */ + public $delay; + + /** + * 设置队列名 + * @param $queue + * @return $this + */ + public function queue($queue) + { + $this->queue = $queue; + + return $this; + } + + /** + * 设置延迟时间 + * @param $delay + * @return $this + */ + public function delay($delay) + { + $this->delay = $delay; + + return $this; + } +} diff --git a/vendor/topthink/think-queue/src/queue/ShouldQueue.php b/vendor/topthink/think-queue/src/queue/ShouldQueue.php new file mode 100644 index 0000000..cb49c12 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/ShouldQueue.php @@ -0,0 +1,17 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue; + +interface ShouldQueue +{ + +} diff --git a/vendor/topthink/think-queue/src/queue/Worker.php b/vendor/topthink/think-queue/src/queue/Worker.php new file mode 100644 index 0000000..9e370f0 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/Worker.php @@ -0,0 +1,119 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue; + +use Exception; +use think\Hook; +use think\Queue; + +class Worker +{ + + /** + * 执行下个任务 + * @param string $queue + * @param int $delay + * @param int $sleep + * @param int $maxTries + * @return array + */ + public function pop($queue = null, $delay = 0, $sleep = 3, $maxTries = 0) + { + + $job = $this->getNextJob($queue); + + if (!is_null($job)) { + Hook::listen('worker_before_process', $queue); + return $this->process($job, $maxTries, $delay); + } + + Hook::listen('worker_before_sleep', $queue); + $this->sleep($sleep); + + return ['job' => null, 'failed' => false]; + } + + /** + * 获取下个任务 + * @param string $queue + * @return Job + */ + protected function getNextJob($queue) + { + if (is_null($queue)) { + return Queue::pop(); + } + + foreach (explode(',', $queue) as $queue) { + if (!is_null($job = Queue::pop($queue))) { + return $job; + } + } + } + + /** + * Process a given job from the queue. + * @param \think\queue\Job $job + * @param int $maxTries + * @param int $delay + * @return array + * @throws Exception + */ + public function process(Job $job, $maxTries = 0, $delay = 0) + { + if ($maxTries > 0 && $job->attempts() > $maxTries) { + return $this->logFailedJob($job); + } + + try { + $job->fire(); + + return ['job' => $job, 'failed' => false]; + } catch (Exception $e) { + if (!$job->isDeleted()) { + $job->release($delay); + } + + throw $e; + } + } + + /** + * Log a failed job into storage. + * @param \Think\Queue\Job $job + * @return array + */ + protected function logFailedJob(Job $job) + { + if (!$job->isDeleted()) { + try { + $job->delete(); + $job->failed(); + } finally { + Hook::listen('queue_failed', $job); + } + } + + return ['job' => $job, 'failed' => true]; + } + + /** + * Sleep the script for a given number of seconds. + * @param int $seconds + * @return void + */ + public function sleep($seconds) + { + sleep($seconds); + } + +} diff --git a/vendor/topthink/think-queue/src/queue/command/Listen.php b/vendor/topthink/think-queue/src/queue/command/Listen.php new file mode 100644 index 0000000..3f6cd63 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/command/Listen.php @@ -0,0 +1,60 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue\command; + +use think\console\Command; +use think\console\Input; +use think\console\input\Option; +use think\console\Output; +use think\queue\Listener; + +class Listen extends Command +{ + /** @var Listener */ + protected $listener; + + public function configure() + { + $this->setName('queue:listen') + ->addOption('queue', null, Option::VALUE_OPTIONAL, 'The queue to listen on', null) + ->addOption('delay', null, Option::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0) + ->addOption('memory', null, Option::VALUE_OPTIONAL, 'The memory limit in megabytes', 128) + ->addOption('timeout', null, Option::VALUE_OPTIONAL, 'Seconds a job may run before timing out', 60) + ->addOption('sleep', null, Option::VALUE_OPTIONAL, 'Seconds to wait before checking queue for jobs', 3) + ->addOption('tries', null, Option::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0) + ->setDescription('Listen to a given queue'); + } + + public function initialize(Input $input, Output $output) + { + $this->listener = new Listener(getcwd()); + $this->listener->setSleep($input->getOption('sleep')); + $this->listener->setMaxTries($input->getOption('tries')); + + $this->listener->setOutputHandler(function ($type, $line) use ($output) { + $output->write($line); + }); + } + + public function execute(Input $input, Output $output) + { + $delay = $input->getOption('delay'); + + $memory = $input->getOption('memory'); + + $timeout = $input->getOption('timeout'); + + $queue = $input->getOption('queue') ?: 'default'; + + $this->listener->listen($queue, $delay, $memory, $timeout); + } +} diff --git a/vendor/topthink/think-queue/src/queue/command/Restart.php b/vendor/topthink/think-queue/src/queue/command/Restart.php new file mode 100644 index 0000000..29d4a18 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/command/Restart.php @@ -0,0 +1,31 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue\command; + +use think\Cache; +use think\console\Command; +use think\console\Input; +use think\console\Output; + +class Restart extends Command +{ + public function configure() + { + $this->setName('queue:restart')->setDescription('Restart queue worker daemons after their current job'); + } + + public function execute(Input $input, Output $output) + { + Cache::set('think:queue:restart', time()); + $output->writeln("Broadcasting queue restart signal."); + } +} diff --git a/vendor/topthink/think-queue/src/queue/command/Subscribe.php b/vendor/topthink/think-queue/src/queue/command/Subscribe.php new file mode 100644 index 0000000..35fc725 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/command/Subscribe.php @@ -0,0 +1,46 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue\command; + +use think\console\Command; +use think\console\Input; +use think\console\input\Argument; +use think\console\input\Option; +use think\console\Output; +use think\Queue; +use think\Url; + +class Subscribe extends Command +{ + public function configure() + { + $this->setName('queue:subscribe') + ->setDescription('Subscribe a URL to an push queue') + ->addArgument('name', Argument::REQUIRED, 'name') + ->addArgument('url', Argument::REQUIRED, 'The URL to be subscribed.') + ->addArgument('queue', Argument::OPTIONAL, 'The URL to be subscribed.') + ->addOption('option', null, Option::VALUE_IS_ARRAY | Option::VALUE_OPTIONAL, 'the options'); + } + + public function execute(Input $input, Output $output) + { + + $url = $input->getArgument('url'); + if (!preg_match('/^https?:\/\//', $url)) { + $url = Url::build($url); + } + + Queue::subscribe($input->getArgument('name'), $url, $input->getArgument('queue'), $input->getOption('option')); + + $output->write('Queue subscriber added: ' . $input->getArgument('url') . ''); + } +} diff --git a/vendor/topthink/think-queue/src/queue/command/Work.php b/vendor/topthink/think-queue/src/queue/command/Work.php new file mode 100644 index 0000000..f7c2c95 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/command/Work.php @@ -0,0 +1,210 @@ + +// +---------------------------------------------------------------------- +namespace think\queue\command; + +use think\Config; +use think\console\Command; +use think\console\Input; +use think\console\input\Option; +use think\console\Output; +use think\Hook; +use think\queue\Job; +use think\queue\Worker; +use Exception; +use Throwable; +use think\Cache; +use think\exception\Handle; +use think\exception\ThrowableError; + +class Work extends Command +{ + + /** + * The queue worker instance. + * @var \think\queue\Worker + */ + protected $worker; + + protected function initialize(Input $input, Output $output) + { + $this->worker = new Worker(); + } + + protected function configure() + { + $this->setName('queue:work') + ->addOption('queue', null, Option::VALUE_OPTIONAL, 'The queue to listen on') + ->addOption('daemon', null, Option::VALUE_NONE, 'Run the worker in daemon mode') + ->addOption('delay', null, Option::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0) + ->addOption('force', null, Option::VALUE_NONE, 'Force the worker to run even in maintenance mode') + ->addOption('memory', null, Option::VALUE_OPTIONAL, 'The memory limit in megabytes', 128) + ->addOption('sleep', null, Option::VALUE_OPTIONAL, 'Number of seconds to sleep when no job is available', 3) + ->addOption('tries', null, Option::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0) + ->setDescription('Process the next job on a queue'); + } + + /** + * Execute the console command. + * @param Input $input + * @param Output $output + * @return int|null|void + */ + public function execute(Input $input, Output $output) + { + $queue = $input->getOption('queue'); + + $delay = $input->getOption('delay'); + + $memory = $input->getOption('memory'); + + if ($input->getOption('daemon')) { + Hook::listen('worker_daemon_start', $queue); + $this->daemon( + $queue, $delay, $memory, + $input->getOption('sleep'), $input->getOption('tries') + ); + } else { + $response = $this->worker->pop($queue, $delay, $input->getOption('sleep'), $input->getOption('tries')); + $this->output($response); + } + } + + protected function output($response) + { + if (!is_null($response['job'])) { + /** @var Job $job */ + $job = $response['job']; + if ($response['failed']) { + $this->output->writeln('Failed: ' . $job->getName()); + } else { + $this->output->writeln('Processed: ' . $job->getName()); + } + } + } + + /** + * 启动一个守护进程执行任务. + * + * @param string $queue + * @param int $delay + * @param int $memory + * @param int $sleep + * @param int $maxTries + * @return array + */ + protected function daemon($queue = null, $delay = 0, $memory = 128, $sleep = 3, $maxTries = 0) + { + $lastRestart = $this->getTimestampOfLastQueueRestart(); + + while (true) { + $this->runNextJobForDaemon( + $queue, $delay, $sleep, $maxTries + ); + + if ( $this->memoryExceeded($memory) ) { + Hook::listen('worker_memory_exceeded', $queue); + $this->stop(); + } + + if ( $this->queueShouldRestart($lastRestart) ) { + Hook::listen('worker_queue_restart', $queue); + $this->stop(); + } + } + } + + /** + * 以守护进程的方式执行下个任务. + * + * @param string $queue + * @param int $delay + * @param int $sleep + * @param int $maxTries + * @return void + */ + protected function runNextJobForDaemon($queue, $delay, $sleep, $maxTries) + { + try { + $response = $this->worker->pop($queue, $delay, $sleep, $maxTries); + + $this->output($response); + } catch (Exception $e) { + $this->getExceptionHandler()->report($e); + } catch (Throwable $e) { + $this->getExceptionHandler()->report(new ThrowableError($e)); + } + } + + /** + * 获取上次重启守护进程的时间 + * + * @return int|null + */ + protected function getTimestampOfLastQueueRestart() + { + return Cache::get('think:queue:restart'); + } + + /** + * 检查是否要重启守护进程 + * + * @param int|null $lastRestart + * @return bool + */ + protected function queueShouldRestart($lastRestart) + { + return $this->getTimestampOfLastQueueRestart() != $lastRestart; + } + + /** + * 检查内存是否超出 + * @param int $memoryLimit + * @return bool + */ + protected function memoryExceeded($memoryLimit) + { + return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; + } + + /** + * 获取异常处理实例 + * + * @return \think\exception\Handle + */ + protected function getExceptionHandler() + { + static $handle; + + if (!$handle) { + + if ($class = Config::get('exception_handle')) { + if (class_exists($class) && is_subclass_of($class, "\\think\\exception\\Handle")) { + $handle = new $class; + } + } + if (!$handle) { + $handle = new Handle(); + } + } + + return $handle; + } + + /** + * 停止执行任务的守护进程. + * @return void + */ + public function stop() + { + die; + } + +} diff --git a/vendor/topthink/think-queue/src/queue/connector/Database.php b/vendor/topthink/think-queue/src/queue/connector/Database.php new file mode 100644 index 0000000..71296a4 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/connector/Database.php @@ -0,0 +1,171 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue\connector; + +use think\Db; +use think\queue\Connector; +use think\queue\job\Database as DatabaseJob; + +class Database extends Connector +{ + protected $db; + + protected $options = [ + 'expire' => 60, + 'default' => 'default', + 'table' => 'jobs', + 'dsn' => [] + ]; + + public function __construct($options) + { + if (!empty($options)) { + $this->options = array_merge($this->options, $options); + } + + $this->db = Db::connect($this->options['dsn']); + } + + public function push($job, $data = '', $queue = null) + { + return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data)); + } + + public function later($delay, $job, $data = '', $queue = null) + { + return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data)); + } + + public function pop($queue = null) + { + $queue = $this->getQueue($queue); + + if (!is_null($this->options['expire'])) { + $this->releaseJobsThatHaveBeenReservedTooLong($queue); + } + + if ($job = $this->getNextAvailableJob($queue)) { + $this->markJobAsReserved($job->id); + + $this->db->commit(); + + return new DatabaseJob($this, $job, $queue); + } + + $this->db->commit(); + } + + /** + * 重新发布任务 + * @param string $queue + * @param \StdClass $job + * @param int $delay + * @return mixed + */ + public function release($queue, $job, $delay) + { + return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts); + } + + /** + * Push a raw payload to the database with a given delay. + * + * @param \DateTime|int $delay + * @param string|null $queue + * @param string $payload + * @param int $attempts + * @return mixed + */ + protected function pushToDatabase($delay, $queue, $payload, $attempts = 0) + { + return $this->db->name($this->options['table'])->insert([ + 'queue' => $this->getQueue($queue), + 'payload' => $payload, + 'attempts' => $attempts, + 'reserved' => 0, + 'reserved_at' => null, + 'available_at' => time() + $delay, + 'created_at' => time() + ]); + } + + /** + * 获取下个有效任务 + * + * @param string|null $queue + * @return \StdClass|null + */ + protected function getNextAvailableJob($queue) + { + $this->db->startTrans(); + + $job = $this->db->name($this->options['table']) + ->lock(true) + ->where('queue', $this->getQueue($queue)) + ->where('reserved', 0) + ->where('available_at', '<=', time()) + ->order('id', 'asc') + ->find(); + + return $job ? (object) $job : null; + } + + /** + * 标记任务正在执行. + * + * @param string $id + * @return void + */ + protected function markJobAsReserved($id) + { + $this->db->name($this->options['table'])->where('id', $id)->update([ + 'reserved' => 1, + 'reserved_at' => time() + ]); + } + + /** + * 重新发布超时的任务 + * + * @param string $queue + * @return void + */ + protected function releaseJobsThatHaveBeenReservedTooLong($queue) + { + $expired = time() - $this->options['expire']; + + $this->db->name($this->options['table']) + ->where('queue', $this->getQueue($queue)) + ->where('reserved', 1) + ->where('reserved_at', '<=', $expired) + ->update([ + 'reserved' => 0, + 'reserved_at' => null, + 'attempts' => ['exp', 'attempts + 1'] + ]); + } + + /** + * 删除任务 + * @param string $id + * @return void + */ + public function deleteReserved($id) + { + $this->db->name($this->options['table'])->delete($id); + } + + protected function getQueue($queue) + { + return $queue ?: $this->options['default']; + } +} diff --git a/vendor/topthink/think-queue/src/queue/connector/Redis.php b/vendor/topthink/think-queue/src/queue/connector/Redis.php new file mode 100644 index 0000000..9320d53 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/connector/Redis.php @@ -0,0 +1,236 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue\connector; + +use Exception; +use think\helper\Str; +use think\queue\Connector; +use think\queue\job\Redis as RedisJob; + +class Redis extends Connector +{ + /** @var \Redis */ + protected $redis; + + protected $options = [ + 'expire' => 60, + 'default' => 'default', + 'host' => '127.0.0.1', + 'port' => 6379, + 'password' => '', + 'select' => 0, + 'timeout' => 0, + 'persistent' => false + ]; + + public function __construct($options) + { + if (!extension_loaded('redis')) { + throw new Exception('redis扩展未安装'); + } + if (!empty($options)) { + $this->options = array_merge($this->options, $options); + } + + $func = $this->options['persistent'] ? 'pconnect' : 'connect'; + $this->redis = new \Redis; + $this->redis->$func($this->options['host'], $this->options['port'], $this->options['timeout']); + + if ('' != $this->options['password']) { + $this->redis->auth($this->options['password']); + } + + if (0 != $this->options['select']) { + $this->redis->select($this->options['select']); + } + } + + public function push($job, $data = '', $queue = null) + { + return $this->pushRaw($this->createPayload($job, $data), $queue); + } + + public function later($delay, $job, $data = '', $queue = null) + { + $payload = $this->createPayload($job, $data); + + $this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload); + } + + public function pop($queue = null) + { + $original = $queue ?: $this->options['default']; + + $queue = $this->getQueue($queue); + + $this->migrateExpiredJobs($queue . ':delayed', $queue, false); + + if (!is_null($this->options['expire'])) { + $this->migrateExpiredJobs($queue . ':reserved', $queue); + } + + $job = $this->redis->lPop($queue); + + if ($job !== false) { + $this->redis->zAdd($queue . ':reserved', time() + $this->options['expire'], $job); + + return new RedisJob($this, $job, $original); + } + } + + /** + * 重新发布任务 + * + * @param string $queue + * @param string $payload + * @param int $delay + * @param int $attempts + * @return void + */ + public function release($queue, $payload, $delay, $attempts) + { + $payload = $this->setMeta($payload, 'attempts', $attempts); + + $this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload); + } + + public function pushRaw($payload, $queue = null) + { + $this->redis->rPush($this->getQueue($queue), $payload); + + return json_decode($payload, true)['id']; + } + + protected function createPayload($job, $data = '', $queue = null) + { + $payload = $this->setMeta( + parent::createPayload($job, $data), 'id', $this->getRandomId() + ); + + return $this->setMeta($payload, 'attempts', 1); + } + + /** + * 删除任务 + * + * @param string $queue + * @param string $job + * @return void + */ + public function deleteReserved($queue, $job) + { + $this->redis->zRem($this->getQueue($queue) . ':reserved', $job); + } + + /** + * 移动延迟任务 + * + * @param string $from + * @param string $to + * @param bool $attempt + */ + public function migrateExpiredJobs($from, $to, $attempt = true) + { + $this->redis->watch($from); + + $jobs = $this->getExpiredJobs( + $from, $time = time() + ); + if (count($jobs) > 0) { + $this->transaction(function () use ($from, $to, $time, $jobs, $attempt) { + $this->removeExpiredJobs($from, $time); + $this->pushExpiredJobsOntoNewQueue($to, $jobs, $attempt); + }); + } + $this->redis->unwatch(); + } + + /** + * redis事务 + * @param \Closure $closure + */ + protected function transaction(\Closure $closure) + { + $this->redis->multi(); + try { + call_user_func($closure); + if (!$this->redis->exec()) { + $this->redis->discard(); + } + } catch (Exception $e) { + $this->redis->discard(); + } + } + + /** + * 获取所有到期任务 + * + * @param string $from + * @param int $time + * @return array + */ + protected function getExpiredJobs($from, $time) + { + return $this->redis->zRangeByScore($from, '-inf', $time); + } + + /** + * 删除过期任务 + * + * @param string $from + * @param int $time + * @return void + */ + protected function removeExpiredJobs($from, $time) + { + $this->redis->zRemRangeByScore($from, '-inf', $time); + } + + /** + * 重新发布到期任务 + * + * @param string $to + * @param array $jobs + * @param boolean $attempt + */ + protected function pushExpiredJobsOntoNewQueue($to, $jobs, $attempt = true) + { + if ($attempt) { + foreach ($jobs as &$job) { + $attempts = json_decode($job, true)['attempts']; + $job = $this->setMeta($job, 'attempts', $attempts + 1); + } + } + call_user_func_array([$this->redis, 'rPush'], array_merge([$to], $jobs)); + } + + /** + * 随机id + * + * @return string + */ + protected function getRandomId() + { + return Str::random(32); + } + + /** + * 获取队列名 + * + * @param string|null $queue + * @return string + */ + protected function getQueue($queue) + { + return 'queues:' . ($queue ?: $this->options['default']); + } +} diff --git a/vendor/topthink/think-queue/src/queue/connector/Sync.php b/vendor/topthink/think-queue/src/queue/connector/Sync.php new file mode 100644 index 0000000..2fbdeda --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/connector/Sync.php @@ -0,0 +1,57 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue\connector; + +use Exception; +use think\queue\Connector; +use think\queue\job\Sync as SyncJob; +use Throwable; + +class Sync extends Connector +{ + + public function push($job, $data = '', $queue = null) + { + $queueJob = $this->resolveJob($this->createPayload($job, $data, $queue)); + + try { + set_time_limit(0); + $queueJob->fire(); + } catch (Exception $e) { + $queueJob->failed(); + + throw $e; + } catch (Throwable $e) { + $queueJob->failed(); + + throw $e; + } + + return 0; + } + + public function later($delay, $job, $data = '', $queue = null) + { + return $this->push($job, $data, $queue); + } + + public function pop($queue = null) + { + + } + + protected function resolveJob($payload) + { + return new SyncJob($payload); + } + +} diff --git a/vendor/topthink/think-queue/src/queue/connector/Topthink.php b/vendor/topthink/think-queue/src/queue/connector/Topthink.php new file mode 100644 index 0000000..a21c054 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/connector/Topthink.php @@ -0,0 +1,225 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue\connector; + +use think\exception\HttpException; +use think\queue\Connector; +use think\Request; +use think\queue\job\Topthink as TopthinkJob; +use think\Response; + +class Topthink extends Connector +{ + protected $options = [ + 'token' => '', + 'project_id' => '', + 'protocol' => 'https', + 'host' => 'qns.topthink.com', + 'port' => 443, + 'api_version' => 1, + 'max_retries' => 3, + 'default' => 'default' + ]; + + /** @var Request */ + protected $request; + + protected $url; + + protected $curl = null; + + protected $last_status; + + protected $headers = []; + + public function __construct($options) + { + if (!empty($options)) { + $this->options = array_merge($this->options, $options); + } + + $this->url = "{$this->options['protocol']}://{$this->options['host']}:{$this->options['port']}/v{$this->options['api_version']}/"; + + $this->headers['Authorization'] = "Bearer {$this->options['token']}"; + + $this->request = Request::instance(); + } + + public function push($job, $data = '', $queue = null) + { + return $this->pushRaw(0, $queue, $this->createPayload($job, $data)); + } + + public function later($delay, $job, $data = '', $queue = null) + { + return $this->pushRaw($delay, $queue, $this->createPayload($job, $data)); + } + + public function release($queue, $job, $delay) + { + return $this->pushRaw($delay, $queue, $job->payload, $job->attempts); + } + + public function marshal() + { + $job = new TopthinkJob($this, $this->marshalPushedJob(), $this->request->header('topthink-message-queue')); + if ($this->request->header('topthink-message-status') == 'success') { + $job->fire(); + } else { + $job->failed(); + } + return new Response('OK'); + } + + public function pushRaw($delay, $queue, $payload, $attempts = 0) + { + $queue_name = $this->getQueue($queue); + $queue = rawurlencode($queue_name); + $url = "project/{$this->options['project_id']}/queue/{$queue}/message"; + $message = [ + 'payload' => $payload, + 'attempts' => $attempts, + 'delay' => $delay + ]; + + return $this->apiCall('POST', $url, $message)->id; + } + + public function deleteMessage($queue, $id) + { + $queue = rawurlencode($queue); + $url = "project/{$this->options['project_id']}/queue/{$queue}/message/{$id}"; + return $this->apiCall('DELETE', $url); + } + + protected function apiCall($type, $url, $params = []) + { + $url = "{$this->url}$url"; + + if ($this->curl == null) { + $this->curl = curl_init(); + } + + switch ($type = strtoupper($type)) { + case 'DELETE': + curl_setopt($this->curl, CURLOPT_URL, $url); + curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); + curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params)); + break; + case 'PUT': + curl_setopt($this->curl, CURLOPT_URL, $url); + curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); + curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params)); + break; + case 'POST': + curl_setopt($this->curl, CURLOPT_URL, $url); + curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); + curl_setopt($this->curl, CURLOPT_POST, true); + curl_setopt($this->curl, CURLOPT_POSTFIELDS, $params); + break; + case 'GET': + curl_setopt($this->curl, CURLOPT_POSTFIELDS, null); + curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); + curl_setopt($this->curl, CURLOPT_HTTPGET, true); + $url .= '?' . http_build_query($params); + curl_setopt($this->curl, CURLOPT_URL, $url); + break; + } + + curl_setopt($this->curl, CURLOPT_SSL_VERIFYPEER, false); + curl_setopt($this->curl, CURLOPT_RETURNTRANSFER, true); + + $headers = []; + foreach ($this->headers as $k => $v) { + if ($k == 'Connection') { + $v = 'Close'; + } + $headers[] = "$k: $v"; + } + + curl_setopt($this->curl, CURLOPT_HTTPHEADER, $headers); + curl_setopt($this->curl, CURLOPT_CONNECTTIMEOUT, 10); + + return $this->callWithRetries(); + } + + protected function callWithRetries() + { + for ($retry = 0; $retry < $this->options['max_retries']; $retry++) { + $out = curl_exec($this->curl); + if ($out === false) { + $this->reportHttpError(0, curl_error($this->curl)); + } + $this->last_status = curl_getinfo($this->curl, CURLINFO_HTTP_CODE); + + if ($this->last_status >= 200 && $this->last_status < 300) { + return self::jsonDecode($out); + } elseif ($this->last_status >= 500) { + self::waitRandomInterval($retry); + } else { + $this->reportHttpError($this->last_status, $out); + } + } + $this->reportHttpError($this->last_status, "Service unavailable"); + return; + } + + protected static function jsonDecode($response) + { + $data = json_decode($response); + + $json_error = json_last_error(); + if ($json_error != JSON_ERROR_NONE) { + throw new \RuntimeException($json_error); + } + + return $data; + } + + protected static function waitRandomInterval($retry) + { + $max_delay = pow(4, $retry) * 100 * 1000; + usleep(rand(0, $max_delay)); + } + + protected function reportHttpError($status, $text) + { + throw new HttpException($status, "http error: {$status} | {$text}"); + } + + /** + * Marshal out the pushed job and payload. + * + * @return object + */ + protected function marshalPushedJob() + { + return (object) [ + 'id' => $this->request->header('topthink-message-id'), + 'payload' => $this->request->getContent(), + 'attempts' => $this->request->header('topthink-message-attempts') + ]; + } + + public function __destruct() + { + if ($this->curl != null) { + curl_close($this->curl); + $this->curl = null; + } + } + + public function pop($queue = null) + { + throw new \RuntimeException('pop queues not support for this type'); + } +} diff --git a/vendor/topthink/think-queue/src/queue/job/Database.php b/vendor/topthink/think-queue/src/queue/job/Database.php new file mode 100644 index 0000000..06064b3 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/job/Database.php @@ -0,0 +1,88 @@ + +// +---------------------------------------------------------------------- +namespace think\queue\job; + +use think\queue\Job; +use think\queue\connector\Database as DatabaseQueue; + +class Database extends Job +{ + /** + * The database queue instance. + * @var DatabaseQueue + */ + protected $database; + + /** + * The database job payload. + * @var Object + */ + protected $job; + + public function __construct(DatabaseQueue $database, $job, $queue) + { + $this->job = $job; + $this->queue = $queue; + $this->database = $database; + $this->job->attempts = $this->job->attempts + 1; + } + + /** + * 执行任务 + * @return void + */ + public function fire() + { + $this->resolveAndFire(json_decode($this->job->payload, true)); + } + + /** + * 删除任务 + * @return void + */ + public function delete() + { + parent::delete(); + $this->database->deleteReserved($this->job->id); + } + + /** + * 重新发布任务 + * @param int $delay + * @return void + */ + public function release($delay = 0) + { + parent::release($delay); + + $this->delete(); + + $this->database->release($this->queue, $this->job, $delay); + } + + /** + * 获取当前任务尝试次数 + * @return int + */ + public function attempts() + { + return (int) $this->job->attempts; + } + + /** + * Get the raw body string for the job. + * @return string + */ + public function getRawBody() + { + return $this->job->payload; + } +} diff --git a/vendor/topthink/think-queue/src/queue/job/Redis.php b/vendor/topthink/think-queue/src/queue/job/Redis.php new file mode 100644 index 0000000..10477ce --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/job/Redis.php @@ -0,0 +1,92 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue\job; + +use think\queue\Job; +use think\queue\connector\Redis as RedisQueue; + +class Redis extends Job +{ + + /** + * The redis queue instance. + * @var RedisQueue + */ + protected $redis; + + /** + * The database job payload. + * @var Object + */ + protected $job; + + public function __construct(RedisQueue $redis, $job, $queue) + { + $this->job = $job; + $this->queue = $queue; + $this->redis = $redis; + } + + /** + * Fire the job. + * @return void + */ + public function fire() + { + $this->resolveAndFire(json_decode($this->getRawBody(), true)); + } + + /** + * Get the number of times the job has been attempted. + * @return int + */ + public function attempts() + { + return json_decode($this->job, true)['attempts']; + } + + /** + * Get the raw body string for the job. + * @return string + */ + public function getRawBody() + { + return $this->job; + } + + /** + * 删除任务 + * + * @return void + */ + public function delete() + { + parent::delete(); + + $this->redis->deleteReserved($this->queue, $this->job); + } + + /** + * 重新发布任务 + * + * @param int $delay + * @return void + */ + public function release($delay = 0) + { + parent::release($delay); + + $this->delete(); + + $this->redis->release($this->queue, $this->job, $delay, $this->attempts() + 1); + } +} diff --git a/vendor/topthink/think-queue/src/queue/job/Sync.php b/vendor/topthink/think-queue/src/queue/job/Sync.php new file mode 100644 index 0000000..e81a758 --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/job/Sync.php @@ -0,0 +1,56 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue\job; + +use think\queue\Job; + +class Sync extends Job +{ + /** + * The queue message data. + * + * @var string + */ + protected $payload; + + public function __construct($payload) + { + $this->payload = $payload; + } + + /** + * Fire the job. + * @return void + */ + public function fire() + { + $this->resolveAndFire(json_decode($this->payload, true)); + } + + /** + * Get the number of times the job has been attempted. + * @return int + */ + public function attempts() + { + return 1; + } + + /** + * Get the raw body string for the job. + * @return string + */ + public function getRawBody() + { + return $this->payload; + } +} diff --git a/vendor/topthink/think-queue/src/queue/job/Topthink.php b/vendor/topthink/think-queue/src/queue/job/Topthink.php new file mode 100644 index 0000000..b98b59c --- /dev/null +++ b/vendor/topthink/think-queue/src/queue/job/Topthink.php @@ -0,0 +1,85 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue\job; + +use think\queue\Job; +use think\queue\connector\Topthink as TopthinkQueue; + +class Topthink extends Job +{ + + /** + * The Iron queue instance. + * + * @var TopthinkQueue + */ + protected $topthink; + + /** + * The IronMQ message instance. + * + * @var object + */ + protected $job; + + public function __construct(TopthinkQueue $topthink, $job, $queue) + { + $this->topthink = $topthink; + $this->job = $job; + $this->queue = $queue; + $this->job->attempts = $this->job->attempts + 1; + } + + /** + * Fire the job. + * @return void + */ + public function fire() + { + $this->resolveAndFire(json_decode($this->job->payload, true)); + } + + /** + * Get the number of times the job has been attempted. + * @return int + */ + public function attempts() + { + return (int) $this->job->attempts; + } + + public function delete() + { + parent::delete(); + + $this->topthink->deleteMessage($this->queue, $this->job->id); + } + + public function release($delay = 0) + { + parent::release($delay); + + $this->delete(); + + $this->topthink->release($this->queue, $this->job, $delay); + } + + /** + * Get the raw body string for the job. + * @return string + */ + public function getRawBody() + { + return $this->job->payload; + } + +}