Files
tougao/application/common/PromotionService.php
2026-04-29 15:07:56 +08:00

1083 lines
44 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\common;
use think\Db;
use think\Cache;
use think\Queue;
use think\Env;
use PHPMailer\PHPMailer\PHPMailer;
class PromotionService
{
private $logFile;
/**
 * Point the service at its log file: "promotion_task.log" inside the
 * "runtime" directory under ROOT_PATH.
 */
public function __construct()
{
    $runtimeDir = ROOT_PATH . 'runtime';
    $this->logFile = implode(DS, [$runtimeDir, 'promotion_task.log']);
}
/**
 * Process the next pending email of a promotion task (called by the queue job).
 *
 * Steps, in order:
 *  1. Load the task; stop if it is missing or not in state 1.
 *  2. If the current hour is outside [send_start_hour, send_end_hour), retry in 300s.
 *  3. Auto-pause (state=2) when the bounce rate reaches max_bounce_rate.
 *  4. Take the oldest promotion_email_log row with state=0; when none is left the
 *     task is marked state=3.
 *  5. Resolve the recipient (expert row or internal user row); invalid or
 *     unsubscribed recipients are marked state=2 and the next run is enqueued.
 *  6. Pick an SMTP account with remaining quota (retry in 600s when none).
 *  7. Use the prepared subject/body when both are present, otherwise render now.
 *  8. Send, record the outcome on the log row / task counters, enqueue the next run.
 *
 * @param int $taskId promotion_task.task_id
 * @return array Always contains 'done' (bool); the remaining keys ('reason',
 *               'retry_in', 'skipped', 'failed', 'sent', 'email', 'next_in', ...)
 *               depend on the branch taken.
 */
public function processNextEmail($taskId)
{
    $task = Db::name('promotion_task')->where('task_id', $taskId)->find();
    if (!$task) {
        return ['done' => true, 'reason' => 'task_not_found'];
    }
    if ($task['state'] != 1) {
        return ['done' => true, 'reason' => 'task_not_running', 'state' => $task['state']];
    }

    // Only send inside the configured hour window; otherwise look again in 5 minutes.
    $currentHour = intval(date('G'));
    if ($currentHour < $task['send_start_hour'] || $currentHour >= $task['send_end_hour']) {
        $this->enqueueNextEmail($taskId, 300);
        return ['done' => false, 'reason' => 'outside_send_window', 'retry_in' => 300];
    }

    // Auto-pause the task once the bounce rate reaches the configured ceiling.
    if ($task['sent_count'] > 0 && $task['max_bounce_rate'] > 0) {
        $bounceRate = ($task['bounce_count'] / $task['sent_count']) * 100;
        if ($bounceRate >= $task['max_bounce_rate']) {
            Db::name('promotion_task')->where('task_id', $taskId)->update([
                'state' => 2,
                'utime' => time(),
            ]);
            $this->log("Task {$taskId} auto-paused: bounce rate {$bounceRate}% >= {$task['max_bounce_rate']}%");
            return ['done' => true, 'reason' => 'auto_paused_bounce_rate', 'bounce_rate' => $bounceRate];
        }
    }

    // Oldest pending log row for this task.
    $logEntry = Db::name('promotion_email_log')
        ->where('task_id', $taskId)
        ->where('state', 0)
        ->order('log_id asc')
        ->find();
    if (!$logEntry) {
        Db::name('promotion_task')->where('task_id', $taskId)->update([
            'state' => 3,
            'utime' => time(),
        ]);
        return ['done' => true, 'reason' => 'all_emails_processed'];
    }

    // Audience dispatch: log.expert_id > 0 -> row from the expert table;
    // log.user_id > 0 -> internal system user.
    $audienceKind = '';
    $expert = null;
    if (intval($logEntry['expert_id']) > 0) {
        $audienceKind = 'expert';
        $expert = Db::name('expert')->where('expert_id', $logEntry['expert_id'])->find();
        if (!$expert || $expert['state'] == 4 || $expert['state'] == 5) {
            Db::name('promotion_email_log')->where('log_id', $logEntry['log_id'])->update([
                'state' => 2,
                'error_msg' => 'Expert invalid or deleted (state=' . (isset($expert['state']) ? $expert['state'] : 'null') . ')',
                'send_time' => time(),
            ]);
            Db::name('promotion_task')->where('task_id', $taskId)->setInc('fail_count');
            Db::name('promotion_task')->where('task_id', $taskId)->update(['utime' => time()]);
            $this->enqueueNextEmail($taskId, 2);
            return ['done' => false, 'skipped' => $logEntry['email_to'], 'reason' => 'expert_invalid'];
        }
        if (!empty($expert['unsubscribed'])) {
            // Unsubscribed recipients are closed out without touching fail_count.
            Db::name('promotion_email_log')->where('log_id', $logEntry['log_id'])->update([
                'state' => 2,
                'error_msg' => 'Expert unsubscribed',
                'send_time' => time(),
            ]);
            Db::name('promotion_task')->where('task_id', $taskId)->update(['utime' => time()]);
            $this->enqueueNextEmail($taskId, 2);
            return ['done' => false, 'skipped' => $logEntry['email_to'], 'reason' => 'expert_unsubscribed'];
        }
    } elseif (intval($logEntry['user_id']) > 0) {
        $audienceKind = 'user';
        $row = Db::name('user')->alias('u')
            ->join('t_user_reviewer_info uri', 'uri.reviewer_id = u.user_id', 'left')
            ->where('u.user_id', $logEntry['user_id'])
            ->field('u.user_id, u.email, u.realname, u.unsubscribed, IFNULL(uri.company, "") as company')
            ->find();
        if (!$row) {
            Db::name('promotion_email_log')->where('log_id', $logEntry['log_id'])->update([
                'state' => 2,
                'error_msg' => 'User not found',
                'send_time' => time(),
            ]);
            Db::name('promotion_task')->where('task_id', $taskId)->setInc('fail_count');
            Db::name('promotion_task')->where('task_id', $taskId)->update(['utime' => time()]);
            $this->enqueueNextEmail($taskId, 2);
            return ['done' => false, 'skipped' => $logEntry['email_to'], 'reason' => 'user_invalid'];
        }
        if (!empty($row['unsubscribed'])) {
            // Unsubscribed recipients are closed out without touching fail_count.
            Db::name('promotion_email_log')->where('log_id', $logEntry['log_id'])->update([
                'state' => 2,
                'error_msg' => 'User unsubscribed',
                'send_time' => time(),
            ]);
            Db::name('promotion_task')->where('task_id', $taskId)->update(['utime' => time()]);
            $this->enqueueNextEmail($taskId, 2);
            return ['done' => false, 'skipped' => $logEntry['email_to'], 'reason' => 'user_unsubscribed'];
        }
        // Shape the internal user like an expert row so the same variable builder applies.
        $expert = [
            'expert_id' => 0,
            'user_id' => intval($row['user_id']),
            'name' => (string)$row['realname'],
            'email' => (string)$row['email'],
            'affiliation' => (string)$row['company'],
        ];
    } else {
        Db::name('promotion_email_log')->where('log_id', $logEntry['log_id'])->update([
            'state' => 2,
            'error_msg' => 'No audience id (expert_id/user_id both empty)',
            'send_time' => time(),
        ]);
        Db::name('promotion_task')->where('task_id', $taskId)->setInc('fail_count');
        Db::name('promotion_task')->where('task_id', $taskId)->update(['utime' => time()]);
        $this->enqueueNextEmail($taskId, 2);
        return ['done' => false, 'skipped' => $logEntry['email_to'], 'reason' => 'no_audience_id'];
    }

    // Inject the role text used by the {{role}} template variable.
    $expert['role'] = $this->mapExpertTypeRole(intval($task['expert_type'] ?? 0));

    $account = $this->pickSmtpAccountForTask($task);
    if (!$account) {
        $this->enqueueNextEmail($taskId, 600);
        return ['done' => false, 'reason' => 'no_smtp_quota', 'retry_in' => 600];
    }

    // Prefer the pre-generated content; render on the spot when it is missing.
    $subject = '';
    $body = '';
    $hasPrepared = !empty($logEntry['subject_prepared']) && !empty($logEntry['body_prepared']);
    if ($hasPrepared) {
        $subject = $logEntry['subject_prepared'];
        $body = $logEntry['body_prepared'];
    } else {
        $journal = Db::name('journal')->where('journal_id', $task['journal_id'])->find();
        // Young-editorial-board application link (same variable as in prepareSingleEmail).
        $expert['application_link_yeditorial_board'] = self::buildYboardApplyUrl(
            intval($expert['expert_id'] ?? 0),
            intval($task['journal_id'])
        );
        if ($audienceKind === 'expert') {
            $expert_fields = Db::name('expert_field')
                ->where('expert_id', $expert['expert_id'])
                ->where('state', 0)
                ->order('expert_field_id desc')
                ->select();
            // Lower-cased lookup set of the task's target fields.
            $taskFields = $this->resolveTaskFields($task);
            $taskFieldLower = [];
            foreach ($taskFields as $tf) {
                $tf = strtolower(trim($tf));
                if ($tf !== '') $taskFieldLower[$tf] = true;
            }
            $fieldSet = [];
            $matchedTitle = '';
            $fallbackTitle = '';
            foreach ($expert_fields as $ef) {
                $fn = trim($ef['field']);
                if ($fn !== '' && !in_array($fn, $fieldSet)) {
                    $fieldSet[] = $fn;
                }
                $paper = trim((string)($ef['paper_title'] ?? ''));
                if ($paper === '') continue;
                // First paper whose field matches a task field wins; otherwise the first paper seen.
                if ($matchedTitle === '' && $fn !== '' && isset($taskFieldLower[strtolower($fn)])) {
                    $matchedTitle = $paper;
                }
                if ($fallbackTitle === '') {
                    $fallbackTitle = $paper;
                }
                if ($matchedTitle !== '' && $fallbackTitle !== '') break;
            }
            $expert['fields'] = implode(',', $fieldSet);
            $expert['representative_work_title'] = $matchedTitle !== '' ? $matchedTitle : $fallbackTitle;
            // Send-time path: no LLM call here, only the service's fallback texts.
            try {
                $llmSvc = new PromotionLlmService();
                $expert['llm_description'] = $llmSvc->getFallback();
                $expert['ai_advised_topics'] = $llmSvc->getAdvisedFallback();
            } catch (\Exception $e) {
                $expert['llm_description'] = '';
                $expert['ai_advised_topics'] = '';
            }
        } else {
            // Internal audience: fields / representative work / LLM text are all left empty.
            $expert['fields'] = '';
            $expert['representative_work_title'] = '';
            $expert['llm_description'] = '';
            $expert['ai_advised_topics'] = '';
        }
        $expertVars = $this->buildExpertVars($expert);
        $journalVars = $this->buildJournalVars($journal);
        $vars = array_merge($journalVars, $expertVars);
        $rendered = $this->renderFromTemplate(
            $task['template_id'],
            $task['journal_id'],
            json_encode($vars, JSON_UNESCAPED_UNICODE),
            $task['style_id']
        );
        if ($rendered['code'] !== 0) {
            Db::name('promotion_email_log')->where('log_id', $logEntry['log_id'])->update([
                'state' => 2,
                'error_msg' => 'Template render failed: ' . $rendered['msg'],
                'send_time' => time(),
            ]);
            Db::name('promotion_task')->where('task_id', $taskId)->setInc('fail_count');
            Db::name('promotion_task')->where('task_id', $taskId)->update(['utime' => time()]);
            $this->enqueueNextEmail($taskId, 2);
            return ['done' => false, 'failed' => $logEntry['email_to'], 'reason' => 'template_error'];
        }
        $subject = $rendered['data']['subject'];
        $body = $rendered['data']['body'];
    }

    $result = $this->doSendEmail($account, $logEntry['email_to'], $subject, $body);
    $now = time();
    if ($result['status'] === 1) {
        Db::name('promotion_email_log')->where('log_id', $logEntry['log_id'])->update([
            'j_email_id' => $account['j_email_id'],
            'subject' => mb_substr($subject, 0, 512),
            'state' => 1,
            'send_time' => $now,
        ]);
        Db::name('journal_email')->where('j_email_id', $account['j_email_id'])->setInc('today_sent');
        // Only rows from the expert table get state/ltime written back after a send.
        if ($audienceKind === 'expert' && intval($expert['expert_id']) > 0) {
            Db::name('expert')->where('expert_id', $expert['expert_id'])->update(['state' => 1, 'ltime' => $now]);
        }
        Db::name('promotion_task')->where('task_id', $taskId)->setInc('sent_count');
    } else {
        Db::name('promotion_email_log')->where('log_id', $logEntry['log_id'])->update([
            'j_email_id' => $account['j_email_id'],
            'subject' => mb_substr($subject, 0, 512),
            'state' => 2,
            'error_msg' => mb_substr($result['data'], 0, 512),
            'send_time' => $now,
        ]);
        Db::name('promotion_task')->where('task_id', $taskId)->setInc('fail_count');
    }
    Db::name('promotion_task')->where('task_id', $taskId)->update(['utime' => $now]);

    // Random pause before the next email. The lower bound is floored at 5 seconds
    // and the upper bound is never below the lower one: rand() silently swaps its
    // arguments when max < min, which would otherwise allow delays under 5 seconds.
    $minDelay = max(5, intval($task['min_interval']));
    $maxDelay = max($minDelay, intval($task['max_interval']));
    $delay = rand($minDelay, $maxDelay);
    $this->enqueueNextEmail($taskId, $delay);
    return [
        'done' => false,
        'sent' => $result['status'] === 1,
        'email' => $logEntry['email_to'],
        'next_in' => $delay,
    ];
}
// ==================== Prepare & trigger (prepare today, send tomorrow) ====================
/**
 * Scheduler: enqueue one prepare job per not-yet-prepared email of a task.
 *
 * Only tasks in state 0 are handled. Every pending log row (state=0,
 * prepared_at=0) is handed to enqueuePrepareEmail(). When there is nothing to
 * prepare, the task is moved straight to state=5.
 *
 * @param int $taskId
 * @return array ['dispatched' => int, 'already_done' => bool, 'error' => string|null]
 */
public function dispatchPrepareEmails($taskId)
{
    $task = Db::name('promotion_task')->where('task_id', $taskId)->find();
    if (!$task) {
        return ['dispatched' => 0, 'already_done' => false, 'error' => 'task_not_found'];
    }
    if ($task['state'] != 0) {
        return ['dispatched' => 0, 'already_done' => $task['state'] == 5, 'error' => 'task_state_not_draft'];
    }

    $logIds = Db::name('promotion_email_log')
        ->where('task_id', $taskId)
        ->where('state', 0)
        ->where('prepared_at', 0)
        ->order('log_id asc')
        ->column('log_id');

    if (empty($logIds)) {
        // Nothing to prepare: mark the task as prepared. The extra state=0
        // condition (same guard prepareTask uses) keeps this from overwriting a
        // state that changed after the task row was read above.
        Db::name('promotion_task')
            ->where('task_id', $taskId)
            ->where('state', 0)
            ->update([
                'state' => 5,
                'utime' => time(),
            ]);
        $this->log("dispatchPrepareEmails task_id={$taskId} no_logs -> state=5");
        return ['dispatched' => 0, 'already_done' => true, 'error' => null];
    }

    foreach ($logIds as $logId) {
        $this->enqueuePrepareEmail(intval($logId));
    }
    $this->log("dispatchPrepareEmails task_id={$taskId} dispatched=" . count($logIds));
    return ['dispatched' => count($logIds), 'already_done' => false, 'error' => null];
}
/**
 * Prepare a single email: load the recipient and journal, obtain the LLM texts
 * (expert-table recipients only), render the template and write the result back
 * to the promotion_email_log row.
 *
 * Failure paths mark the log row state=2 with an error_msg. Every path after the
 * task has been loaded ends with tryFinalizeTask().
 *
 * @param int $logId promotion_email_log.log_id
 * @return array ['code' => 0|1, 'msg' => string, 'llm_status' => int]; the two
 *               returns after rendering additionally carry 'llm_advised_topics_status'.
 */
public function prepareSingleEmail($logId)
{
$log = Db::name('promotion_email_log')->where('log_id', $logId)->find();
if (!$log) {
return ['code' => 1, 'msg' => 'log_not_found', 'llm_status' => 0];
}
if ($log['state'] != 0) {
return ['code' => 1, 'msg' => 'log_state_not_pending', 'llm_status' => 0];
}
// Already prepared: report success without doing the work again.
if (!empty($log['prepared_at'])) {
return ['code' => 0, 'msg' => 'already_prepared', 'llm_status' => intval($log['llm_status'] ?? 0)];
}
$task = Db::name('promotion_task')->where('task_id', $log['task_id'])->find();
if (!$task) {
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'state' => 2,
'error_msg' => 'Task not found',
'send_time' => time(),
]);
return ['code' => 1, 'msg' => 'task_not_found', 'llm_status' => 0];
}
// Audience dispatch: log.expert_id > 0 -> row from the expert table;
// log.user_id > 0 -> internal system user (editorial board / editor-in-chief / ...).
$expertType = intval($task['expert_type'] ?? 0);
$audienceKind = '';
$expert = null;
if (intval($log['expert_id']) > 0) {
$audienceKind = 'expert';
$expert = Db::name('expert')->where('expert_id', $log['expert_id'])->find();
if (!$expert) {
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'state' => 2,
'error_msg' => 'Expert not found',
'send_time' => time(),
]);
$this->tryFinalizeTask($task['task_id']);
return ['code' => 1, 'msg' => 'expert_not_found', 'llm_status' => 0];
}
if (!empty($expert['unsubscribed'])) {
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'state' => 2,
'error_msg' => 'Expert unsubscribed',
'send_time' => time(),
]);
$this->tryFinalizeTask($task['task_id']);
return ['code' => 1, 'msg' => 'expert_unsubscribed', 'llm_status' => 0];
}
} elseif (intval($log['user_id']) > 0) {
$audienceKind = 'user';
$row = Db::name('user')->alias('u')
->join('t_user_reviewer_info uri', 'uri.reviewer_id = u.user_id', 'left')
->where('u.user_id', $log['user_id'])
->field('u.user_id, u.email, u.realname, u.unsubscribed, IFNULL(uri.company, "") as company')
->find();
if (!$row) {
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'state' => 2,
'error_msg' => 'User not found',
'send_time' => time(),
]);
$this->tryFinalizeTask($task['task_id']);
return ['code' => 1, 'msg' => 'user_not_found', 'llm_status' => 0];
}
if (!empty($row['unsubscribed'])) {
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'state' => 2,
'error_msg' => 'User unsubscribed',
'send_time' => time(),
]);
$this->tryFinalizeTask($task['task_id']);
return ['code' => 1, 'msg' => 'user_unsubscribed', 'llm_status' => 0];
}
// Mirror the expert row shape so buildExpertVars can be reused as-is.
$expert = [
'expert_id' => 0,
'user_id' => intval($row['user_id']),
'name' => (string)$row['realname'],
'email' => (string)$row['email'],
'affiliation' => (string)$row['company'],
];
} else {
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'state' => 2,
'error_msg' => 'No audience id (expert_id/user_id both empty)',
'send_time' => time(),
]);
$this->tryFinalizeTask($task['task_id']);
return ['code' => 1, 'msg' => 'no_audience_id', 'llm_status' => 0];
}
$journal = Db::name('journal')->where('journal_id', $task['journal_id'])->find();
// Young-editorial-board application link. buildYboardApplyUrl returns an empty
// string when expert_id is not positive or no base URL is configured.
$expert['application_link_yeditorial_board'] = self::buildYboardApplyUrl(
intval($expert['expert_id'] ?? 0),
intval($task['journal_id'])
);
// Internal audience: skip the LLM; fields / representative_work_title / LLM texts
// are filled with empty-string placeholders.
if ($audienceKind === 'user') {
$expert['fields'] = '';
$expert['representative_work_title'] = '';
$expert['llm_description'] = '';
$expert['ai_advised_topics'] = '';
$expert['role'] = $this->mapExpertTypeRole($expertType);
$llmText = '';
$llmStatus = 0;
$advisedText = '';
$advisedStatus = 0;
} else {
// Expert-table audience: full flow of fields + representative work + LLM.
$expert_fields = Db::name('expert_field')
->where('expert_id', $expert['expert_id'])
->where('state', 0)
->order('expert_field_id desc')
->select();
// Target fields of the task (via its factory or its journal), lower-cased into a lookup set.
$taskFields = $this->resolveTaskFields($task);
$taskFieldLower = [];
foreach ($taskFields as $tf) {
$tf = strtolower(trim($tf));
if ($tf !== '') $taskFieldLower[$tf] = true;
}
$fieldSet = [];
$matchedTitle = '';
$fallbackTitle = '';
foreach ($expert_fields as $ef) {
$fn = trim($ef['field']);
if ($fn !== '' && !in_array($fn, $fieldSet)) {
$fieldSet[] = $fn;
}
$paper = trim((string)($ef['paper_title'] ?? ''));
if ($paper === '') continue;
// A paper whose field matches a task field takes priority (first hit in
// expert_field_id desc order, i.e. the newest row).
if ($matchedTitle === '' && $fn !== '' && isset($taskFieldLower[strtolower($fn)])) {
$matchedTitle = $paper;
}
if ($fallbackTitle === '') {
$fallbackTitle = $paper;
}
if ($matchedTitle !== '' && $fallbackTitle !== '') break;
}
$expert['fields'] = implode(',', $fieldSet);
$expert['representative_work_title'] = $matchedTitle !== '' ? $matchedTitle : $fallbackTitle;
// Field overlap (case-insensitive match; the expert's original spelling is kept).
$overlapFields = [];
if (!empty($taskFieldLower)) {
foreach ($fieldSet as $fn) {
if (isset($taskFieldLower[strtolower($fn)])) {
$overlapFields[] = $fn;
}
}
}
// A single LLM service call produces both texts: description + advised_topics.
$llmResult = [
'description' => '',
'description_status' => 0,
'advised_topics' => '',
'advised_topics_status' => 0,
];
try {
$llm = new PromotionLlmService();
$llmResult = $llm->generateEmailContent(
$expert,
$journal ?: [],
$overlapFields,
$taskFields,
$fieldSet
);
} catch (\Exception $e) {
// LLM call failed: fall back to the service's fallback texts with status 2.
$fbDesc = '';
$fbAdvised = '';
try {
$fbSvc = isset($llm) ? $llm : new PromotionLlmService();
$fbDesc = $fbSvc->getFallback();
$fbAdvised = $fbSvc->getAdvisedFallback();
} catch (\Exception $ignore) {
// Deliberately ignored: the fallback texts simply stay empty.
}
$llmResult = [
'description' => $fbDesc,
'description_status' => 2,
'advised_topics' => $fbAdvised,
'advised_topics_status' => 2,
];
$this->log("prepareSingleEmail log_id={$logId} llm_exception=" . $e->getMessage());
}
$llmText = (string)$llmResult['description'];
$llmStatus = intval($llmResult['description_status']);
$advisedText = (string)$llmResult['advised_topics'];
$advisedStatus = intval($llmResult['advised_topics_status']);
$expert['llm_description'] = $llmText;
$expert['ai_advised_topics'] = $advisedText;
$expert['role'] = $this->mapExpertTypeRole($expertType);
}
// Journal variables first, so expert variables win on any key collision.
$expertVars = $this->buildExpertVars($expert);
$journalVars = $this->buildJournalVars($journal);
$vars = array_merge($journalVars, $expertVars);
$rendered = $this->renderFromTemplate(
$task['template_id'],
$task['journal_id'],
json_encode($vars, JSON_UNESCAPED_UNICODE),
$task['style_id']
);
$now = time();
if ($rendered['code'] !== 0) {
// Rendering failed: close the log row as failed but still store the LLM texts.
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'state' => 2,
'error_msg' => 'Prepare failed: ' . $rendered['msg'],
'llm_description' => mb_substr($llmText, 0, 2000),
'llm_status' => $llmStatus,
'llm_advised_topics' => mb_substr($advisedText, 0, 2000),
'llm_advised_topics_status' => $advisedStatus,
'send_time' => $now,
]);
$this->tryFinalizeTask($task['task_id']);
return [
'code' => 1,
'msg' => $rendered['msg'],
'llm_status' => $llmStatus,
'llm_advised_topics_status' => $advisedStatus,
];
}
// Success: store the rendered subject/body and stamp prepared_at.
Db::name('promotion_email_log')->where('log_id', $logId)->update([
'subject_prepared' => mb_substr($rendered['data']['subject'], 0, 512),
'body_prepared' => $rendered['data']['body'],
'llm_description' => mb_substr($llmText, 0, 2000),
'llm_status' => $llmStatus,
'llm_advised_topics' => mb_substr($advisedText, 0, 2000),
'llm_advised_topics_status' => $advisedStatus,
'prepared_at' => $now,
]);
$this->tryFinalizeTask($task['task_id']);
return [
'code' => 0,
'msg' => 'ok',
'llm_status' => $llmStatus,
'llm_advised_topics_status' => $advisedStatus,
];
}
/**
 * Resolve the list of target field names for a task.
 *
 * Resolution order:
 *  1. task.factory_id > 0: read promotion_factory.fetch_ids and look the ids up
 *     in expert_fetch (state=0) to collect their `field` values.
 *  2. Otherwise: join journal_promotion_field with t_expert_fetch for the
 *     task's journal and collect the `field` values.
 *
 * @param mixed $task Task row; anything that is not an array yields [].
 * @return array Trimmed, non-empty, de-duplicated field names; empty when no
 *               fields are bound.
 */
