diff --git a/.env b/.env index 76a1d6d..d4136a1 100644 --- a/.env +++ b/.env @@ -19,6 +19,11 @@ client_id = 616562 client_secret = CfMDrllyqBTFKrUkO2XaE7OmWTYqP3yd hmac = 8aU8WnITYhwaGTXH +[promotion] +PROMOTION_LLM_URL=http://chat.taimed.cn/v1/chat/completions +PROMOTION_LLM_MODEL=your-model-name +PROMOTION_LLM_TIMEOUT=30 +PROMOTION_LLM_FALLBACK="" [journal] ;官网服务器地址 diff --git a/application/api/controller/ExpertFinder.php b/application/api/controller/ExpertFinder.php index 65f1c08..463c0ab 100644 --- a/application/api/controller/ExpertFinder.php +++ b/application/api/controller/ExpertFinder.php @@ -91,8 +91,8 @@ class ExpertFinder extends Base public function batchFillCountry(){ $service = new ExpertFinderService(); - $chain1 = $service->enqueueNextCountryFill(0, 'FetchExpertCity', ''); - $chain2 = $service->enqueueNextCountryFill(0, 'FetchExpertCity1', 'http://125.39.141.154:10002/v1/chat/completions'); +// $chain1 = $service->enqueueNextCountryFill(0, 'FetchExpertCity', ''); + $chain2 = $service->enqueueNextCountryFill(0, 'FetchExpertCityOne', 'http://125.39.141.154:10002/v1/chat/completions'); $pending = Db::name('expert') ->where('affiliation', '<>', '') @@ -101,7 +101,7 @@ class ExpertFinder extends Base ->count(); return jsonSuccess([ - 'chain1_started' => $chain1, +// 'chain1_started' => $chain1, 'chain2_started' => $chain2, 'pending' => $pending, ]); diff --git a/application/api/job/PromotionPrepare.php b/application/api/job/PromotionPrepare.php index 6fc4424..13791ef 100644 --- a/application/api/job/PromotionPrepare.php +++ b/application/api/job/PromotionPrepare.php @@ -6,10 +6,12 @@ use think\queue\Job; use app\common\PromotionService; /** - * 队列任务:对单个 promotion_task 执行 prepareTask(预生成 subject/body)。 + * 【已废弃 / 兼容保留】 * - * 队列名:promotion - * 启动 worker:php think queue:listen --queue promotion + * 旧版 task 级 prepare Job;新逻辑请使用 PromotionPrepareTask + PromotionPrepareEmail。 + * + * 本类仅为兼容队列中可能遗留的旧 payload:接收到旧 job 时转发到新的调度器, + * 保证旧消息不会丢失。新代码不应再 push 此 Job。 */ class PromotionPrepare { @@ -18,18 +20,13 @@ class PromotionPrepare $taskId = intval(isset($data['task_id']) ? $data['task_id'] : 0); $service = new PromotionService(); - if (!$taskId) { - $job->delete(); - return; - } - - try { - $result = $service->prepareTask($taskId); - $service->log('[PromotionPrepare] task=' . $taskId - . ' prepared=' . $result['prepared'] - . ' failed=' . $result['failed']); - } catch (\Exception $e) { - $service->log('[PromotionPrepare] task=' . $taskId . ' exception=' . $e->getMessage()); + if ($taskId > 0) { + try { + $service->enqueuePrepareTask($taskId); + $service->log('[PromotionPrepare][deprecated] forwarded task=' . $taskId . ' -> PromotionPrepareTask'); + } catch (\Exception $e) { + $service->log('[PromotionPrepare][deprecated] task=' . $taskId . ' forward_exception=' . $e->getMessage()); + } } $job->delete(); diff --git a/application/api/job/PromotionPrepareEmail.php b/application/api/job/PromotionPrepareEmail.php new file mode 100644 index 0000000..793e8c3 --- /dev/null +++ b/application/api/job/PromotionPrepareEmail.php @@ -0,0 +1,52 @@ +delete(); + return; + } + + try { + $result = $service->prepareSingleEmail($logId); + $service->log('[PromotionPrepareEmail] log=' . $logId + . ' code=' . $result['code'] + . ' llm_status=' . $result['llm_status'] + . ' msg=' . $result['msg']); + $job->delete(); + } catch (\Exception $e) { + $service->log('[PromotionPrepareEmail] log=' . $logId + . ' attempts=' . $job->attempts() + . ' exception=' . $e->getMessage()); + + if ($job->attempts() >= 3) { + $job->delete(); + } else { + $job->release(30); + } + } + } +} diff --git a/application/api/job/PromotionPrepareTask.php b/application/api/job/PromotionPrepareTask.php new file mode 100644 index 0000000..a0cf2dd --- /dev/null +++ b/application/api/job/PromotionPrepareTask.php @@ -0,0 +1,42 @@ +delete(); + return; + } + + try { + $result = $service->dispatchPrepareEmails($taskId); + $service->log('[PromotionPrepareTask] task=' . $taskId + . ' dispatched=' . $result['dispatched'] + . ' already_done=' . ($result['already_done'] ? 1 : 0) + . (empty($result['error']) ? '' : ' error=' . $result['error'])); + } catch (\Exception $e) { + $service->log('[PromotionPrepareTask] task=' . $taskId . ' exception=' . $e->getMessage()); + } + + $job->delete(); + } +} diff --git a/application/common/PromotionLlmService.php b/application/common/PromotionLlmService.php new file mode 100644 index 0000000..24345c0 --- /dev/null +++ b/application/common/PromotionLlmService.php @@ -0,0 +1,176 @@ + 1|2|3, 'text' => string] + * 1 = LLM 成功生成 + * 2 = LLM 调用失败,text = 兜底文案 + * 3 = 跳过(缺少代表作等前置条件),text = 兜底文案 + */ +class PromotionLlmService +{ + private $url; + private $model; + private $timeout; + private $apiKey; + private $fallback; + + public function __construct() + { + $this->url = trim((string)Env::get('promotion.promotion_llm_url', '')); + $this->model = trim((string)Env::get('promotion.promotion_llm_model', '')); + $this->timeout = max(5, intval(Env::get('promotion.promotion_llm_timeout', 30))); + $this->apiKey = trim((string)Env::get('promotion.promotion_llm_api_key', '')); + $this->fallback = trim((string)Env::get('promotion.promotion_llm_fallback', + 'Your recent work aligns closely with the scope of our journal, and we would be honored to consider a contribution from you.')); + } + + /** + * 生成个性化描述 + * + * @param array $expert 至少包含 name / representative_work_title / fields / affiliation + * @param array $journal 至少包含 title (name) / aims (可选) / databases (可选) + * @return array ['status' => 1|2|3, 'text' => string] + */ + public function generateDescription(array $expert, array $journal): array + { + $paperTitle = trim((string)($expert['representative_work_title'] ?? '')); + $expertName = trim((string)($expert['name'] ?? '')); + $journalName = trim((string)($journal['title'] ?? '')); + + if ($paperTitle === '' || $journalName === '') { + return ['status' => 3, 'text' => $this->fallback]; + } + + if ($this->url === '' || $this->model === '') { + return ['status' => 2, 'text' => $this->fallback]; + } + + $expertField = trim((string)($expert['fields'] ?? ($expert['field'] ?? ''))); + $journalAims = trim((string)($journal['aims'] ?? '')); + $journalDbs = trim((string)($journal['databases'] ?? '')); + + $system = 'You are an academic editorial assistant. ' + . 'Write a short, warm, professional English paragraph (2-3 sentences, <=50 words) ' + . 'that: (1) briefly appreciates the author\'s recent paper, ' + . '(2) explains why it fits the journal\'s scope, ' + . '(3) gently invites a future submission. ' + . 'Do NOT use placeholders, do NOT add greetings, do NOT add signatures, ' + . 'output plain text only (no markdown).'; + + $userLines = []; + $userLines[] = 'Author name: ' . ($expertName !== '' ? $expertName : '(unknown)'); + if ($expertField !== '') { + $userLines[] = 'Author research field: ' . $expertField; + } + $userLines[] = 'Recent paper title: ' . $paperTitle; + $userLines[] = 'Target journal: ' . $journalName; + if ($journalAims !== '') { + $userLines[] = 'Journal aims & scope: ' . mb_substr($journalAims, 0, 500); + } + if ($journalDbs !== '') { + $userLines[] = 'Journal indexing: ' . mb_substr($journalDbs, 0, 200); + } + $userLines[] = 'Return only the paragraph, nothing else.'; + $user = implode("\n", $userLines); + + $payload = [ + 'model' => $this->model, + 'temperature' => 0.4, + 'messages' => [ + ['role' => 'system', 'content' => $system], + ['role' => 'user', 'content' => $user], + ], + ]; + + $content = $this->postChat($payload); + if ($content === null) { + return ['status' => 2, 'text' => $this->fallback]; + } + + $content = $this->cleanContent($content); + if ($content === '') { + return ['status' => 2, 'text' => $this->fallback]; + } + + return ['status' => 1, 'text' => $content]; + } + + /** + * 调用 chat/completions 接口,返回 content 字符串;失败返回 null。 + */ + private function postChat(array $payload) + { + $ch = curl_init(); + curl_setopt($ch, CURLOPT_URL, $this->url); + curl_setopt($ch, CURLOPT_POST, true); + curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload, JSON_UNESCAPED_UNICODE)); + curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); + curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, min(10, $this->timeout)); + curl_setopt($ch, CURLOPT_TIMEOUT, $this->timeout); + + $headers = ['Content-Type: application/json']; + if ($this->apiKey !== '') { + $headers[] = 'Authorization: Bearer ' . $this->apiKey; + } + curl_setopt($ch, CURLOPT_HTTPHEADER, $headers); + + $raw = curl_exec($ch); + if ($raw === false) { + curl_close($ch); + return null; + } + $httpCode = intval(curl_getinfo($ch, CURLINFO_HTTP_CODE)); + curl_close($ch); + if ($httpCode < 200 || $httpCode >= 300) { + return null; + } + + $data = json_decode($raw, true); + if (!is_array($data)) return null; + + if (isset($data['choices'][0]['message']['content'])) { + return (string)$data['choices'][0]['message']['content']; + } + if (isset($data['content'])) { + return (string)$data['content']; + } + return null; + } + + /** + * 清洗 LLM 输出:去除 markdown 包裹、多余空白、首尾引号、过长截断。 + */ + private function cleanContent(string $text): string + { + $text = trim($text); + $text = preg_replace('/^```[a-zA-Z]*\s*|```$/m', '', $text); + $text = trim($text); + $text = trim($text, "\"' \t\n\r\0\x0B"); + $text = preg_replace('/\s+/', ' ', $text); + if (mb_strlen($text) > 800) { + $text = mb_substr($text, 0, 800); + } + return trim($text); + } + + public function getFallback(): string + { + return $this->fallback; + } +} diff --git a/application/common/PromotionService.php b/application/common/PromotionService.php index 5f04c70..f86d00e 100644 --- a/application/common/PromotionService.php +++ b/application/common/PromotionService.php @@ -93,20 +93,36 @@ class PromotionService $expert_fields = Db::name('expert_field') ->where('expert_id', $expert['expert_id']) ->where('state', 0) + ->order('expert_field_id desc') ->select(); + + $taskFields = $this->resolveTaskFields($task); + $taskFieldLower = []; + foreach ($taskFields as $tf) { + $tf = strtolower(trim($tf)); + if ($tf !== '') $taskFieldLower[$tf] = true; + } + $fieldSet = []; - $representativeTitle = ''; + $matchedTitle = ''; + $fallbackTitle = ''; foreach ($expert_fields as $ef) { $fn = trim($ef['field']); if ($fn !== '' && !in_array($fn, $fieldSet)) { $fieldSet[] = $fn; } - if ($representativeTitle === '' && !empty($ef['paper_title'])) { - $representativeTitle = trim($ef['paper_title']); + $paper = trim((string)($ef['paper_title'] ?? '')); + if ($paper === '') continue; + if ($matchedTitle === '' && $fn !== '' && isset($taskFieldLower[strtolower($fn)])) { + $matchedTitle = $paper; } + if ($fallbackTitle === '') { + $fallbackTitle = $paper; + } + if ($matchedTitle !== '' && $fallbackTitle !== '') break; } $expert['fields'] = implode(',', $fieldSet); - $expert['representative_work_title'] = $representativeTitle; + $expert['representative_work_title'] = $matchedTitle !== '' ? $matchedTitle : $fallbackTitle; $expertVars = $this->buildExpertVars($expert); $journalVars = $this->buildJournalVars($journal); $vars = array_merge($journalVars, $expertVars); @@ -174,7 +190,254 @@ class PromotionService // ==================== 准备与触发(今日准备、明日发送) ==================== /** - * 为指定任务预生成所有待发邮件的 subject/body,写入 log;完成后将任务置为 state=5(已准备) + * 调度:为指定任务的所有待准备邮件分发 PromotionPrepareEmail 队列任务。 + * + * 每封邮件单独一条 job,可在 promotion_email 队列中并行消费(调用 LLM 生成个性化描述)。 + * 所有邮件准备完成后,由最后一条 PromotionPrepareEmail 在 finalize 阶段将任务置为 state=5。 + * + * @param int $taskId + * @return array ['dispatched' => int, 'already_done' => bool, 'error' => string|null] + */ + public function dispatchPrepareEmails($taskId) + { + $task = Db::name('promotion_task')->where('task_id', $taskId)->find(); + if (!$task) { + return ['dispatched' => 0, 'already_done' => false, 'error' => 'task_not_found']; + } + if ($task['state'] != 0) { + return ['dispatched' => 0, 'already_done' => $task['state'] == 5, 'error' => 'task_state_not_draft']; + } + + $logIds = Db::name('promotion_email_log') + ->where('task_id', $taskId) + ->where('state', 0) + ->where('prepared_at', 0) + ->order('log_id asc') + ->column('log_id'); + + if (empty($logIds)) { + // 没有需要准备的邮件,直接置为已准备 + Db::name('promotion_task')->where('task_id', $taskId)->update([ + 'state' => 5, + 'utime' => time(), + ]); + $this->log("dispatchPrepareEmails task_id={$taskId} no_logs -> state=5"); + return ['dispatched' => 0, 'already_done' => true, 'error' => null]; + } + + foreach ($logIds as $logId) { + $this->enqueuePrepareEmail(intval($logId)); + } + + $this->log("dispatchPrepareEmails task_id={$taskId} dispatched=" . count($logIds)); + return ['dispatched' => count($logIds), 'already_done' => false, 'error' => null]; + } + + /** + * 对单封邮件执行准备:拉取 expert / journal,调 LLM 生成描述,渲染模板,写回 log。 + * + * 由 PromotionPrepareEmail Job 调用;也可手工调用做调试。 + * + * @param int $logId + * @return array ['code' => 0|1, 'msg' => string, 'llm_status' => int] + */ + public function prepareSingleEmail($logId) + { + $log = Db::name('promotion_email_log')->where('log_id', $logId)->find(); + if (!$log) { + return ['code' => 1, 'msg' => 'log_not_found', 'llm_status' => 0]; + } + if ($log['state'] != 0) { + return ['code' => 1, 'msg' => 'log_state_not_pending', 'llm_status' => 0]; + } + if (!empty($log['prepared_at'])) { + return ['code' => 0, 'msg' => 'already_prepared', 'llm_status' => intval($log['llm_status'] ?? 0)]; + } + + $task = Db::name('promotion_task')->where('task_id', $log['task_id'])->find(); + if (!$task) { + Db::name('promotion_email_log')->where('log_id', $logId)->update([ + 'state' => 2, + 'error_msg' => 'Task not found', + 'send_time' => time(), + ]); + return ['code' => 1, 'msg' => 'task_not_found', 'llm_status' => 0]; + } + + $expert = Db::name('expert')->where('expert_id', $log['expert_id'])->find(); + if (!$expert) { + Db::name('promotion_email_log')->where('log_id', $logId)->update([ + 'state' => 2, + 'error_msg' => 'Expert not found', + 'send_time' => time(), + ]); + $this->tryFinalizeTask($task['task_id']); + return ['code' => 1, 'msg' => 'expert_not_found', 'llm_status' => 0]; + } + + $expert_fields = Db::name('expert_field') + ->where('expert_id', $expert['expert_id']) + ->where('state', 0) + ->order('expert_field_id desc') + ->select(); + + // 领域优先级:任务所属工厂/期刊的目标领域 + $taskFields = $this->resolveTaskFields($task); + $taskFieldLower = []; + foreach ($taskFields as $tf) { + $tf = strtolower(trim($tf)); + if ($tf !== '') $taskFieldLower[$tf] = true; + } + + $fieldSet = []; + $matchedTitle = ''; + $fallbackTitle = ''; + foreach ($expert_fields as $ef) { + $fn = trim($ef['field']); + if ($fn !== '' && !in_array($fn, $fieldSet)) { + $fieldSet[] = $fn; + } + $paper = trim((string)($ef['paper_title'] ?? '')); + if ($paper === '') continue; + + // 匹配到任务领域的 paper 优先(按最新 expert_field_id desc 顺序取第一条) + if ($matchedTitle === '' && $fn !== '' && isset($taskFieldLower[strtolower($fn)])) { + $matchedTitle = $paper; + } + if ($fallbackTitle === '') { + $fallbackTitle = $paper; + } + if ($matchedTitle !== '' && $fallbackTitle !== '') break; + } + $expert['fields'] = implode(',', $fieldSet); + $expert['representative_work_title'] = $matchedTitle !== '' ? $matchedTitle : $fallbackTitle; + + $journal = Db::name('journal')->where('journal_id', $task['journal_id'])->find(); + + // 调用 LLM 生成个性化描述(失败/缺条件时回退到兜底文案) + $llmResult = ['status' => 0, 'text' => '']; + try { + $llm = new PromotionLlmService(); + $llmResult = $llm->generateDescription($expert, $journal ?: []); + } catch (\Exception $e) { + $llmResult = ['status' => 2, 'text' => '']; + $this->log("prepareSingleEmail log_id={$logId} llm_exception=" . $e->getMessage()); + } + $llmText = (string)$llmResult['text']; + $llmStatus = intval($llmResult['status']); + + $expert['llm_description'] = $llmText; + + $expertVars = $this->buildExpertVars($expert); + $journalVars = $this->buildJournalVars($journal); + $vars = array_merge($journalVars, $expertVars); + $rendered = $this->renderFromTemplate( + $task['template_id'], + $task['journal_id'], + json_encode($vars, JSON_UNESCAPED_UNICODE), + $task['style_id'] + ); + + $now = time(); + if ($rendered['code'] !== 0) { + Db::name('promotion_email_log')->where('log_id', $logId)->update([ + 'state' => 2, + 'error_msg' => 'Prepare failed: ' . $rendered['msg'], + 'llm_description' => mb_substr($llmText, 0, 2000), + 'llm_status' => $llmStatus, + 'send_time' => $now, + ]); + $this->tryFinalizeTask($task['task_id']); + return ['code' => 1, 'msg' => $rendered['msg'], 'llm_status' => $llmStatus]; + } + + Db::name('promotion_email_log')->where('log_id', $logId)->update([ + 'subject_prepared' => mb_substr($rendered['data']['subject'], 0, 512), + 'body_prepared' => $rendered['data']['body'], + 'llm_description' => mb_substr($llmText, 0, 2000), + 'llm_status' => $llmStatus, + 'prepared_at' => $now, + ]); + + $this->tryFinalizeTask($task['task_id']); + + return ['code' => 0, 'msg' => 'ok', 'llm_status' => $llmStatus]; + } + + /** + * 解析 task 的目标领域名称列表。 + * + * 优先级: + * 1. task.factory_id > 0:读取对应 promotion_factory.fetch_ids,联 t_expert_fetch 取 field + * 2. 否则(手工任务):走 journal_promotion_field → t_expert_fetch 取 field + * + * 返回空数组表示没有限定领域。 + */ + public function resolveTaskFields($task) + { + if (!is_array($task)) return []; + + $factoryId = intval($task['factory_id'] ?? 0); + if ($factoryId > 0) { + $factory = Db::name('promotion_factory') + ->where('promotion_factory_id', $factoryId) + ->find(); + if ($factory && !empty($factory['fetch_ids'])) { + $ids = array_filter(array_map('intval', explode(',', $factory['fetch_ids']))); + if (!empty($ids)) { + $fields = Db::name('expert_fetch') + ->where('expert_fetch_id', 'in', $ids) + ->where('state', 0) + ->column('field'); + return array_values(array_unique(array_filter(array_map('trim', $fields)))); + } + } + return []; + } + + // 手工任务走期刊的推广领域绑定 + $journalId = intval($task['journal_id'] ?? 0); + if ($journalId <= 0) return []; + $fields = Db::name('journal_promotion_field') + ->alias('jpf') + ->join('t_expert_fetch ef_fetch', 'ef_fetch.expert_fetch_id = jpf.expert_fetch_id', 'inner') + ->where('jpf.journal_id', $journalId) + ->where('jpf.state', 0) + ->where('ef_fetch.state', 0) + ->column('ef_fetch.field'); + return array_values(array_unique(array_filter(array_map('trim', $fields)))); + } + + /** + * 检查 task 下还有无未准备的 log,若全部完成则将 task 置为 state=5(已准备)。 + */ + public function tryFinalizeTask($taskId) + { + $taskId = intval($taskId); + if ($taskId <= 0) return; + + $task = Db::name('promotion_task')->where('task_id', $taskId)->find(); + if (!$task || $task['state'] != 0) return; + + $remaining = Db::name('promotion_email_log') + ->where('task_id', $taskId) + ->where('state', 0) + ->where('prepared_at', 0) + ->count(); + + if ($remaining == 0) { + Db::name('promotion_task')->where('task_id', $taskId)->update([ + 'state' => 5, + 'utime' => time(), + ]); + $this->log("tryFinalizeTask task_id={$taskId} -> state=5"); + } + } + + /** + * 兼容旧接口:同步为 task 内所有邮件执行准备(不建议在 HTTP 请求中直接调用, + * 大邮件量场景请走 dispatchPrepareEmails)。 + * * @param int $taskId * @return array ['prepared' => int, 'failed' => int, 'error' => string|null] */ @@ -188,80 +451,28 @@ class PromotionService return ['prepared' => 0, 'failed' => 0, 'error' => 'task_state_not_draft']; } - $journal = Db::name('journal')->where('journal_id', $task['journal_id'])->find(); - $logs = Db::name('promotion_email_log') + $logIds = Db::name('promotion_email_log') ->where('task_id', $taskId) ->where('state', 0) ->where('prepared_at', 0) ->order('log_id asc') - ->select(); + ->column('log_id'); $prepared = 0; $failed = 0; - $now = time(); - - $journalVars = $this->buildJournalVars($journal); - foreach ($logs as $log) { - $expert = Db::name('expert')->where('expert_id', $log['expert_id'])->find(); - if (!$expert) { - Db::name('promotion_email_log')->where('log_id', $log['log_id'])->update([ - 'state' => 2, - 'error_msg' => 'Expert not found', - 'send_time' => $now, - ]); + foreach ($logIds as $logId) { + $r = $this->prepareSingleEmail(intval($logId)); + if ($r['code'] === 0) { + $prepared++; + } else { $failed++; - continue; } - - $expert_fields = Db::name('expert_field') - ->where('expert_id', $expert['expert_id']) - ->where('state', 0) - ->select(); - - $fieldSet = []; - $representativeTitle = ''; - foreach ($expert_fields as $ef) { - $fn = trim($ef['field']); - if ($fn !== '' && !in_array($fn, $fieldSet)) { - $fieldSet[] = $fn; - } - if ($representativeTitle === '' && !empty($ef['paper_title'])) { - $representativeTitle = trim($ef['paper_title']); - } - } - $expert['fields'] = implode(',', $fieldSet); - $expert['representative_work_title'] = $representativeTitle; - - $expertVars = $this->buildExpertVars($expert); - $vars = array_merge($journalVars, $expertVars); - $rendered = $this->renderFromTemplate( - $task['template_id'], - $task['journal_id'], - json_encode($vars, JSON_UNESCAPED_UNICODE), - $task['style_id'] - ); - - if ($rendered['code'] !== 0) { - Db::name('promotion_email_log')->where('log_id', $log['log_id'])->update([ - 'state' => 2, - 'error_msg' => 'Prepare failed: ' . $rendered['msg'], - 'send_time' => $now, - ]); - $failed++; - continue; - } - - Db::name('promotion_email_log')->where('log_id', $log['log_id'])->update([ - 'subject_prepared' => mb_substr($rendered['data']['subject'], 0, 512), - 'body_prepared' => $rendered['data']['body'], - 'prepared_at' => $now, - ]); - $prepared++; } - Db::name('promotion_task')->where('task_id', $taskId)->update([ + // 兜底:即使 tryFinalizeTask 已触发,这里再保证 state 置为 5 + Db::name('promotion_task')->where('task_id', $taskId)->where('state', 0)->update([ 'state' => 5, - 'utime' => $now, + 'utime' => time(), ]); $this->log("prepareTask task_id={$taskId} prepared={$prepared} failed={$failed}"); @@ -297,17 +508,38 @@ class PromotionService } /** - * 将单个 task 的 prepare 推入队列异步执行 + * 将单个 task 的 prepare 推入队列异步执行(调度层)。 + * + * 调度 Job 会进一步把每封邮件拆分成 PromotionPrepareEmail 推到 promotion_email 队列, + * 以便并行调用 LLM 生成个性化描述。 */ public function enqueuePrepareTask($taskId, $delay = 0) { - $jobClass = 'app\api\job\PromotionPrepare@fire'; + $jobClass = 'app\api\job\PromotionPrepareTask@fire'; $data = ['task_id' => intval($taskId)]; if ($delay > 0) { - Queue::later($delay, $jobClass, $data, 'promotion'); + Queue::later($delay, $jobClass, $data, 'promotionSend'); } else { - Queue::push($jobClass, $data, 'promotion'); + Queue::push($jobClass, $data, 'promotionSend'); + } + } + + /** + * 将单封邮件的 prepare 推入队列异步执行(LLM 层)。 + * + * 队列名:promotion_email + * 启动 worker:php think queue:listen --queue promotion_email + */ + public function enqueuePrepareEmail($logId, $delay = 0) + { + $jobClass = 'app\api\job\PromotionPrepareEmail@fire'; + $data = ['log_id' => intval($logId)]; + + if ($delay > 0) { + Queue::later($delay, $jobClass, $data, 'promotion_email'); + } else { + Queue::push($jobClass, $data, 'promotion_email'); } } @@ -491,6 +723,7 @@ class PromotionService public function buildExpertVars($expert) { + $llm = $expert['llm_description'] ?? ''; return [ 'expert_title' => "Ph.D", 'expert_name' => $expert['name'] ?? '', @@ -498,6 +731,8 @@ class PromotionService 'expert_affiliation' => $expert['affiliation'] ?? '', 'expert_field' => $expert['fields'] ?? ($expert['field'] ?? ''), 'representative_work_title' => $expert['representative_work_title'] ?? '', + 'llm_description' => $llm, + 'ai_content_analysis' => $llm, ]; } diff --git a/sql/add_llm_fields_to_promotion_email_log.sql b/sql/add_llm_fields_to_promotion_email_log.sql new file mode 100644 index 0000000..09e9107 --- /dev/null +++ b/sql/add_llm_fields_to_promotion_email_log.sql @@ -0,0 +1,7 @@ +-- 为 t_promotion_email_log 增加 LLM 生成字段 +-- llm_description: 大模型生成的个性化描述(用于模板变量 {{llm_description}}) +-- llm_status: 0=未处理 1=成功 2=失败已回退兜底 3=跳过(缺代表作) + +ALTER TABLE `t_promotion_email_log` + ADD COLUMN `llm_description` TEXT NULL COMMENT '大模型生成的个性化描述' AFTER `body_prepared`, + ADD COLUMN `llm_status` TINYINT NOT NULL DEFAULT 0 COMMENT 'LLM 处理状态: 0未处理 1成功 2失败兜底 3跳过' AFTER `llm_description`;