修改自动推广的相关任务

This commit is contained in:
wangjinlei
2026-04-24 13:13:06 +08:00
parent 48b2335063
commit e1bf8c0189
8 changed files with 603 additions and 89 deletions

View File

@@ -93,20 +93,36 @@ class PromotionService
$expert_fields = Db::name('expert_field')
->where('expert_id', $expert['expert_id'])
->where('state', 0)
->order('expert_field_id desc')
->select();
$taskFields = $this->resolveTaskFields($task);
$taskFieldLower = [];
foreach ($taskFields as $tf) {
$tf = strtolower(trim($tf));
if ($tf !== '') $taskFieldLower[$tf] = true;
}
$fieldSet = [];
$representativeTitle = '';
$matchedTitle = '';
$fallbackTitle = '';
foreach ($expert_fields as $ef) {
$fn = trim($ef['field']);
if ($fn !== '' && !in_array($fn, $fieldSet)) {
$fieldSet[] = $fn;
}
if ($representativeTitle === '' && !empty($ef['paper_title'])) {
$representativeTitle = trim($ef['paper_title']);
$paper = trim((string)($ef['paper_title'] ?? ''));
if ($paper === '') continue;
if ($matchedTitle === '' && $fn !== '' && isset($taskFieldLower[strtolower($fn)])) {
$matchedTitle = $paper;
}
if ($fallbackTitle === '') {
$fallbackTitle = $paper;
}
if ($matchedTitle !== '' && $fallbackTitle !== '') break;
}
$expert['fields'] = implode(',', $fieldSet);
$expert['representative_work_title'] = $representativeTitle;
$expert['representative_work_title'] = $matchedTitle !== '' ? $matchedTitle : $fallbackTitle;
$expertVars = $this->buildExpertVars($expert);
$journalVars = $this->buildJournalVars($journal);
$vars = array_merge($journalVars, $expertVars);
@@ -174,7 +190,254 @@ class PromotionService
// ==================== 准备与触发(今日准备、明日发送) ====================
/**
* 为指定任务预生成所有待发邮件的 subject/body写入 log完成后将任务置为 state=5已准备
* 调度:为指定任务的所有待准备邮件分发 PromotionPrepareEmail 队列任务。
*
* 每封邮件单独一条 job可在 promotion_email 队列中并行消费(调用 LLM 生成个性化描述)。
* 所有邮件准备完成后,由最后一条 PromotionPrepareEmail 在 finalize 阶段将任务置为 state=5。
*
* @param int $taskId
* @return array ['dispatched' => int, 'already_done' => bool, 'error' => string|null]
*/
public function dispatchPrepareEmails($taskId)
{
$task = Db::name('promotion_task')->where('task_id', $taskId)->find();
if (!$task) {
return ['dispatched' => 0, 'already_done' => false, 'error' => 'task_not_found'];
}
if ($task['state'] != 0) {
return ['dispatched' => 0, 'already_done' => $task['state'] == 5, 'error' => 'task_state_not_draft'];
}
$logIds = Db::name('promotion_email_log')
->where('task_id', $taskId)
->where('state', 0)
->where('prepared_at', 0)
->order('log_id asc')
->column('log_id');
if (empty($logIds)) {
// 没有需要准备的邮件,直接置为已准备
Db::name('promotion_task')->where('task_id', $taskId)->update([
'state' => 5,
'utime' => time(),
]);
$this->log("dispatchPrepareEmails task_id={$taskId} no_logs -> state=5");
return ['dispatched' => 0, 'already_done' => true, 'error' => null];
}
foreach ($logIds as $logId) {
$this->enqueuePrepareEmail(intval($logId));
}
$this->log("dispatchPrepareEmails task_id={$taskId} dispatched=" . count($logIds));
return ['dispatched' => count($logIds), 'already_done' => false, 'error' => null];
}
/**
* 对单封邮件执行准备:拉取 expert / journal调 LLM 生成描述,渲染模板,写回 log。
*
* 由 PromotionPrepareEmail Job 调用;也可手工调用做调试。
*
* @param int $logId
* @return array ['code' => 0|1, 'msg' => string, 'llm_status' => int]
*/
public function prepareSingleEmail($logId)
{
$log = Db::name('promotion_email_log')->where('log_id', $logId)->find();
if (!$log) {
return ['code' => 1, 'msg' => 'log_not_found', 'llm_status' => 0];
}
if ($log['state'] != 0) {
return ['code' => 1, 'msg' => 'log_state_not_pending', 'llm_status' => 0];
}
if (!empty($log['prepared_at'])) {
return ['code' => 0, 'msg' => 'already_prepared', 'llm_status' => intval($log['llm_status'] ?? 0)];
}
$task = Db::name('promotion_task')->where('task_id', $log['task_id'])->find();
if (!$task) {
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'state' => 2,
'error_msg' => 'Task not found',
'send_time' => time(),
]);
return ['code' => 1, 'msg' => 'task_not_found', 'llm_status' => 0];
}
$expert = Db::name('expert')->where('expert_id', $log['expert_id'])->find();
if (!$expert) {
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'state' => 2,
'error_msg' => 'Expert not found',
'send_time' => time(),
]);
$this->tryFinalizeTask($task['task_id']);
return ['code' => 1, 'msg' => 'expert_not_found', 'llm_status' => 0];
}
$expert_fields = Db::name('expert_field')
->where('expert_id', $expert['expert_id'])
->where('state', 0)
->order('expert_field_id desc')
->select();
// 领域优先级:任务所属工厂/期刊的目标领域
$taskFields = $this->resolveTaskFields($task);
$taskFieldLower = [];
foreach ($taskFields as $tf) {
$tf = strtolower(trim($tf));
if ($tf !== '') $taskFieldLower[$tf] = true;
}
$fieldSet = [];
$matchedTitle = '';
$fallbackTitle = '';
foreach ($expert_fields as $ef) {
$fn = trim($ef['field']);
if ($fn !== '' && !in_array($fn, $fieldSet)) {
$fieldSet[] = $fn;
}
$paper = trim((string)($ef['paper_title'] ?? ''));
if ($paper === '') continue;
// 匹配到任务领域的 paper 优先(按最新 expert_field_id desc 顺序取第一条)
if ($matchedTitle === '' && $fn !== '' && isset($taskFieldLower[strtolower($fn)])) {
$matchedTitle = $paper;
}
if ($fallbackTitle === '') {
$fallbackTitle = $paper;
}
if ($matchedTitle !== '' && $fallbackTitle !== '') break;
}
$expert['fields'] = implode(',', $fieldSet);
$expert['representative_work_title'] = $matchedTitle !== '' ? $matchedTitle : $fallbackTitle;
$journal = Db::name('journal')->where('journal_id', $task['journal_id'])->find();
// 调用 LLM 生成个性化描述(失败/缺条件时回退到兜底文案)
$llmResult = ['status' => 0, 'text' => ''];
try {
$llm = new PromotionLlmService();
$llmResult = $llm->generateDescription($expert, $journal ?: []);
} catch (\Exception $e) {
$llmResult = ['status' => 2, 'text' => ''];
$this->log("prepareSingleEmail log_id={$logId} llm_exception=" . $e->getMessage());
}
$llmText = (string)$llmResult['text'];
$llmStatus = intval($llmResult['status']);
$expert['llm_description'] = $llmText;
$expertVars = $this->buildExpertVars($expert);
$journalVars = $this->buildJournalVars($journal);
$vars = array_merge($journalVars, $expertVars);
$rendered = $this->renderFromTemplate(
$task['template_id'],
$task['journal_id'],
json_encode($vars, JSON_UNESCAPED_UNICODE),
$task['style_id']
);
$now = time();
if ($rendered['code'] !== 0) {
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'state' => 2,
'error_msg' => 'Prepare failed: ' . $rendered['msg'],
'llm_description' => mb_substr($llmText, 0, 2000),
'llm_status' => $llmStatus,
'send_time' => $now,
]);
$this->tryFinalizeTask($task['task_id']);
return ['code' => 1, 'msg' => $rendered['msg'], 'llm_status' => $llmStatus];
}
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'subject_prepared' => mb_substr($rendered['data']['subject'], 0, 512),
'body_prepared' => $rendered['data']['body'],
'llm_description' => mb_substr($llmText, 0, 2000),
'llm_status' => $llmStatus,
'prepared_at' => $now,
]);
$this->tryFinalizeTask($task['task_id']);
return ['code' => 0, 'msg' => 'ok', 'llm_status' => $llmStatus];
}
/**
* 解析 task 的目标领域名称列表。
*
* 优先级:
* 1. task.factory_id > 0读取对应 promotion_factory.fetch_ids联 t_expert_fetch 取 field
* 2. 否则(手工任务):走 journal_promotion_field → t_expert_fetch 取 field
*
* 返回空数组表示没有限定领域。
*/
public function resolveTaskFields($task)
{
if (!is_array($task)) return [];
$factoryId = intval($task['factory_id'] ?? 0);
if ($factoryId > 0) {
$factory = Db::name('promotion_factory')
->where('promotion_factory_id', $factoryId)
->find();
if ($factory && !empty($factory['fetch_ids'])) {
$ids = array_filter(array_map('intval', explode(',', $factory['fetch_ids'])));
if (!empty($ids)) {
$fields = Db::name('expert_fetch')
->where('expert_fetch_id', 'in', $ids)
->where('state', 0)
->column('field');
return array_values(array_unique(array_filter(array_map('trim', $fields))));
}
}
return [];
}
// 手工任务走期刊的推广领域绑定
$journalId = intval($task['journal_id'] ?? 0);
if ($journalId <= 0) return [];
$fields = Db::name('journal_promotion_field')
->alias('jpf')
->join('t_expert_fetch ef_fetch', 'ef_fetch.expert_fetch_id = jpf.expert_fetch_id', 'inner')
->where('jpf.journal_id', $journalId)
->where('jpf.state', 0)
->where('ef_fetch.state', 0)
->column('ef_fetch.field');
return array_values(array_unique(array_filter(array_map('trim', $fields))));
}
/**
* 检查 task 下还有无未准备的 log若全部完成则将 task 置为 state=5已准备
*/
public function tryFinalizeTask($taskId)
{
$taskId = intval($taskId);
if ($taskId <= 0) return;
$task = Db::name('promotion_task')->where('task_id', $taskId)->find();
if (!$task || $task['state'] != 0) return;
$remaining = Db::name('promotion_email_log')
->where('task_id', $taskId)
->where('state', 0)
->where('prepared_at', 0)
->count();
if ($remaining == 0) {
Db::name('promotion_task')->where('task_id', $taskId)->update([
'state' => 5,
'utime' => time(),
]);
$this->log("tryFinalizeTask task_id={$taskId} -> state=5");
}
}
/**
* 兼容旧接口:同步为 task 内所有邮件执行准备(不建议在 HTTP 请求中直接调用,
* 大邮件量场景请走 dispatchPrepareEmails
*
* @param int $taskId
* @return array ['prepared' => int, 'failed' => int, 'error' => string|null]
*/
@@ -188,80 +451,28 @@ class PromotionService
return ['prepared' => 0, 'failed' => 0, 'error' => 'task_state_not_draft'];
}
$journal = Db::name('journal')->where('journal_id', $task['journal_id'])->find();
$logs = Db::name('promotion_email_log')
$logIds = Db::name('promotion_email_log')
->where('task_id', $taskId)
->where('state', 0)
->where('prepared_at', 0)
->order('log_id asc')
->select();
->column('log_id');
$prepared = 0;
$failed = 0;
$now = time();
$journalVars = $this->buildJournalVars($journal);
foreach ($logs as $log) {
$expert = Db::name('expert')->where('expert_id', $log['expert_id'])->find();
if (!$expert) {
Db::name('promotion_email_log')->where('log_id', $log['log_id'])->update([
'state' => 2,
'error_msg' => 'Expert not found',
'send_time' => $now,
]);
foreach ($logIds as $logId) {
$r = $this->prepareSingleEmail(intval($logId));
if ($r['code'] === 0) {
$prepared++;
} else {
$failed++;
continue;
}
$expert_fields = Db::name('expert_field')
->where('expert_id', $expert['expert_id'])
->where('state', 0)
->select();
$fieldSet = [];
$representativeTitle = '';
foreach ($expert_fields as $ef) {
$fn = trim($ef['field']);
if ($fn !== '' && !in_array($fn, $fieldSet)) {
$fieldSet[] = $fn;
}
if ($representativeTitle === '' && !empty($ef['paper_title'])) {
$representativeTitle = trim($ef['paper_title']);
}
}
$expert['fields'] = implode(',', $fieldSet);
$expert['representative_work_title'] = $representativeTitle;
$expertVars = $this->buildExpertVars($expert);
$vars = array_merge($journalVars, $expertVars);
$rendered = $this->renderFromTemplate(
$task['template_id'],
$task['journal_id'],
json_encode($vars, JSON_UNESCAPED_UNICODE),
$task['style_id']
);
if ($rendered['code'] !== 0) {
Db::name('promotion_email_log')->where('log_id', $log['log_id'])->update([
'state' => 2,
'error_msg' => 'Prepare failed: ' . $rendered['msg'],
'send_time' => $now,
]);
$failed++;
continue;
}
Db::name('promotion_email_log')->where('log_id', $log['log_id'])->update([
'subject_prepared' => mb_substr($rendered['data']['subject'], 0, 512),
'body_prepared' => $rendered['data']['body'],
'prepared_at' => $now,
]);
$prepared++;
}
Db::name('promotion_task')->where('task_id', $taskId)->update([
// 兜底:即使 tryFinalizeTask 已触发,这里再保证 state 置为 5
Db::name('promotion_task')->where('task_id', $taskId)->where('state', 0)->update([
'state' => 5,
'utime' => $now,
'utime' => time(),
]);
$this->log("prepareTask task_id={$taskId} prepared={$prepared} failed={$failed}");
@@ -297,17 +508,38 @@ class PromotionService
}
/**
* 将单个 task 的 prepare 推入队列异步执行
* 将单个 task 的 prepare 推入队列异步执行(调度层)。
*
* 调度 Job 会进一步把每封邮件拆分成 PromotionPrepareEmail 推到 promotion_email 队列,
* 以便并行调用 LLM 生成个性化描述。
*/
public function enqueuePrepareTask($taskId, $delay = 0)
{
$jobClass = 'app\api\job\PromotionPrepare@fire';
$jobClass = 'app\api\job\PromotionPrepareTask@fire';
$data = ['task_id' => intval($taskId)];
if ($delay > 0) {
Queue::later($delay, $jobClass, $data, 'promotion');
Queue::later($delay, $jobClass, $data, 'promotionSend');
} else {
Queue::push($jobClass, $data, 'promotion');
Queue::push($jobClass, $data, 'promotionSend');
}
}
/**
* 将单封邮件的 prepare 推入队列异步执行LLM 层)。
*
* 队列名promotion_email
* 启动 workerphp think queue:listen --queue promotion_email
*/
public function enqueuePrepareEmail($logId, $delay = 0)
{
$jobClass = 'app\api\job\PromotionPrepareEmail@fire';
$data = ['log_id' => intval($logId)];
if ($delay > 0) {
Queue::later($delay, $jobClass, $data, 'promotion_email');
} else {
Queue::push($jobClass, $data, 'promotion_email');
}
}
@@ -491,6 +723,7 @@ class PromotionService
public function buildExpertVars($expert)
{
$llm = $expert['llm_description'] ?? '';
return [
'expert_title' => "Ph.D",
'expert_name' => $expert['name'] ?? '',
@@ -498,6 +731,8 @@ class PromotionService
'expert_affiliation' => $expert['affiliation'] ?? '',
'expert_field' => $expert['fields'] ?? ($expert['field'] ?? ''),
'representative_work_title' => $expert['representative_work_title'] ?? '',
'llm_description' => $llm,
'ai_content_analysis' => $llm,
];
}