public function resolveTaskFields($task)
{
    if (!is_array($task)) {
        return [];
    }

    // trim -> drop empties -> de-duplicate -> reindex
    $normalize = function ($names) {
        $trimmed = array_map('trim', $names);
        return array_values(array_unique(array_filter($trimmed)));
    };

    $factoryId = intval($task['factory_id'] ?? 0);
    if ($factoryId > 0) {
        $factoryRow = Db::name('promotion_factory')
            ->where('promotion_factory_id', $factoryId)
            ->find();
        if (!$factoryRow || empty($factoryRow['fetch_ids'])) {
            return [];
        }
        $fetchIds = array_filter(array_map('intval', explode(',', $factoryRow['fetch_ids'])));
        if (empty($fetchIds)) {
            return [];
        }
        $factoryFields = Db::name('expert_fetch')
            ->where('expert_fetch_id', 'in', $fetchIds)
            ->where('state', 0)
            ->column('field');
        return $normalize($factoryFields);
    }

    // No factory: use the promotion fields bound to the journal.
    $journalId = intval($task['journal_id'] ?? 0);
    if ($journalId <= 0) {
        return [];
    }
    $journalFields = Db::name('journal_promotion_field')
        ->alias('jpf')
        ->join('t_expert_fetch ef_fetch', 'ef_fetch.expert_fetch_id = jpf.expert_fetch_id', 'inner')
        ->where('jpf.journal_id', $journalId)
        ->where('jpf.state', 0)
        ->where('ef_fetch.state', 0)
        ->column('ef_fetch.field');
    return $normalize($journalFields);
}
/**
 * Mark a task as prepared (state=5) once none of its log rows is still waiting
 * for preparation (state=0 and prepared_at=0).
 *
 * Does nothing when the id is not positive, the task does not exist, or the
 * task is no longer in state 0.
 *
 * @param int $taskId
 * @return void
 */
