修改自动推广的相关任务

This commit is contained in:
wangjinlei
2026-04-29 17:59:13 +08:00
parent f2fabac740
commit 9a9a69333b
10 changed files with 340 additions and 98 deletions

View File

@@ -6,6 +6,7 @@ use think\Db;
use think\Env;
use think\Cache;
use think\Queue;
use think\Config;
use PHPMailer\PHPMailer\PHPMailer;
use think\Validate;
use app\common\PromotionService;
@@ -1758,7 +1759,24 @@ class EmailClient extends Base
return jsonError($rule->getError());
}
$service = new PromotionService();
$service->dispatchPrepareEmails($data['id']);
$taskId = intval($data['id']);
// 调用前快照:用于解释"为什么没入队"
$task = \think\Db::name('promotion_task')->where('task_id', $taskId)->find();
$pending = \think\Db::name('promotion_email_log')
->where('task_id', $taskId)
->where('state', 0)
->where('prepared_at', 0)
->count();
$result = $service->dispatchPrepareEmails($taskId);
return jsonSuccess([
'task_id' => $taskId,
'task_state' => $task ? intval($task['state']) : null, // 0 才能 dispatch5 已准备完
'pending_before' => intval($pending), // 调用前还能入队的 log 数
'dispatch_result' => $result, // ['dispatched' => N, ...]
]);
}
@@ -1771,7 +1789,65 @@ class EmailClient extends Base
return jsonError($rule->getError());
}
$service = new PromotionService();
$service->prepareSingleEmail($data['id']);
$result = $service->prepareSingleEmail(intval($data['id']));
return jsonSuccess($result);
}
/**
* 队列调试:查看 Redis 里队列长度(不依赖 redis-cli
*
* GET/POST 参数:
* queue 队列名(默认 PromotionPrepareEmail
*
* 返回:
* - ready/reserved/delayed 长度
* - queue 配置关键字段
*/
public function queueDebug()
{
$queue = trim((string)$this->request->param('queue', 'PromotionPrepareEmail'));
if ($queue === '') {
$queue = 'PromotionPrepareEmail';
}
try {
$cfg = Config::get('queue');
$redis = new \Redis();
$connectMethod = !empty($cfg['persistent']) ? 'pconnect' : 'connect';
$redis->$connectMethod($cfg['host'] ?? '127.0.0.1', $cfg['port'] ?? 6379);
if (!empty($cfg['password'])) {
$redis->auth($cfg['password']);
}
$redis->select($cfg['select'] ?? 0);
$readyKey = 'queues:' . $queue;
$reservedKey = 'queues:' . $queue . ':reserved';
$delayedKey = 'queues:' . $queue . ':delayed';
return jsonSuccess([
'queue' => $queue,
'redis' => [
'db' => $cfg['select'] ?? 0,
'host' => $cfg['host'] ?? '127.0.0.1',
'port' => $cfg['port'] ?? 6379,
'persistent' => !empty($cfg['persistent']) ? 1 : 0,
'expire' => $cfg['expire'],
],
'keys' => [
'ready' => $readyKey,
'reserved' => $reservedKey,
'delayed' => $delayedKey,
],
'len' => [
'ready' => (int)$redis->lLen($readyKey),
'reserved' => (int)$redis->zCard($reservedKey),
'delayed' => (int)$redis->zCard($delayedKey),
],
'ping' => (string)$redis->ping(),
]);
} catch (\Throwable $e) {
return jsonError('queueDebug failed: ' . $e->getMessage());
}
}
/**
@@ -2395,6 +2471,17 @@ class EmailClient extends Base
return jsonSuccess($result);
}
public function testYbpard(){
$service = new PromotionService();
$expert['application_link_yeditorial_board'] = $service->buildYboardApplyUrl(
intval(259116),
intval(3)
);
return jsonSuccess($expert);
}
/**
* 每日自动触发发送(把 prepared 的任务启动),供 crontab 调用
*

View File

@@ -4,26 +4,52 @@ namespace app\api\job;
use think\queue\Job;
use app\common\ExpertFinderService;
use app\common\QueueJob;
/**
* 专家抓取队列任务
* 专家抓取队列任务
* 注意:此任务推送到队列名 "FetchExperts",必须单独启动 worker 才会执行:
* php think queue:listen --queue FetchExperts
* 若只运行 queue:listen 不指定队列,默认只消费 "mail",本任务不会被执行。
* php think queue:work --queue FetchExperts --sleep=3 --tries=3 --daemon
*
* 单条任务可能耗时较长NCBI 接口翻页 + 写库),常驻 worker 受 wait_timeout 影响,
* 由 QueueJob 在进程超过 6h 或致命 DB 错误时主动 exit(1) 让 supervisor 拉起新进程。
*/
class FetchExperts
{
private $oQueueJob;
public function __construct()
{
$this->oQueueJob = new QueueJob();
}
public function fire(Job $job, $data)
{
$field = isset($data['field']) ? $data['field'] : '';
$service = new ExpertFinderService();
$service->doFetchForField(
$field,
isset($data['source']) ? $data['source'] : 'pubmed',
isset($data['per_page']) ? intval($data['per_page']) : 100,
isset($data['min_year']) ? $data['min_year'] : null
);
$this->oQueueJob->init($job);
$job->delete();
$field = isset($data['field']) ? (string)$data['field'] : '';
if ($field === '') {
$this->oQueueJob->log("FetchExperts 无效的 field删除任务");
$job->delete();
return;
}
try {
$service = new ExpertFinderService();
$service->doFetchForField(
$field,
isset($data['source']) ? $data['source'] : 'pubmed',
isset($data['per_page']) ? intval($data['per_page']) : 100,
isset($data['min_year']) ? $data['min_year'] : null
);
$this->oQueueJob->log("FetchExperts 完成 | field={$field}");
$job->delete();
} catch (\Exception $e) {
$this->oQueueJob->handleException($e, $job, "field={$field}");
} catch (\Throwable $e) {
$this->oQueueJob->handleException($e, $job, "field={$field}");
} finally {
$this->oQueueJob->finnal();
}
}
}

