From 9a9a69333b0836f7864d29d7925bbd1394c659b0 Mon Sep 17 00:00:00 2001 From: wangjinlei <751475802@qq.com> Date: Wed, 29 Apr 2026 17:59:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=87=AA=E5=8A=A8=E6=8E=A8?= =?UTF-8?q?=E5=B9=BF=E7=9A=84=E7=9B=B8=E5=85=B3=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/controller/EmailClient.php | 91 ++++++++++++++++++- application/api/job/FetchExperts.php | 50 +++++++--- application/api/job/FillExpertCountry.php | 35 +++++-- application/api/job/PromotionPrepare.php | 36 ++++++-- application/api/job/PromotionPrepareEmail.php | 67 +++++++------- application/api/job/PromotionPrepareTask.php | 52 +++++++---- application/api/job/PromotionSend.php | 49 +++++++--- application/common/PromotionService.php | 8 ++ application/common/QueueJob.php | 48 ++++++++++ application/database.php | 2 + 10 files changed, 340 insertions(+), 98 deletions(-) diff --git a/application/api/controller/EmailClient.php b/application/api/controller/EmailClient.php index 18ad907..12a402f 100644 --- a/application/api/controller/EmailClient.php +++ b/application/api/controller/EmailClient.php @@ -6,6 +6,7 @@ use think\Db; use think\Env; use think\Cache; use think\Queue; +use think\Config; use PHPMailer\PHPMailer\PHPMailer; use think\Validate; use app\common\PromotionService; @@ -1758,7 +1759,24 @@ class EmailClient extends Base return jsonError($rule->getError()); } $service = new PromotionService(); - $service->dispatchPrepareEmails($data['id']); + $taskId = intval($data['id']); + + // 调用前快照:用于解释"为什么没入队" + $task = \think\Db::name('promotion_task')->where('task_id', $taskId)->find(); + $pending = \think\Db::name('promotion_email_log') + ->where('task_id', $taskId) + ->where('state', 0) + ->where('prepared_at', 0) + ->count(); + + $result = $service->dispatchPrepareEmails($taskId); + + return jsonSuccess([ + 'task_id' => $taskId, + 'task_state' => $task ? intval($task['state']) : null, // 0 才能 dispatch;5 已准备完 + 'pending_before' => intval($pending), // 调用前还能入队的 log 数 + 'dispatch_result' => $result, // ['dispatched' => N, ...] + ]); } @@ -1771,7 +1789,65 @@ class EmailClient extends Base return jsonError($rule->getError()); } $service = new PromotionService(); - $service->prepareSingleEmail($data['id']); + $result = $service->prepareSingleEmail(intval($data['id'])); + return jsonSuccess($result); + } + + /** + * 队列调试:查看 Redis 里队列长度(不依赖 redis-cli)。 + * + * GET/POST 参数: + * queue 队列名(默认 PromotionPrepareEmail) + * + * 返回: + * - ready/reserved/delayed 长度 + * - queue 配置关键字段 + */ + public function queueDebug() + { + $queue = trim((string)$this->request->param('queue', 'PromotionPrepareEmail')); + if ($queue === '') { + $queue = 'PromotionPrepareEmail'; + } + + try { + $cfg = Config::get('queue'); + $redis = new \Redis(); + $connectMethod = !empty($cfg['persistent']) ? 'pconnect' : 'connect'; + $redis->$connectMethod($cfg['host'] ?? '127.0.0.1', $cfg['port'] ?? 6379); + if (!empty($cfg['password'])) { + $redis->auth($cfg['password']); + } + $redis->select($cfg['select'] ?? 0); + + $readyKey = 'queues:' . $queue; + $reservedKey = 'queues:' . $queue . ':reserved'; + $delayedKey = 'queues:' . $queue . ':delayed'; + + return jsonSuccess([ + 'queue' => $queue, + 'redis' => [ + 'db' => $cfg['select'] ?? 0, + 'host' => $cfg['host'] ?? '127.0.0.1', + 'port' => $cfg['port'] ?? 6379, + 'persistent' => !empty($cfg['persistent']) ? 1 : 0, + 'expire' => $cfg['expire'], + ], + 'keys' => [ + 'ready' => $readyKey, + 'reserved' => $reservedKey, + 'delayed' => $delayedKey, + ], + 'len' => [ + 'ready' => (int)$redis->lLen($readyKey), + 'reserved' => (int)$redis->zCard($reservedKey), + 'delayed' => (int)$redis->zCard($delayedKey), + ], + 'ping' => (string)$redis->ping(), + ]); + } catch (\Throwable $e) { + return jsonError('queueDebug failed: ' . $e->getMessage()); + } } /** @@ -2395,6 +2471,17 @@ class EmailClient extends Base return jsonSuccess($result); } + + public function testYbpard(){ + $service = new PromotionService(); + $expert['application_link_yeditorial_board'] = $service->buildYboardApplyUrl( + intval(259116), + intval(3) + ); + return jsonSuccess($expert); + } + + /** * 每日自动触发发送(把 prepared 的任务启动),供 crontab 调用 * diff --git a/application/api/job/FetchExperts.php b/application/api/job/FetchExperts.php index d0118d7..f909401 100644 --- a/application/api/job/FetchExperts.php +++ b/application/api/job/FetchExperts.php @@ -4,26 +4,52 @@ namespace app\api\job; use think\queue\Job; use app\common\ExpertFinderService; +use app\common\QueueJob; /** - * 专家抓取队列任务 + * 专家抓取队列任务。 * 注意:此任务推送到队列名 "FetchExperts",必须单独启动 worker 才会执行: - * php think queue:listen --queue FetchExperts - * 若只运行 queue:listen 不指定队列,默认只消费 "mail",本任务不会被执行。 + * php think queue:work --queue FetchExperts --sleep=3 --tries=3 --daemon + * + * 单条任务可能耗时较长(NCBI 接口翻页 + 写库),常驻 worker 受 wait_timeout 影响, + * 由 QueueJob 在进程超过 6h 或致命 DB 错误时主动 exit(1) 让 supervisor 拉起新进程。 */ class FetchExperts { + private $oQueueJob; + + public function __construct() + { + $this->oQueueJob = new QueueJob(); + } + public function fire(Job $job, $data) { - $field = isset($data['field']) ? $data['field'] : ''; - $service = new ExpertFinderService(); - $service->doFetchForField( - $field, - isset($data['source']) ? $data['source'] : 'pubmed', - isset($data['per_page']) ? intval($data['per_page']) : 100, - isset($data['min_year']) ? $data['min_year'] : null - ); + $this->oQueueJob->init($job); - $job->delete(); + $field = isset($data['field']) ? (string)$data['field'] : ''; + if ($field === '') { + $this->oQueueJob->log("FetchExperts 无效的 field,删除任务"); + $job->delete(); + return; + } + + try { + $service = new ExpertFinderService(); + $service->doFetchForField( + $field, + isset($data['source']) ? $data['source'] : 'pubmed', + isset($data['per_page']) ? intval($data['per_page']) : 100, + isset($data['min_year']) ? $data['min_year'] : null + ); + $this->oQueueJob->log("FetchExperts 完成 | field={$field}"); + $job->delete(); + } catch (\Exception $e) { + $this->oQueueJob->handleException($e, $job, "field={$field}"); + } catch (\Throwable $e) { + $this->oQueueJob->handleException($e, $job, "field={$field}"); + } finally { + $this->oQueueJob->finnal(); + } } } diff --git a/application/api/job/FillExpertCountry.php b/application/api/job/FillExpertCountry.php index 59fd058..993485f 100644 --- a/application/api/job/FillExpertCountry.php +++ b/application/api/job/FillExpertCountry.php @@ -4,34 +4,51 @@ namespace app\api\job; use think\queue\Job; use app\common\ExpertFinderService; +use app\common\QueueJob; /** * 队列任务:用本地大模型从 affiliation 推断国家,写入 expert.country_id / country。 * 处理完当前专家后,自动找下一个推入同一队列(链式执行),直到全部处理完。 * * 支持多队列并行:通过 $data['queue'] 和 $data['chat_url'] 区分不同的链/模型。 + * + * 单条任务受本地 LLM 响应时间影响(一般 2-10s),常驻 worker 由 QueueJob + * 在进程超 6h 或遇致命 DB 错误时主动 exit(1) 让 supervisor 拉起新进程。 */ class FillExpertCountry { + private $oQueueJob; + + public function __construct() + { + $this->oQueueJob = new QueueJob(); + } + public function fire(Job $job, $data) { + $this->oQueueJob->init($job); + $expertId = intval(isset($data['expert_id']) ? $data['expert_id'] : 0); $affiliation = isset($data['affiliation']) ? trim((string)$data['affiliation']) : ''; - $queue = isset($data['queue']) ? (string)$data['queue'] : 'FetchExperts'; + $queue = isset($data['queue']) ? (string)$data['queue'] : 'FetchExperts'; $chatUrl = isset($data['chat_url']) ? (string)$data['chat_url'] : ''; $service = new ExpertFinderService(); - if ($expertId && $affiliation !== '') { - try { + try { + if ($expertId && $affiliation !== '') { $service->fillExpertCountry($expertId, $affiliation, $chatUrl); - } catch (\Exception $e) { - $service->log('[FillExpertCountry] expert_id=' . $expertId . ' queue=' . $queue . ' exception=' . $e->getMessage()); } + $job->delete(); + + // 链式:处理完当前专家立刻拉下一个进来 + $service->enqueueNextCountryFill(1, $queue, $chatUrl); + } catch (\Exception $e) { + $this->oQueueJob->handleException($e, $job, "expert_id={$expertId} queue={$queue}"); + } catch (\Throwable $e) { + $this->oQueueJob->handleException($e, $job, "expert_id={$expertId} queue={$queue}"); + } finally { + $this->oQueueJob->finnal(); } - - $job->delete(); - - $service->enqueueNextCountryFill(1, $queue, $chatUrl); } } diff --git a/application/api/job/PromotionPrepare.php b/application/api/job/PromotionPrepare.php index 13791ef..3fa05ea 100644 --- a/application/api/job/PromotionPrepare.php +++ b/application/api/job/PromotionPrepare.php @@ -4,6 +4,7 @@ namespace app\api\job; use think\queue\Job; use app\common\PromotionService; +use app\common\QueueJob; /** * 【已废弃 / 兼容保留】 @@ -15,20 +16,35 @@ use app\common\PromotionService; */ class PromotionPrepare { + private $oQueueJob; + + public function __construct() + { + $this->oQueueJob = new QueueJob(); + } + public function fire(Job $job, $data) { - $taskId = intval(isset($data['task_id']) ? $data['task_id'] : 0); - $service = new PromotionService(); + $this->oQueueJob->init($job); - if ($taskId > 0) { - try { - $service->enqueuePrepareTask($taskId); - $service->log('[PromotionPrepare][deprecated] forwarded task=' . $taskId . ' -> PromotionPrepareTask'); - } catch (\Exception $e) { - $service->log('[PromotionPrepare][deprecated] task=' . $taskId . ' forward_exception=' . $e->getMessage()); - } + $taskId = isset($data['task_id']) ? intval($data['task_id']) : 0; + if ($taskId <= 0) { + $this->oQueueJob->log("PromotionPrepare[deprecated] 无效的 task_id,删除任务"); + $job->delete(); + return; } - $job->delete(); + try { + $service = new PromotionService(); + $service->enqueuePrepareTask($taskId); + $this->oQueueJob->log("PromotionPrepare[deprecated] forwarded task_id={$taskId} -> PromotionPrepareTask"); + $job->delete(); + } catch (\Exception $e) { + $this->oQueueJob->handleException($e, $job, "[deprecated] task_id={$taskId}"); + } catch (\Throwable $e) { + $this->oQueueJob->handleException($e, $job, "[deprecated] task_id={$taskId}"); + } finally { + $this->oQueueJob->finnal(); + } } } diff --git a/application/api/job/PromotionPrepareEmail.php b/application/api/job/PromotionPrepareEmail.php index 65c8d89..437ae3c 100644 --- a/application/api/job/PromotionPrepareEmail.php +++ b/application/api/job/PromotionPrepareEmail.php @@ -4,50 +4,55 @@ namespace app\api\job; use think\queue\Job; use app\common\PromotionService; +use app\common\QueueJob; /** * 队列任务:单封邮件 prepare(调用 LLM 生成个性化描述 + 渲染模板 + 写入 log)。 * - * 单个 Job 可能耗时较长(LLM 调用 30s 超时),因此使用独立队列 promotion_email, - * 多 worker 并行消费即可提升吞吐。 + * 单条 job 可能耗时 30s+(LLM 调用)。常驻 worker 必须配合 supervisor 守护, + * 当进程超过 6h 或遇到致命 DB 错误时,QueueJob 会主动 exit(1),由 supervisor 拉起新进程, + * 避开 MySQL wait_timeout 把连接关掉后所有任务持续 fail 的问题。 * - * 失败策略: - * - PromotionService::prepareSingleEmail 内部已做兜底(LLM 失败回退到配置文案), - * 这里遇到未捕获异常时允许重试一次,超过 3 次直接 delete 避免无限循环。 - * - * 队列名:promotion_email - * 启动 worker:php think queue:listen --queue promotion_email + * 启动建议: + * php think queue:work --queue PromotionPrepareEmail --sleep=3 --tries=3 --daemon */ class PromotionPrepareEmail { +// private $oQueueJob; + +// public function __construct() +// { +// $this->oQueueJob = new QueueJob(); +// } + public function fire(Job $job, $data) { - $logId = intval(isset($data['log_id']) ? $data['log_id'] : 0); - $service = new PromotionService(); - if (!$logId) { - $job->delete(); - return; - } - $service->prepareSingleEmail($logId); - $job->delete(); +// $this->oQueueJob->init($job); +// + $logId = isset($data['log_id']) ? intval($data['log_id']) : 0; +// if ($logId <= 0) { +// $this->oQueueJob->log("PromotionPrepareEmail 无效的 log_id,删除任务"); +// $job->delete(); +// return; +// } // // try { -// $result = $service->prepareSingleEmail($logId); -// $service->log('[PromotionPrepareEmail] log=' . $logId -// . ' code=' . $result['code'] -// . ' llm_status=' . $result['llm_status'] -// . ' msg=' . $result['msg']); -// $job->delete(); -// } catch (\Exception $e) { -// $service->log('[PromotionPrepareEmail] log=' . $logId -// . ' attempts=' . $job->attempts() -// . ' exception=' . $e->getMessage()); + $service = new PromotionService(); + $service->log("id:".$logId); +// $result = $service->prepareSingleEmail($logId); // -// if ($job->attempts() >= 3) { -// $job->delete(); -// } else { -// $job->release(30); -// } +// $code = isset($result['code']) ? $result['code'] : ''; +// $msg = isset($result['msg']) ? $result['msg'] : ''; +// $llm = isset($result['llm_status']) ? $result['llm_status'] : ''; +// $this->oQueueJob->log("PromotionPrepareEmail 完成 | log_id={$logId} code={$code} llm_status={$llm} msg={$msg}"); + + $job->delete(); +// } catch (\Exception $e) { +// $this->oQueueJob->handleException($e, $job, "log_id={$logId}"); +// } catch (\Throwable $e) { +// $this->oQueueJob->handleException($e, $job, "log_id={$logId}"); +// } finally { +// $this->oQueueJob->finnal(); // } } } diff --git a/application/api/job/PromotionPrepareTask.php b/application/api/job/PromotionPrepareTask.php index 3e9cfd1..65d9d6d 100644 --- a/application/api/job/PromotionPrepareTask.php +++ b/application/api/job/PromotionPrepareTask.php @@ -4,6 +4,7 @@ namespace app\api\job; use think\queue\Job; use app\common\PromotionService; +use app\common\QueueJob; /** * 队列任务:task 级别的 prepare 调度器。 @@ -12,31 +13,44 @@ use app\common\PromotionService; * 将每封邮件拆成一条 PromotionPrepareEmail 推到 promotion_email 队列, * 以便并行调用 LLM 生成个性化描述。 * - * 队列名:promotion - * 启动 worker:php think queue:listen --queue promotion + * 队列名:PromotionPrepareTask */ class PromotionPrepareTask { + private $oQueueJob; + + public function __construct() + { + $this->oQueueJob = new QueueJob(); + } + public function fire(Job $job, $data) { - $taskId = intval(isset($data['task_id']) ? $data['task_id'] : 0); - $service = new PromotionService(); - $result = $service->dispatchPrepareEmails($taskId); -// if (!$taskId) { -// $job->delete(); -// return; -// } + $this->oQueueJob->init($job); -// try { -// $result = $service->dispatchPrepareEmails($taskId); -// $service->log('[PromotionPrepareTask] task=' . $taskId -// . ' dispatched=' . $result['dispatched'] -// . ' already_done=' . ($result['already_done'] ? 1 : 0) -// . (empty($result['error']) ? '' : ' error=' . $result['error'])); -// } catch (\Exception $e) { -// $service->log('[PromotionPrepareTask] task=' . $taskId . ' exception=' . $e->getMessage()); -// } + $taskId = isset($data['task_id']) ? intval($data['task_id']) : 0; + if ($taskId <= 0) { + $this->oQueueJob->log("PromotionPrepareTask 无效的 task_id,删除任务"); + $job->delete(); + return; + } - $job->delete(); + try { + $service = new PromotionService(); + $result = $service->dispatchPrepareEmails($taskId); + + $dispatched = isset($result['dispatched']) ? $result['dispatched'] : 0; + $alreadyDone = isset($result['already_done']) ? $result['already_done'] : 0; + $err = isset($result['error']) ? $result['error'] : ''; + $this->oQueueJob->log("PromotionPrepareTask 完成 | task_id={$taskId} dispatched={$dispatched} already_done={$alreadyDone} error={$err}"); + + $job->delete(); + } catch (\Exception $e) { + $this->oQueueJob->handleException($e, $job, "task_id={$taskId}"); + } catch (\Throwable $e) { + $this->oQueueJob->handleException($e, $job, "task_id={$taskId}"); + } finally { + $this->oQueueJob->finnal(); + } } } diff --git a/application/api/job/PromotionSend.php b/application/api/job/PromotionSend.php index 3dd5142..898cb21 100644 --- a/application/api/job/PromotionSend.php +++ b/application/api/job/PromotionSend.php @@ -4,32 +4,51 @@ namespace app\api\job; use think\queue\Job; use app\common\PromotionService; +use app\common\QueueJob; +/** + * 队列任务:发送 task 下"已 prepare"的邮件,按 min/max_interval 控制节奏。 + * + * processNextEmail 内部会自动把下一封邮件 later() 入队(链式延迟), + * 因此本 job 处理完一封即可 delete,无需在这里再 dispatch 下一封。 + * + * 队列名:PromotionSend + */ class PromotionSend { + private $oQueueJob; + + public function __construct() + { + $this->oQueueJob = new QueueJob(); + } + public function fire(Job $job, $data) { - $taskId = intval(isset($data['task_id']) ? $data['task_id'] : 0); - $service = new PromotionService(); + $this->oQueueJob->init($job); - if (!$taskId) { -// $service->log('[PromotionSend] missing task_id, job deleted'); + $taskId = isset($data['task_id']) ? intval($data['task_id']) : 0; + if ($taskId <= 0) { + $this->oQueueJob->log("PromotionSend 无效的 task_id,删除任务"); $job->delete(); return; } -// try { - $result = $service->processNextEmail($taskId); -// $service->log('[PromotionSend] task=' . $taskId . ' result=' . json_encode($result)); + try { + $service = new PromotionService(); + $result = $service->processNextEmail($taskId); -// if (!empty($result['done'])) { -// $reason = isset($result['reason']) ? $result['reason'] : ''; -// $service->log('[PromotionSend] task=' . $taskId . ' finished, reason=' . $reason); -// } -// } catch (\Exception $e) { -// $service->log('[PromotionSend] task=' . $taskId . ' exception=' . $e->getMessage()); -// } + $done = !empty($result['done']) ? 1 : 0; + $reason = isset($result['reason']) ? $result['reason'] : ''; + $this->oQueueJob->log("PromotionSend 完成 | task_id={$taskId} done={$done} reason={$reason}"); - $job->delete(); + $job->delete(); + } catch (\Exception $e) { + $this->oQueueJob->handleException($e, $job, "task_id={$taskId}"); + } catch (\Throwable $e) { + $this->oQueueJob->handleException($e, $job, "task_id={$taskId}"); + } finally { + $this->oQueueJob->finnal(); + } } } diff --git a/application/common/PromotionService.php b/application/common/PromotionService.php index a0fdd5d..9f54ca4 100644 --- a/application/common/PromotionService.php +++ b/application/common/PromotionService.php @@ -320,7 +320,11 @@ class PromotionService return ['dispatched' => 0, 'already_done' => true, 'error' => null]; } +// return $logIds; + + foreach ($logIds as $logId) { + echo $logId."----"; $this->enqueuePrepareEmail(intval($logId)); } @@ -590,6 +594,10 @@ class PromotionService ]; } + + + + /** * 解析 task 的目标领域名称列表。 * diff --git a/application/common/QueueJob.php b/application/common/QueueJob.php index 87462de..fde9ae2 100644 --- a/application/common/QueueJob.php +++ b/application/common/QueueJob.php @@ -152,6 +152,54 @@ class QueueJob $job->delete(); } + /** + * 简化版异常处理(无 Redis 锁场景)。 + * + * 适用于一对一类型的 job:每个 job 处理一个自然唯一的资源(如 log_id/expert_id), + * 不需要 Redis 锁也不会重复消费。 + * + * 行为: + * - 致命 DB 错误(连接断开等)→ release 任务回队列 + exit(1) 让 supervisor 重启进程 + * - 普通异常 → 按 attempts 限次重试(默认最多 3 次,超过 delete) + * + * @param \Exception|\Throwable $e + * @param \think\queue\Job $job + * @param string $context 日志中显示的上下文标识(如 "log_id=123" / "task_id=45") + */ + public function handleException($e, $job, $context = '') + { + $msg = $e->getMessage(); + $this->log("队列异常 | {$context} | msg={$msg}"); + + if ($e instanceof \Exception && $this->isFatalDatabaseError($e)) { + $this->log("致命 DB 错误,释放任务并退出进程让 supervisor 重启 | {$context}"); + $job->release(60); + exit(1); + } + // 兼容 \Throwable(PHP7 fatal 等) + if (!($e instanceof \Exception)) { + $low = strtolower((string)$msg); + $hits = ['mysql server has gone away', 'lost connection to mysql', 'gone away', 'sqlstate[hy000]']; + foreach ($hits as $kw) { + if (strpos($low, $kw) !== false) { + $this->log("致命 DB 错误(Throwable),释放任务并退出进程 | {$context}"); + $job->release(60); + exit(1); + } + } + } + + $attempts = method_exists($job, 'attempts') ? $job->attempts() : 1; + if ($attempts >= $this->maxRetries) { + $this->log("超过最大重试次数({$this->maxRetries}),删除任务 | {$context}"); + $job->delete(); + return; + } + $delay = $this->getRetryDelay($msg); + $this->log("{$delay} 秒后重试({$attempts}/{$this->maxRetries}) | {$context}"); + $job->release($delay); + } + // 判断是否为需要重启的致命数据库错误 private function isFatalDatabaseError(\Exception $e) { diff --git a/application/database.php b/application/database.php index 4520859..0295739 100644 --- a/application/database.php +++ b/application/database.php @@ -57,4 +57,6 @@ return [ 'datetime_format' => 'Y-m-d H:i:s', // 是否需要进行SQL性能分析 'sql_explain' => false, + // 断线重连(长驻 worker 必开):MySQL wait_timeout 关闭连接后,下一次查询自动重连 + 'break_reconnect' => true, ];