public function tryFinalizeTask($taskId)
{
    $taskId = intval($taskId);
    if ($taskId <= 0) return;

    $task = Db::name('promotion_task')->where('task_id', $taskId)->find();
    if (!$task || $task['state'] != 0) return;

    $remaining = Db::name('promotion_email_log')
        ->where('task_id', $taskId)
        ->where('state', 0)
        ->where('prepared_at', 0)
        ->count();
    if ($remaining != 0) return;

    // Guard the transition with state=0 (as prepareTask does): this method runs
    // from per-email jobs that may finish concurrently, and an unguarded update
    // could overwrite a state that changed after the row was read above.
    $affected = Db::name('promotion_task')
        ->where('task_id', $taskId)
        ->where('state', 0)
        ->update([
            'state' => 5,
            'utime' => time(),
        ]);
    if ($affected) {
        $this->log("tryFinalizeTask task_id={$taskId} -> state=5");
    }
}
/**
 * Start preparation of a single task through the message queue.
 *
 * The caller only learns whether the task was queued. When the task has no
 * log rows waiting for preparation it is moved to state=5 right away and
 * nothing is queued.
 *
 * @param int $taskId
 * @return array ['queued' => bool, 'task_id' => int, 'pending' => int, 'error' => string|null]
 */
public function prepareTask($taskId)
{
    $taskId = intval($taskId);
    // Default reply; individual fields are overwritten per branch below.
    $reply = ['queued' => false, 'task_id' => $taskId, 'pending' => 0, 'error' => null];

    $task = Db::name('promotion_task')->where('task_id', $taskId)->find();
    if (!$task) {
        $reply['error'] = 'task_not_found';
        return $reply;
    }
    if ($task['state'] != 0) {
        $reply['error'] = 'task_state_not_draft';
        return $reply;
    }

    $pending = Db::name('promotion_email_log')
        ->where('task_id', $taskId)
        ->where('state', 0)
        ->where('prepared_at', 0)
        ->count();

    if ($pending == 0) {
        // Nothing left to prepare: flip the task to "prepared" directly.
        Db::name('promotion_task')
            ->where('task_id', $taskId)
            ->where('state', 0)
            ->update([
                'state' => 5,
                'utime' => time(),
            ]);
        $this->log("prepareTask task_id={$taskId} no_pending -> state=5");
        return $reply;
    }

    $this->enqueuePrepareTask($taskId);
    $this->log("prepareTask task_id={$taskId} queued pending={$pending}");
    $reply['queued'] = true;
    $reply['pending'] = $pending;
    return $reply;
}
/**
 * Kick off preparation for every state-0 task scheduled on the given date
 * (intended for a scheduled job).
 *
 * Each matching task is passed to dispatchPrepareEmails(), which enqueues one
 * prepare job per pending email.
 *
 * @param string $date Y-m-d, e.g. 2026-03-12
 * @return array ['tasks' => int, 'task_ids' => int[]]
 */