View File

@@ -4,34 +4,51 @@ namespace app\api\job;
use think\queue\Job;
use app\common\ExpertFinderService;
use app\common\QueueJob;
/**
* 队列任务:用本地大模型从 affiliation 推断国家,写入 expert.country_id / country。
* 处理完当前专家后,自动找下一个推入同一队列(链式执行),直到全部处理完。
*
* 支持多队列并行:通过 $data['queue'] 和 $data['chat_url'] 区分不同的链/模型。
*
* 单条任务受本地 LLM 响应时间影响(一般 2-10s常驻 worker 由 QueueJob
* 在进程超 6h 或遇致命 DB 错误时主动 exit(1) 让 supervisor 拉起新进程。
*/
class FillExpertCountry
{
private $oQueueJob;
public function __construct()
{
$this->oQueueJob = new QueueJob();
}
public function fire(Job $job, $data)
{
$this->oQueueJob->init($job);
$expertId = intval(isset($data['expert_id']) ? $data['expert_id'] : 0);
$affiliation = isset($data['affiliation']) ? trim((string)$data['affiliation']) : '';
$queue = isset($data['queue']) ? (string)$data['queue'] : 'FetchExperts';
$queue = isset($data['queue']) ? (string)$data['queue'] : 'FetchExperts';
$chatUrl = isset($data['chat_url']) ? (string)$data['chat_url'] : '';
$service = new ExpertFinderService();
if ($expertId && $affiliation !== '') {
try {
try {
if ($expertId && $affiliation !== '') {
$service->fillExpertCountry($expertId, $affiliation, $chatUrl);
} catch (\Exception $e) {
$service->log('[FillExpertCountry] expert_id=' . $expertId . ' queue=' . $queue . ' exception=' . $e->getMessage());
}
$job->delete();
// 链式:处理完当前专家立刻拉下一个进来
$service->enqueueNextCountryFill(1, $queue, $chatUrl);
} catch (\Exception $e) {
$this->oQueueJob->handleException($e, $job, "expert_id={$expertId} queue={$queue}");
} catch (\Throwable $e) {
$this->oQueueJob->handleException($e, $job, "expert_id={$expertId} queue={$queue}");
} finally {
$this->oQueueJob->finnal();
}
$job->delete();
$service->enqueueNextCountryFill(1, $queue, $chatUrl);
}
}

View File

@@ -4,6 +4,7 @@ namespace app\api\job;
use think\queue\Job;
use app\common\PromotionService;
use app\common\QueueJob;
/**
* 【已废弃 / 兼容保留】
@@ -15,20 +16,35 @@ use app\common\PromotionService;
*/
class PromotionPrepare
{
private $oQueueJob;
public function __construct()
{
$this->oQueueJob = new QueueJob();
}
public function fire(Job $job, $data)
{
$taskId = intval(isset($data['task_id']) ? $data['task_id'] : 0);
$service = new PromotionService();
$this->oQueueJob->init($job);
if ($taskId > 0) {
try {
$service->enqueuePrepareTask($taskId);
$service->log('[PromotionPrepare][deprecated] forwarded task=' . $taskId . ' -> PromotionPrepareTask');
} catch (\Exception $e) {
$service->log('[PromotionPrepare][deprecated] task=' . $taskId . ' forward_exception=' . $e->getMessage());
}
$taskId = isset($data['task_id']) ? intval($data['task_id']) : 0;
if ($taskId <= 0) {
$this->oQueueJob->log("PromotionPrepare[deprecated] 无效的 task_id删除任务");
$job->delete();
return;
}
$job->delete();
try {
$service = new PromotionService();
$service->enqueuePrepareTask($taskId);
$this->oQueueJob->log("PromotionPrepare[deprecated] forwarded task_id={$taskId} -> PromotionPrepareTask");
$job->delete();
} catch (\Exception $e) {
$this->oQueueJob->handleException($e, $job, "[deprecated] task_id={$taskId}");
} catch (\Throwable $e) {
$this->oQueueJob->handleException($e, $job, "[deprecated] task_id={$taskId}");
} finally {
$this->oQueueJob->finnal();
}
}
}

View File

@@ -4,50 +4,55 @@ namespace app\api\job;
use think\queue\Job;
use app\common\PromotionService;
use app\common\QueueJob;
/**
* 队列任务:单封邮件 prepare调用 LLM 生成个性化描述 + 渲染模板 + 写入 log
*
* 单个 Job 可能耗时较长LLM 调用 30s 超时),因此使用独立队列 promotion_email
* 多 worker 并行消费即可提升吞吐。
* 单条 job 可能耗时 30s+LLM 调用)。常驻 worker 必须配合 supervisor 守护
* 当进程超过 6h 或遇到致命 DB 错误时QueueJob 会主动 exit(1),由 supervisor 拉起新进程,
* 避开 MySQL wait_timeout 把连接关掉后所有任务持续 fail 的问题。
*
* 失败策略
* - PromotionService::prepareSingleEmail 内部已做兜底LLM 失败回退到配置文案),
* 这里遇到未捕获异常时允许重试一次,超过 3 次直接 delete 避免无限循环。
*
* 队列名promotion_email
* 启动 workerphp think queue:listen --queue promotion_email
* 启动建议
* php think queue:work --queue PromotionPrepareEmail --sleep=3 --tries=3 --daemon
*/
class PromotionPrepareEmail
{
// private $oQueueJob;
// public function __construct()
// {
// $this->oQueueJob = new QueueJob();
// }
public function fire(Job $job, $data)
{
$logId = intval(isset($data['log_id']) ? $data['log_id'] : 0);
$service = new PromotionService();
if (!$logId) {
$job->delete();
return;
}
$service->prepareSingleEmail($logId);
$job->delete();
// $this->oQueueJob->init($job);
//
$logId = isset($data['log_id']) ? intval($data['log_id']) : 0;
// if ($logId <= 0) {
// $this->oQueueJob->log("PromotionPrepareEmail 无效的 log_id删除任务");
// $job->delete();
// return;
// }
//
// try {
// $result = $service->prepareSingleEmail($logId);
// $service->log('[PromotionPrepareEmail] log=' . $logId
// . ' code=' . $result['code']
// . ' llm_status=' . $result['llm_status']
// . ' msg=' . $result['msg']);
// $job->delete();
// } catch (\Exception $e) {
// $service->log('[PromotionPrepareEmail] log=' . $logId
// . ' attempts=' . $job->attempts()
// . ' exception=' . $e->getMessage());
$service = new PromotionService();
$service->log("id:".$logId);
// $result = $service->prepareSingleEmail($logId);
//
// if ($job->attempts() >= 3) {
// $job->delete();
// } else {
// $job->release(30);
// }
// $code = isset($result['code']) ? $result['code'] : '';
// $msg = isset($result['msg']) ? $result['msg'] : '';
// $llm = isset($result['llm_status']) ? $result['llm_status'] : '';
// $this->oQueueJob->log("PromotionPrepareEmail 完成 | log_id={$logId} code={$code} llm_status={$llm} msg={$msg}");
$job->delete();
// } catch (\Exception $e) {
// $this->oQueueJob->handleException($e, $job, "log_id={$logId}");
// } catch (\Throwable $e) {
// $this->oQueueJob->handleException($e, $job, "log_id={$logId}");
// } finally {
// $this->oQueueJob->finnal();
// }
}
}

View File

@@ -4,6 +4,7 @@ namespace app\api\job;
use think\queue\Job;
use app\common\PromotionService;
use app\common\QueueJob;
/**
* 队列任务task 级别的 prepare 调度器。
@@ -12,31 +13,44 @@ use app\common\PromotionService;
* 将每封邮件拆成一条 PromotionPrepareEmail 推到 promotion_email 队列,
* 以便并行调用 LLM 生成个性化描述。
*
* 队列名:promotion
* 启动 workerphp think queue:listen --queue promotion
* 队列名:PromotionPrepareTask
*/
class PromotionPrepareTask
{
private $oQueueJob;
public function __construct()
{
$this->oQueueJob = new QueueJob();
}
public function fire(Job $job, $data)
{
$taskId = intval(isset($data['task_id']) ? $data['task_id'] : 0);
$service = new PromotionService();
$result = $service->dispatchPrepareEmails($taskId);
// if (!$taskId) {
// $job->delete();
// return;
// }
$this->oQueueJob->init($job);
// try {
// $result = $service->dispatchPrepareEmails($taskId);
// $service->log('[PromotionPrepareTask] task=' . $taskId
// . ' dispatched=' . $result['dispatched']
// . ' already_done=' . ($result['already_done'] ? 1 : 0)
// . (empty($result['error']) ? '' : ' error=' . $result['error']));
// } catch (\Exception $e) {
// $service->log('[PromotionPrepareTask] task=' . $taskId . ' exception=' . $e->getMessage());
// }
$taskId = isset($data['task_id']) ? intval($data['task_id']) : 0;
if ($taskId <= 0) {
$this->oQueueJob->log("PromotionPrepareTask 无效的 task_id删除任务");
$job->delete();
return;
}
$job->delete();
try {
$service = new PromotionService();
$result = $service->dispatchPrepareEmails($taskId);
$dispatched = isset($result['dispatched']) ? $result['dispatched'] : 0;
$alreadyDone = isset($result['already_done']) ? $result['already_done'] : 0;
$err = isset($result['error']) ? $result['error'] : '';
$this->oQueueJob->log("PromotionPrepareTask 完成 | task_id={$taskId} dispatched={$dispatched} already_done={$alreadyDone} error={$err}");
$job->delete();
} catch (\Exception $e) {
$this->oQueueJob->handleException($e, $job, "task_id={$taskId}");
} catch (\Throwable $e) {
$this->oQueueJob->handleException($e, $job, "task_id={$taskId}");
} finally {
$this->oQueueJob->finnal();
}
}
}

View File

@@ -4,32 +4,51 @@ namespace app\api\job;
use think\queue\Job;
use app\common\PromotionService;
use app\common\QueueJob;
/**
* 队列任务:发送 task 下"已 prepare"的邮件,按 min/max_interval 控制节奏。
*
* processNextEmail 内部会自动把下一封邮件 later() 入队(链式延迟),
* 因此本 job 处理完一封即可 delete无需在这里再 dispatch 下一封。
*
* 队列名PromotionSend
*/
class PromotionSend
{
private $oQueueJob;
public function __construct()
{
$this->oQueueJob = new QueueJob();
}
public function fire(Job $job, $data)
{
$taskId = intval(isset($data['task_id']) ? $data['task_id'] : 0);
$service = new PromotionService();
$this->oQueueJob->init($job);
if (!$taskId) {
// $service->log('[PromotionSend] missing task_id, job deleted');
$taskId = isset($data['task_id']) ? intval($data['task_id']) : 0;
if ($taskId <= 0) {
$this->oQueueJob->log("PromotionSend 无效的 task_id删除任务");
$job->delete();
return;
}
// try {
$result = $service->processNextEmail($taskId);
// $service->log('[PromotionSend] task=' . $taskId . ' result=' . json_encode($result));
try {
$service = new PromotionService();
$result = $service->processNextEmail($taskId);
// if (!empty($result['done'])) {
// $reason = isset($result['reason']) ? $result['reason'] : '';
// $service->log('[PromotionSend] task=' . $taskId . ' finished, reason=' . $reason);
// }
// } catch (\Exception $e) {
// $service->log('[PromotionSend] task=' . $taskId . ' exception=' . $e->getMessage());
// }
$done = !empty($result['done']) ? 1 : 0;
$reason = isset($result['reason']) ? $result['reason'] : '';
$this->oQueueJob->log("PromotionSend 完成 | task_id={$taskId} done={$done} reason={$reason}");
$job->delete();
$job->delete();
} catch (\Exception $e) {
$this->oQueueJob->handleException($e, $job, "task_id={$taskId}");
} catch (\Throwable $e) {
$this->oQueueJob->handleException($e, $job, "task_id={$taskId}");
} finally {
$this->oQueueJob->finnal();
}
}
}

View File

@@ -320,7 +320,11 @@ class PromotionService
return ['dispatched' => 0, 'already_done' => true, 'error' => null];
}
// return $logIds;
foreach ($logIds as $logId) {
echo $logId."----";
$this->enqueuePrepareEmail(intval($logId));
}
@@ -590,6 +594,10 @@ class PromotionService
];
}
/**
* 解析 task 的目标领域名称列表。
*

View File

@@ -152,6 +152,54 @@ class QueueJob
$job->delete();
}
/**
* 简化版异常处理(无 Redis 锁场景)。
*
* 适用于一对一类型的 job每个 job 处理一个自然唯一的资源(如 log_id/expert_id
* 不需要 Redis 锁也不会重复消费。
*
* 行为:
* - 致命 DB 错误(连接断开等)→ release 任务回队列 + exit(1) 让 supervisor 重启进程
* - 普通异常 → 按 attempts 限次重试(默认最多 3 次,超过 delete
*
* @param \Exception|\Throwable $e
* @param \think\queue\Job $job
* @param string $context 日志中显示的上下文标识(如 "log_id=123" / "task_id=45"
*/
public function handleException($e, $job, $context = '')
{
$msg = $e->getMessage();
$this->log("队列异常 | {$context} | msg={$msg}");
if ($e instanceof \Exception && $this->isFatalDatabaseError($e)) {
$this->log("致命 DB 错误,释放任务并退出进程让 supervisor 重启 | {$context}");
$job->release(60);
exit(1);
}
// 兼容 \ThrowablePHP7 fatal 等)
if (!($e instanceof \Exception)) {
$low = strtolower((string)$msg);
$hits = ['mysql server has gone away', 'lost connection to mysql', 'gone away', 'sqlstate[hy000]'];
foreach ($hits as $kw) {
if (strpos($low, $kw) !== false) {
$this->log("致命 DB 错误Throwable释放任务并退出进程 | {$context}");
$job->release(60);
exit(1);
}
}
}
$attempts = method_exists($job, 'attempts') ? $job->attempts() : 1;
if ($attempts >= $this->maxRetries) {
$this->log("超过最大重试次数({$this->maxRetries}),删除任务 | {$context}");
$job->delete();
return;
}
$delay = $this->getRetryDelay($msg);
$this->log("{$delay} 秒后重试({$attempts}/{$this->maxRetries}) | {$context}");
$job->release($delay);
}
// 判断是否为需要重启的致命数据库错误
private function isFatalDatabaseError(\Exception $e)
{

View File

@@ -57,4 +57,6 @@ return [
'datetime_format' => 'Y-m-d H:i:s',
// 是否需要进行SQL性能分析
'sql_explain' => false,
// 断线重连(长驻 worker 必开MySQL wait_timeout 关闭连接后,下一次查询自动重连
'break_reconnect' => true,
];