public function prepareTasksForDate($date)
{
    $draftTasks = Db::name('promotion_task')
        ->where('send_date', $date)
        ->where('state', 0)
        ->select();

    $handledIds = [];
    foreach ($draftTasks as $draft) {
        $id = $draft['task_id'];
        $this->dispatchPrepareEmails($id);
        $handledIds[] = $id;
    }

    $total = count($draftTasks);
    $this->log("prepareTasksForDate date={$date} tasks=" . $total . " queued task_ids=" . implode(',', $handledIds));

    return [
        'tasks' => $total,
        'task_ids' => $handledIds,
    ];
}
/**
 * Push the task-level prepare job (app\api\job\PromotionPrepareTask@fire) onto
 * the 'PromotionPrepareTask' queue.
 *
 * @param int $taskId
 * @param int $delay Seconds to wait before the job becomes available; values
 *                   <= 0 push the job immediately.
 * @return void
 */
public function enqueuePrepareTask($taskId, $delay = 0)
{
    $queueName = 'PromotionPrepareTask';
    $handler = 'app\api\job\PromotionPrepareTask@fire';
    $payload = ['task_id' => intval($taskId)];

    if ($delay > 0) {
        Queue::later($delay, $handler, $payload, $queueName);
        return;
    }
    Queue::push($handler, $payload, $queueName);
}
/**
 * Push the per-email prepare job (app\api\job\PromotionPrepareEmail@fire) onto
 * the 'PromotionPrepareEmail' queue.
 *
 * @param int $logId promotion_email_log.log_id
 * @param int $delay Seconds to wait before the job becomes available; values
 *                   <= 0 push the job immediately.
 * @return void
 */
public function enqueuePrepareEmail($logId, $delay = 0)
{
    $queueName = 'PromotionPrepareEmail';
    $handler = 'app\api\job\PromotionPrepareEmail@fire';
    $payload = ['log_id' => intval($logId)];

    if ($delay > 0) {
        Queue::later($delay, $handler, $payload, $queueName);
        return;
    }
    Queue::push($handler, $payload, $queueName);
}
/**
 * Start sending for the tasks of a given date (intended for a scheduled job).
 *
 * First, tasks of that date still in state 0 are pushed to the prepare queue
 * (catch-up preparation, asynchronous). Then every task of that date already
 * in state 5 is switched to state 1 and its first send job is enqueued.
 *
 * @param string $date Y-m-d
 * @return array ['prepared' => int, 'started' => int, 'task_ids' => []]
 *               'prepared' counts the tasks pushed to the prepare queue.
 */
public function startTasksForDate($date)
{
    // Catch-up: tasks for this date that have not been prepared yet.
    $unprepared = Db::name('promotion_task')
        ->where('send_date', $date)
        ->where('state', 0)
        ->select();
    foreach ($unprepared as $draft) {
        $this->enqueuePrepareTask($draft['task_id']);
    }

    // Launch everything that is already prepared.
    $ready = Db::name('promotion_task')
        ->where('send_date', $date)
        ->where('state', 5)
        ->select();
    $startedIds = [];
    foreach ($ready as $row) {
        $id = $row['task_id'];
        Db::name('promotion_task')->where('task_id', $id)->update([
            'state' => 1,
            'utime' => time(),
        ]);
        $this->enqueueNextEmail($id, 0);
        $startedIds[] = $id;
    }
    $started = count($startedIds);

    $this->log("startTasksForDate date={$date} started={$started} task_ids=" . implode(',', $startedIds));

    return [
        'prepared' => count($unprepared),
        'started' => $started,
        'task_ids' => $startedIds,
    ];
}
// ==================== Queue ====================
/**
 * Push the send job (app\api\job\PromotionSend@fire) for a task onto the
 * 'PromotionSend' queue.
 *
 * @param mixed $taskId Passed through unchanged as the job's task_id.
 * @param int   $delay  Seconds to wait before the job becomes available;
 *                      values <= 0 push the job immediately.
 * @return void
 */
public function enqueueNextEmail($taskId, $delay = 0)
{
    $queueName = 'PromotionSend';
    $handler = 'app\api\job\PromotionSend@fire';
    $payload = ['task_id' => $taskId];

    if ($delay > 0) {
        Queue::later($delay, $handler, $payload, $queueName);
        return;
    }
    Queue::push($handler, $payload, $queueName);
}
// ==================== SMTP ====================
/**
 * Choose the SMTP account with the most remaining daily quota for a task.
 *
 * Candidates are the journal's journal_email rows with state=0, optionally
 * restricted to the comma-separated ids in task.smtp_ids. Each candidate's
 * daily counter is reset first when needed.
 *
 * @param array $task Task row (reads journal_id and smtp_ids).
 * @return array|null The chosen account row, or null when no account has
 *                    quota left.
 */
public function pickSmtpAccountForTask($task)
{
    $allowedIds = [];
    if ($task['smtp_ids']) {
        $allowedIds = array_map('intval', explode(',', $task['smtp_ids']));
    }

    $query = Db::name('journal_email')
        ->where('journal_id', $task['journal_id'])
        ->where('state', 0);
    if (!empty($allowedIds)) {
        $query->where('j_email_id', 'in', $allowedIds);
    }
    $candidates = $query->select();
    if (empty($candidates)) {
        return null;
    }

    $chosen = null;
    $chosenHeadroom = 0;
    foreach ($candidates as $candidate) {
        $this->resetDailyCountIfNeeded($candidate);
        $headroom = $candidate['daily_limit'] - $candidate['today_sent'];
        // Strictly greater: the first account wins ties, exhausted ones never qualify.
        if ($headroom > $chosenHeadroom) {
            $chosen = $candidate;
            $chosenHeadroom = $headroom;
        }
    }
    return $chosen;
}
/**
 * Reset an SMTP account's today_sent counter once per calendar day.
 *
 * The date of the last reset is kept in the cache under
 * "smtp_reset_{j_email_id}". When it differs from today, the counter is zeroed
 * both in the database and in the passed-in row.
 *
 * @param array $account Account row, modified in place.
 * @return void
 */
public function resetDailyCountIfNeeded(&$account)
{
    $today = date('Y-m-d');
    $accountId = $account['j_email_id'];
    $marker = 'smtp_reset_' . $accountId;

    if (Cache::get($marker) === $today) {
        return;
    }

    Db::name('journal_email')
        ->where('j_email_id', $accountId)
        ->update(['today_sent' => 0]);
    $account['today_sent'] = 0;
    Cache::set($marker, $today, 86400);
}
/**
 * Send one HTML email through the given SMTP account using PHPMailer.
 *
 * @param array  $account     Account row (smtp_host, smtp_port, smtp_user,
 *                            smtp_password, smtp_encryption, smtp_from_name).
 * @param string $toEmail     Recipient address.
 * @param string $subject     Subject line.
 * @param string $htmlContent HTML body; the plain-text alternative is the same
 *                            content with tags stripped.
 * @return array ['status' => 1, 'data' => 'success'] on success, otherwise
 *               ['status' => 0, 'data' => <exception message>].
 */
public function doSendEmail($account, $toEmail, $subject, $htmlContent)
{
    try {
        $mailer = new PHPMailer(true);
        $mailer->isSMTP();
        $mailer->SMTPDebug = 0;
        $mailer->CharSet = 'UTF-8';
        $mailer->Host = $account['smtp_host'];
        $mailer->Port = intval($account['smtp_port']);
        $mailer->SMTPAuth = true;
        $mailer->Username = $account['smtp_user'];
        $mailer->Password = $account['smtp_password'];

        $encryption = $account['smtp_encryption'];
        if ($encryption === 'ssl' || $encryption === 'tls') {
            $mailer->SMTPSecure = $encryption;
        } else {
            // Any other value: no encryption and no opportunistic TLS.
            $mailer->SMTPSecure = false;
            $mailer->SMTPAutoTLS = false;
        }

        // Fall back to the login address when no display name is configured.
        $displayName = empty($account['smtp_from_name'])
            ? $account['smtp_user']
            : $account['smtp_from_name'];
        $mailer->setFrom($account['smtp_user'], $displayName);
        $mailer->addReplyTo($account['smtp_user'], $displayName);
        $mailer->addAddress($toEmail);

        $mailer->isHTML(true);
        $mailer->Subject = $subject;
        $mailer->Body = $htmlContent;
        $mailer->AltBody = strip_tags($htmlContent);
        $mailer->send();
    } catch (\Exception $failure) {
        return ['status' => 0, 'data' => $failure->getMessage()];
    }
    return ['status' => 1, 'data' => 'success'];
}
// ==================== Template Rendering ====================
/**
 * Render a mail template (subject + body) with the given variables and, when a
 * style is supplied and found, wrap the body in the style's header and footer.
 *
 * @param int    $templateId mail_template.template_id
 * @param int    $journalId  The template must belong to this journal.
 * @param string $varsJson   JSON object of template variables; anything that
 *                           does not decode to an array is treated as no variables.
 * @param int    $styleId    Optional mail_style.style_id (falsy = no style).
 * @return array ['code' => 1, 'msg' => 'Template not found'] or
 *               ['code' => 0, 'msg' => 'success', 'data' => ['subject' => ..., 'body' => ...]]
 */
public function renderFromTemplate($templateId, $journalId, $varsJson, $styleId = 0)
{
    $template = Db::name('mail_template')
        ->where('template_id', $templateId)
        ->where('journal_id', $journalId)
        ->where('state', 0)
        ->find();
    if (!$template) {
        return ['code' => 1, 'msg' => 'Template not found'];
    }

    $decoded = $varsJson ? json_decode($varsJson, true) : null;
    $vars = is_array($decoded) ? $decoded : [];

    $subject = $this->renderVars($template['subject'], $vars);
    $content = $this->renderVars($template['body_html'], $vars);

    $wrapped = $content;
    if ($styleId) {
        $style = Db::name('mail_style')->where('style_id', $styleId)->where('state', 0)->find();
        if ($style) {
            $header = '';
            if ($style['header_html']) {
                $header = $this->renderVars($style['header_html'], $vars);
            }
            $footer = '';
            if ($style['footer_html']) {
                $footer = $this->renderVars($style['footer_html'], $vars);
            }
            $wrapped = $header . $content . $footer;
        }
    }

    return [
        'code' => 0,
        'msg' => 'success',
        'data' => ['subject' => $subject, 'body' => $wrapped],
    ];
}
/**
 * Build the recipient-related template variables from an expert-shaped array.
 *
 * Missing keys fall back to empty strings. The unsubscribe URL is built only
 * when an email is present, using kind 'expert' when expert_id is set and
 * kind 'user' when only user_id is set.
 *
 * @param array $expert Recipient data (name, email, affiliation, fields/field, ...).
 * @return array Map of template variable name => value.
 */
public function buildExpertVars($expert)
{
    $description = $expert['llm_description'] ?? '';
    $advisedTopics = $expert['ai_advised_topics'] ?? '';

    $recipientEmail = (string)($expert['email'] ?? '');
    $unsubscribeLink = '';
    if ($recipientEmail !== '' && !empty($expert['expert_id'])) {
        $unsubscribeLink = \app\common\UnsubscribeService::buildUrl(
            'expert',
            intval($expert['expert_id']),
            $recipientEmail
        );
    } elseif ($recipientEmail !== '' && !empty($expert['user_id'])) {
        $unsubscribeLink = \app\common\UnsubscribeService::buildUrl(
            'user',
            intval($expert['user_id']),
            $recipientEmail
        );
    }

    $vars = [];
    $vars['expert_title'] = "Ph.D";
    $vars['expert_name'] = $expert['name'] ?? '';
    $vars['expert_email'] = $expert['email'] ?? '';
    $vars['expert_affiliation'] = $expert['affiliation'] ?? '';
    // 'fields' (joined list) takes precedence over a single 'field'.
    $vars['expert_field'] = $expert['fields'] ?? ($expert['field'] ?? '');
    $vars['representative_work_title'] = $expert['representative_work_title'] ?? '';
    $vars['role'] = $expert['role'] ?? '';
    // Each LLM text is exposed under two variable names.
    $vars['llm_description'] = $description;
    $vars['ai_content_analysis'] = $description;
    $vars['ai_advised_topics'] = $advisedTopics;
    $vars['llm_advised_topics'] = $advisedTopics;
    $vars['unsubscribe_url'] = $unsubscribeLink;
    $vars['application_link_yeditorial_board'] = $expert['application_link_yeditorial_board'] ?? '';
    return $vars;
}
/**
 * Build the "apply for young editorial board" link used by the
 * {{application_link_yeditorial_board}} template variable.
 *
 * The base URL is read from Env key 'yboard.apply_url'. Output format:
 *   {base}?journal_id=X&expert_id=Y
 * ('&' is used instead of '?' when the base already contains a '?').
 *
 * @param int $expertId
 * @param int $journalId
 * @return string The link, or '' when either id is not positive or no base
 *                URL is configured.
 */
public static function buildYboardApplyUrl($expertId, $journalId)
{
    $expertId = intval($expertId);
    $journalId = intval($journalId);
    if (min($expertId, $journalId) <= 0) {
        return '';
    }

    $base = trim((string)Env::get('yboard.apply_url', ''));
    if ($base === '') {
        return '';
    }

    $joiner = '&';
    if (strpos($base, '?') === false) {
        $joiner = '?';
    }
    return sprintf('%s%sjournal_id=%d&expert_id=%d', $base, $joiner, $journalId, $expertId);
}
/**
 * Translate an expert_type code into the English role text used by the
 * {{role}} template variable.
 *
 * @param int $expertType
 * @return string Role text; '' for type 5 and for any unknown type.
 */
public function mapExpertTypeRole($expertType)
{
    switch (intval($expertType)) {
        case 1:
            return 'Editor-in-Chief';
        case 2:
            return 'Editorial Board Member';
        case 3:
            return 'Young Editorial Board Member';
        case 4:
            return 'Author';
        default:
            return '';
    }
}
/**
 * Build the journal-related template variables.
 *
 * Looks up one board_to_journal row (state=0, type=0) of the journal to fill
 * 'eic_name'. 'special_support_deadline' is always today + 30 days.
 *
 * @param array|null $journal Journal row; a falsy value yields [].
 * @return array Map of template variable name => value.
 */
public function buildJournalVars($journal)
{
    if (!$journal) return [];
    $zb = Db::name("board_to_journal")
        ->where("journal_id", $journal['journal_id'])
        ->where("state", 0)
        ->where('type', 0)
        ->find();
    return [
        'journal_name' => $journal['title'] ?? '',
        'journal_abbr' => $journal['jabbr'] ?? '',
        'journal_url' => $journal['website'] ?? '',
        'journal_email' => $journal['email'] ?? '',
        'indexing_databases' => $journal['databases'] ?? '',
        'submission_url' => "https://submission.tmrjournals.com/",
        'eic_name' => $zb['realname'] ?? '',
        // Defaulted like every other key, so a journal row without this key
        // no longer raises an undefined-index error.
        'editor_name' => $journal['editor_name'] ?? '',
        'special_support_deadline' => date("Y-m-d", strtotime("+30 days")),
    ];
}
/**
 * Substitute template placeholders with variable values.
 *
 * Two placeholder forms are supported:
 *  - {{ name }} : double braces, optional whitespace inside; the name may use
 *                 letters, digits, '_', '-' and '.'. Unknown names are left as-is.
 *  - {name}     : single braces, exact match with no inner whitespace, and
 *                 only for names present in $vars.
 *
 * Both forms are replaced in one pass over the template, so text inserted for
 * one placeholder is never scanned again. Previously a value containing
 * "{other_var}" was expanded a second time, and the outcome depended on the
 * order of the variables.
 *
 * @param mixed $tpl  Template text; non-strings and '' yield ''.
 * @param mixed $vars Map of name => value; values are cast to string.
 * @return string Rendered text. The template is returned unchanged when there
 *                are no usable variables or the regex engine reports an error.
 */
public function renderVars($tpl, $vars)
{
    if (!is_string($tpl) || $tpl === '') return '';
    if (!is_array($vars) || empty($vars)) return $tpl;

    $map = [];
    foreach ($vars as $k => $v) {
        $key = trim((string)$k);
        if ($key === '') continue;
        $map[$key] = (string)$v;
    }
    if (empty($map)) return $tpl;

    // Alternation of the literal (regex-quoted) variable names for the {name} form.
    $quotedKeys = [];
    foreach (array_keys($map) as $key) {
        $quotedKeys[] = preg_quote((string)$key, '/');
    }
    $pattern = '/\{\{\s*([A-Za-z0-9_\-\.]+)\s*\}\}|\{(' . implode('|', $quotedKeys) . ')\}/';

    $rendered = preg_replace_callback($pattern, function ($m) use ($map) {
        // Group 2 is set only when the single-brace alternative matched.
        if (isset($m[2]) && $m[2] !== '') {
            return $map[$m[2]];
        }
        return array_key_exists($m[1], $map) ? $map[$m[1]] : $m[0];
    }, $tpl);

    // preg_replace_callback returns null on a PCRE error; keep the raw template then.
    return $rendered === null ? $tpl : $rendered;
}
// ==================== Logging ====================
/**
 * Append one timestamped line to the service log file.
 *
 * Write errors are suppressed, so a failed write does not raise.
 *
 * @param string $msg Message text (written after a "Y-m-d H:i:s" timestamp).
 * @return void
 */
public function log($msg)
{
    $entry = sprintf('%s %s%s', date('Y-m-d H:i:s'), $msg, PHP_EOL);
    @file_put_contents($this->logFile, $entry, FILE_APPEND);
}
}