864 lines
33 KiB
PHP
864 lines
33 KiB
PHP
<?php
|
||
|
||
namespace app\common;
|
||
|
||
use think\Db;
|
||
use think\Cache;
|
||
use think\Queue;
|
||
use PHPMailer\PHPMailer\PHPMailer;
|
||
|
||
class PromotionService
|
||
{
|
||
private $logFile;
|
||
|
||
/**
 * Point the service at its log file: <ROOT_PATH>runtime<DS>promotion_task.log.
 */
public function __construct()
{
    $runtimeDir = ROOT_PATH . 'runtime';
    $this->logFile = $runtimeDir . DS . 'promotion_task.log';
}
|
||
|
||
/**
 * Send the next pending email of a promotion task (called by the queue job).
 *
 * Exactly one email log row is handled per call; the method then re-enqueues
 * itself through enqueueNextEmail() so the task advances one email at a time:
 *   1. Stop when the task is missing or its state is not 1.
 *   2. Retry in 300s when the current hour is outside
 *      [send_start_hour, send_end_hour).
 *   3. Set the task to state 2 when bounce_count / sent_count reaches
 *      max_bounce_rate (percent).
 *   4. Take the oldest log row with state 0; when none is left set the task
 *      to state 3.
 *   5. Skip experts that are missing or in state 4 / 5.
 *   6. Pick an SMTP account with remaining quota (retry in 600s when none).
 *   7. Use the pre-generated subject/body when both are present, otherwise
 *      render the template on the spot.
 *   8. Send, record the outcome and schedule the next email after a random
 *      delay.
 *
 * @param int $taskId promotion_task.task_id
 * @return array Always contains 'done' (bool). 'done' => true means no further
 *               job was enqueued; the other keys describe the outcome
 *               ('reason', 'retry_in', 'skipped', 'failed', 'sent', 'email',
 *               'next_in', ...).
 */
public function processNextEmail($taskId)
{
    $task = Db::name('promotion_task')->where('task_id', $taskId)->find();
    if (!$task) {
        return ['done' => true, 'reason' => 'task_not_found'];
    }
    if ($task['state'] != 1) {
        return ['done' => true, 'reason' => 'task_not_running', 'state' => $task['state']];
    }

    // Outside the configured sending hours: look again in five minutes.
    $currentHour = intval(date('G'));
    if ($currentHour < $task['send_start_hour'] || $currentHour >= $task['send_end_hour']) {
        $this->enqueueNextEmail($taskId, 300);
        return ['done' => false, 'reason' => 'outside_send_window', 'retry_in' => 300];
    }

    // Auto-pause once the bounce rate reaches the configured ceiling.
    if ($task['sent_count'] > 0 && $task['max_bounce_rate'] > 0) {
        $bounceRate = ($task['bounce_count'] / $task['sent_count']) * 100;
        if ($bounceRate >= $task['max_bounce_rate']) {
            Db::name('promotion_task')->where('task_id', $taskId)->update([
                'state' => 2,
                'utime' => time(),
            ]);
            $this->log("Task {$taskId} auto-paused: bounce rate {$bounceRate}% >= {$task['max_bounce_rate']}%");
            return ['done' => true, 'reason' => 'auto_paused_bounce_rate', 'bounce_rate' => $bounceRate];
        }
    }

    $logEntry = Db::name('promotion_email_log')
        ->where('task_id', $taskId)
        ->where('state', 0)
        ->order('log_id asc')
        ->find();

    if (!$logEntry) {
        Db::name('promotion_task')->where('task_id', $taskId)->update([
            'state' => 3,
            'utime' => time(),
        ]);
        return ['done' => true, 'reason' => 'all_emails_processed'];
    }

    $expert = Db::name('expert')->where('expert_id', $logEntry['expert_id'])->find();
    if (!$expert || $expert['state'] == 4 || $expert['state'] == 5) {
        $this->failLogAndScheduleNext(
            $taskId,
            $logEntry['log_id'],
            'Expert invalid or deleted (state=' . (isset($expert['state']) ? $expert['state'] : 'null') . ')'
        );
        return ['done' => false, 'skipped' => $logEntry['email_to'], 'reason' => 'expert_invalid'];
    }

    $account = $this->pickSmtpAccountForTask($task);
    if (!$account) {
        $this->enqueueNextEmail($taskId, 600);
        return ['done' => false, 'reason' => 'no_smtp_quota', 'retry_in' => 600];
    }

    // Prefer the pre-generated content; render on the spot when there is none.
    $hasPrepared = !empty($logEntry['subject_prepared']) && !empty($logEntry['body_prepared']);
    if ($hasPrepared) {
        $subject = $logEntry['subject_prepared'];
        $body = $logEntry['body_prepared'];
    } else {
        $rendered = $this->renderEmailOnTheFly($task, $expert);
        if ($rendered['code'] !== 0) {
            $this->failLogAndScheduleNext(
                $taskId,
                $logEntry['log_id'],
                'Template render failed: ' . $rendered['msg']
            );
            return ['done' => false, 'failed' => $logEntry['email_to'], 'reason' => 'template_error'];
        }
        $subject = $rendered['data']['subject'];
        $body = $rendered['data']['body'];
    }

    $result = $this->doSendEmail($account, $logEntry['email_to'], $subject, $body);
    $sent = $result['status'] === 1;

    $now = time();
    $logUpdate = [
        'j_email_id' => $account['j_email_id'],
        'subject' => mb_substr($subject, 0, 512),
        'send_time' => $now,
    ];
    if ($sent) {
        $logUpdate['state'] = 1;
        Db::name('promotion_email_log')->where('log_id', $logEntry['log_id'])->update($logUpdate);
        Db::name('journal_email')->where('j_email_id', $account['j_email_id'])->setInc('today_sent');
        Db::name('expert')->where('expert_id', $expert['expert_id'])->update(['state' => 1, 'ltime' => $now]);
        Db::name('promotion_task')->where('task_id', $taskId)->setInc('sent_count');
    } else {
        $logUpdate['state'] = 2;
        $logUpdate['error_msg'] = mb_substr($result['data'], 0, 512);
        Db::name('promotion_email_log')->where('log_id', $logEntry['log_id'])->update($logUpdate);
        Db::name('promotion_task')->where('task_id', $taskId)->setInc('fail_count');
    }

    Db::name('promotion_task')->where('task_id', $taskId)->update(['utime' => $now]);

    // Random pause before the next email. The lower bound is never below 5s
    // and the upper bound is clamped so it can never fall under the lower one
    // (the previous expression could produce an inverted range when both
    // configured intervals were below 5).
    $minDelay = max(5, intval($task['min_interval']));
    $maxDelay = max($minDelay, intval($task['max_interval']));
    $delay = rand($minDelay, $maxDelay);
    $this->enqueueNextEmail($taskId, $delay);

    return [
        'done' => false,
        'sent' => $sent,
        'email' => $logEntry['email_to'],
        'next_in' => $delay,
    ];
}

/**
 * Mark a log row as failed without sending it, bump the task's fail counter,
 * touch the task's utime and schedule the next email two seconds later.
 *
 * @param int    $taskId
 * @param int    $logId
 * @param string $errorMsg stored in promotion_email_log.error_msg
 * @return void
 */
private function failLogAndScheduleNext($taskId, $logId, $errorMsg)
{
    $now = time();
    Db::name('promotion_email_log')->where('log_id', $logId)->update([
        'state' => 2,
        'error_msg' => $errorMsg,
        'send_time' => $now,
    ]);
    Db::name('promotion_task')->where('task_id', $taskId)->setInc('fail_count');
    Db::name('promotion_task')->where('task_id', $taskId)->update(['utime' => $now]);
    $this->enqueueNextEmail($taskId, 2);
}

/**
 * Render subject and body for a log row that has no pre-generated content.
 *
 * No LLM call is made on this path; the fallback texts supplied by
 * PromotionLlmService are used instead (empty strings if that fails).
 *
 * @param array $task   promotion_task row
 * @param array $expert expert row
 * @return array result of renderFromTemplate(): ['code' => 0|1, 'msg' => string, 'data' => [...]]
 */
private function renderEmailOnTheFly($task, $expert)
{
    $journal = Db::name('journal')->where('journal_id', $task['journal_id'])->find();
    $expertFields = Db::name('expert_field')
        ->where('expert_id', $expert['expert_id'])
        ->where('state', 0)
        ->order('expert_field_id desc')
        ->select();

    // Lower-cased task field names, used as a lookup set.
    $taskFieldLower = [];
    foreach ($this->resolveTaskFields($task) as $tf) {
        $tf = strtolower(trim($tf));
        if ($tf !== '') {
            $taskFieldLower[$tf] = true;
        }
    }

    $fieldSet = [];
    $matchedTitle = '';
    $fallbackTitle = '';
    // The loop deliberately runs over every row: stopping as soon as a title
    // was found would leave $fieldSet (and therefore the expert's field list)
    // incomplete.
    foreach ($expertFields as $ef) {
        $fn = trim((string)($ef['field'] ?? ''));
        if ($fn !== '' && !in_array($fn, $fieldSet, true)) {
            $fieldSet[] = $fn;
        }
        $paper = trim((string)($ef['paper_title'] ?? ''));
        if ($paper === '') {
            continue;
        }
        // First paper whose field is one of the task's fields wins ...
        if ($matchedTitle === '' && $fn !== '' && isset($taskFieldLower[strtolower($fn)])) {
            $matchedTitle = $paper;
        }
        // ... otherwise fall back to the first paper seen at all.
        if ($fallbackTitle === '') {
            $fallbackTitle = $paper;
        }
    }
    $expert['fields'] = implode(',', $fieldSet);
    $expert['representative_work_title'] = $matchedTitle !== '' ? $matchedTitle : $fallbackTitle;

    try {
        $llmSvc = new PromotionLlmService();
        $expert['llm_description'] = $llmSvc->getFallback();
        $expert['ai_advised_topics'] = $llmSvc->getAdvisedFallback();
    } catch (\Exception $e) {
        $expert['llm_description'] = '';
        $expert['ai_advised_topics'] = '';
    }

    // Expert variables override journal variables on key collisions.
    $vars = array_merge($this->buildJournalVars($journal), $this->buildExpertVars($expert));

    return $this->renderFromTemplate(
        $task['template_id'],
        $task['journal_id'],
        json_encode($vars, JSON_UNESCAPED_UNICODE),
        $task['style_id']
    );
}
|
||
|
||
// ==================== Prepare & trigger (prepare today, send tomorrow) ====================
|
||
|
||
/**
 * Dispatcher: enqueue one prepare job per not-yet-prepared email of a task.
 *
 * Every pending log row (state 0, prepared_at 0) is handed to
 * enqueuePrepareEmail() as its own job. The task itself is moved to state 5
 * later, by tryFinalizeTask(), once no unprepared row is left. When there is
 * nothing to prepare the task is moved to state 5 right here.
 *
 * @param int $taskId
 * @return array ['dispatched' => int, 'already_done' => bool, 'error' => string|null]
 */
public function dispatchPrepareEmails($taskId)
{
    $task = Db::name('promotion_task')->where('task_id', $taskId)->find();
    if (!$task) {
        return ['dispatched' => 0, 'already_done' => false, 'error' => 'task_not_found'];
    }
    if ($task['state'] != 0) {
        return ['dispatched' => 0, 'already_done' => $task['state'] == 5, 'error' => 'task_state_not_draft'];
    }

    $logIds = Db::name('promotion_email_log')
        ->where('task_id', $taskId)
        ->where('state', 0)
        ->where('prepared_at', 0)
        ->order('log_id asc')
        ->column('log_id');

    if (empty($logIds)) {
        // Nothing to prepare: mark the task as prepared. The update is
        // conditional on state = 0 (as in prepareTask()) so a task whose state
        // changed after the read above is not overwritten.
        Db::name('promotion_task')->where('task_id', $taskId)->where('state', 0)->update([
            'state' => 5,
            'utime' => time(),
        ]);
        $this->log("dispatchPrepareEmails task_id={$taskId} no_logs -> state=5");
        return ['dispatched' => 0, 'already_done' => true, 'error' => null];
    }

    foreach ($logIds as $logId) {
        $this->enqueuePrepareEmail(intval($logId));
    }

    $this->log("dispatchPrepareEmails task_id={$taskId} dispatched=" . count($logIds));
    return ['dispatched' => count($logIds), 'already_done' => false, 'error' => null];
}
|
||
|
||
/**
 * Prepare a single email: load expert / journal, ask the LLM service for the
 * personalised texts, render the template and write the result back to the
 * log row.
 *
 * Rows that are not pending (state != 0) or already prepared are left alone.
 * After a row has been handled (prepared or failed) tryFinalizeTask() is
 * called for its task, except when the task itself cannot be found.
 *
 * @param int $logId promotion_email_log.log_id
 * @return array ['code' => 0|1, 'msg' => string, 'llm_status' => int]; once the
 *               LLM step has run the array also carries
 *               'llm_advised_topics_status' => int.
 */
public function prepareSingleEmail($logId)
{
    $log = Db::name('promotion_email_log')->where('log_id', $logId)->find();
    if (!$log) {
        return ['code' => 1, 'msg' => 'log_not_found', 'llm_status' => 0];
    }
    if ($log['state'] != 0) {
        return ['code' => 1, 'msg' => 'log_state_not_pending', 'llm_status' => 0];
    }
    if (!empty($log['prepared_at'])) {
        return ['code' => 0, 'msg' => 'already_prepared', 'llm_status' => intval($log['llm_status'] ?? 0)];
    }

    $task = Db::name('promotion_task')->where('task_id', $log['task_id'])->find();
    if (!$task) {
        Db::name('promotion_email_log')->where('log_id', $logId)->update([
            'state' => 2,
            'error_msg' => 'Task not found',
            'send_time' => time(),
        ]);
        return ['code' => 1, 'msg' => 'task_not_found', 'llm_status' => 0];
    }

    $expert = Db::name('expert')->where('expert_id', $log['expert_id'])->find();
    if (!$expert) {
        Db::name('promotion_email_log')->where('log_id', $logId)->update([
            'state' => 2,
            'error_msg' => 'Expert not found',
            'send_time' => time(),
        ]);
        $this->tryFinalizeTask($task['task_id']);
        return ['code' => 1, 'msg' => 'expert_not_found', 'llm_status' => 0];
    }

    $expert_fields = Db::name('expert_field')
        ->where('expert_id', $expert['expert_id'])
        ->where('state', 0)
        ->order('expert_field_id desc')
        ->select();

    // Field priority: the target fields of the task's factory / journal,
    // lower-cased and used as a lookup set.
    $taskFields = $this->resolveTaskFields($task);
    $taskFieldLower = [];
    foreach ($taskFields as $tf) {
        $tf = strtolower(trim($tf));
        if ($tf !== '') {
            $taskFieldLower[$tf] = true;
        }
    }

    $fieldSet = [];
    $matchedTitle = '';
    $fallbackTitle = '';
    // The loop deliberately runs over every row: stopping as soon as a title
    // was found would leave $fieldSet incomplete, and $fieldSet feeds both the
    // overlap computation and the LLM call below.
    foreach ($expert_fields as $ef) {
        $fn = trim((string)($ef['field'] ?? ''));
        if ($fn !== '' && !in_array($fn, $fieldSet, true)) {
            $fieldSet[] = $fn;
        }
        $paper = trim((string)($ef['paper_title'] ?? ''));
        if ($paper === '') {
            continue;
        }

        // A paper in one of the task's fields is preferred; rows arrive in
        // expert_field_id desc order and the first match is kept.
        if ($matchedTitle === '' && $fn !== '' && isset($taskFieldLower[strtolower($fn)])) {
            $matchedTitle = $paper;
        }
        if ($fallbackTitle === '') {
            $fallbackTitle = $paper;
        }
    }
    $expert['fields'] = implode(',', $fieldSet);
    $expert['representative_work_title'] = $matchedTitle !== '' ? $matchedTitle : $fallbackTitle;

    // Field intersection (case-insensitive; the expert's original spelling is
    // kept for display).
    $overlapFields = [];
    if (!empty($taskFieldLower)) {
        foreach ($fieldSet as $fn) {
            if (isset($taskFieldLower[strtolower($fn)])) {
                $overlapFields[] = $fn;
            }
        }
    }

    $journal = Db::name('journal')->where('journal_id', $task['journal_id'])->find();

    // One LLM call produces both texts (description + advised_topics).
    $llmDefaults = [
        'description' => '',
        'description_status' => 0,
        'advised_topics' => '',
        'advised_topics_status' => 0,
    ];
    $llmResult = $llmDefaults;
    try {
        $llm = new PromotionLlmService();
        $generated = $llm->generateEmailContent(
            $expert,
            $journal ?: [],
            $overlapFields,
            $taskFields,
            $fieldSet
        );
        // Fill in any key the service left out so the reads below cannot hit
        // an undefined index; a non-array result keeps the defaults.
        if (is_array($generated)) {
            $llmResult = array_merge($llmDefaults, $generated);
        }
    } catch (\Exception $e) {
        // Fall back to the two placeholder texts. $llm may not have been
        // constructed, in which case a fresh instance is tried.
        $fbDesc = '';
        $fbAdvised = '';
        try {
            $fbSvc = isset($llm) ? $llm : new PromotionLlmService();
            $fbDesc = $fbSvc->getFallback();
            $fbAdvised = $fbSvc->getAdvisedFallback();
        } catch (\Exception $ignore) {
            // Deliberately ignored: empty strings are used.
        }
        $llmResult = [
            'description' => $fbDesc,
            'description_status' => 2,
            'advised_topics' => $fbAdvised,
            'advised_topics_status' => 2,
        ];
        $this->log("prepareSingleEmail log_id={$logId} llm_exception=" . $e->getMessage());
    }
    $llmText = (string)$llmResult['description'];
    $llmStatus = intval($llmResult['description_status']);
    $advisedText = (string)$llmResult['advised_topics'];
    $advisedStatus = intval($llmResult['advised_topics_status']);

    $expert['llm_description'] = $llmText;
    $expert['ai_advised_topics'] = $advisedText;

    // Expert variables override journal variables on key collisions.
    $expertVars = $this->buildExpertVars($expert);
    $journalVars = $this->buildJournalVars($journal);
    $vars = array_merge($journalVars, $expertVars);
    $rendered = $this->renderFromTemplate(
        $task['template_id'],
        $task['journal_id'],
        json_encode($vars, JSON_UNESCAPED_UNICODE),
        $task['style_id']
    );

    $now = time();
    // Columns written on both the success and the failure path.
    $llmColumns = [
        'llm_description' => mb_substr($llmText, 0, 2000),
        'llm_status' => $llmStatus,
        'llm_advised_topics' => mb_substr($advisedText, 0, 2000),
        'llm_advised_topics_status' => $advisedStatus,
    ];

    if ($rendered['code'] !== 0) {
        Db::name('promotion_email_log')->where('log_id', $logId)->update(array_merge([
            'state' => 2,
            'error_msg' => 'Prepare failed: ' . $rendered['msg'],
            'send_time' => $now,
        ], $llmColumns));
        $this->tryFinalizeTask($task['task_id']);
        return [
            'code' => 1,
            'msg' => $rendered['msg'],
            'llm_status' => $llmStatus,
            'llm_advised_topics_status' => $advisedStatus,
        ];
    }

    Db::name('promotion_email_log')->where('log_id', $logId)->update(array_merge([
        'subject_prepared' => mb_substr($rendered['data']['subject'], 0, 512),
        'body_prepared' => $rendered['data']['body'],
        'prepared_at' => $now,
    ], $llmColumns));

    $this->tryFinalizeTask($task['task_id']);

    return [
        'code' => 0,
        'msg' => 'ok',
        'llm_status' => $llmStatus,
        'llm_advised_topics_status' => $advisedStatus,
    ];
}
|
||
|
||
/**
 * Resolve the list of target field names of a task.
 *
 * Priority:
 *   1. task.factory_id > 0: read promotion_factory.fetch_ids and look the
 *      field names up in expert_fetch.
 *   2. Otherwise (manual task): journal_promotion_field joined with
 *      t_expert_fetch for the task's journal.
 *
 * @param mixed $task promotion_task row; anything that is not an array yields []
 * @return string[] Trimmed, de-duplicated field names; an empty array means
 *                  the task is not restricted to particular fields.
 */
public function resolveTaskFields($task)
{
    if (!is_array($task)) {
        return [];
    }

    $factoryId = intval($task['factory_id'] ?? 0);
    if ($factoryId > 0) {
        $factory = Db::name('promotion_factory')
            ->where('promotion_factory_id', $factoryId)
            ->find();
        if (!$factory || empty($factory['fetch_ids'])) {
            return [];
        }
        // fetch_ids is a comma-separated id list; zero / non-numeric pieces are dropped.
        $ids = array_values(array_unique(array_filter(
            array_map('intval', explode(',', $factory['fetch_ids']))
        )));
        if (empty($ids)) {
            return [];
        }
        $fields = Db::name('expert_fetch')
            ->where('expert_fetch_id', 'in', $ids)
            ->where('state', 0)
            ->column('field');
        return $this->normalizeFieldNameList($fields);
    }

    // Manual task: use the promotion fields bound to the journal.
    $journalId = intval($task['journal_id'] ?? 0);
    if ($journalId <= 0) {
        return [];
    }
    $fields = Db::name('journal_promotion_field')
        ->alias('jpf')
        ->join('t_expert_fetch ef_fetch', 'ef_fetch.expert_fetch_id = jpf.expert_fetch_id', 'inner')
        ->where('jpf.journal_id', $journalId)
        ->where('jpf.state', 0)
        ->where('ef_fetch.state', 0)
        ->column('ef_fetch.field');
    return $this->normalizeFieldNameList($fields);
}

/**
 * Trim, drop empty and de-duplicate a list of field names.
 *
 * Values are cast to string before trimming so a NULL column value does not
 * reach trim().
 *
 * @param mixed $names result of a column() query
 * @return string[] re-indexed list
 */
private function normalizeFieldNameList($names)
{
    if (!is_array($names)) {
        return [];
    }
    $trimmed = array_map(static function ($name) {
        return trim((string)$name);
    }, $names);
    return array_values(array_unique(array_filter($trimmed)));
}
|
||
|
||
/**
 * Move a task from state 0 to state 5 once none of its log rows is still
 * waiting for preparation (state 0 with prepared_at 0).
 *
 * Does nothing for a non-positive id, a missing task, or a task that is not
 * in state 0.
 *
 * @param int $taskId
 * @return void
 */
public function tryFinalizeTask($taskId)
{
    $taskId = intval($taskId);
    if ($taskId <= 0) {
        return;
    }

    $task = Db::name('promotion_task')->where('task_id', $taskId)->find();
    if (!$task || $task['state'] != 0) {
        return;
    }

    $remaining = Db::name('promotion_email_log')
        ->where('task_id', $taskId)
        ->where('state', 0)
        ->where('prepared_at', 0)
        ->count();
    if ($remaining != 0) {
        return;
    }

    // The update is conditional on state = 0: if the task left state 0 after
    // it was read above, it is not pulled back to 5.
    $changed = Db::name('promotion_task')
        ->where('task_id', $taskId)
        ->where('state', 0)
        ->update([
            'state' => 5,
            'utime' => time(),
        ]);
    // Log only when this call actually performed the transition.
    if ($changed) {
        $this->log("tryFinalizeTask task_id={$taskId} -> state=5");
    }
}
|
||
|
||
/**
 * Start the preparation of one task's emails asynchronously.
 *
 * The task must exist and be in state 0. When it has log rows that still need
 * preparation (state 0, prepared_at 0) a job is pushed via
 * enqueuePrepareTask() and the caller gets an "enqueued" answer immediately
 * instead of waiting for every email. When nothing is pending, the task is
 * moved to state 5 directly and nothing is enqueued.
 *
 * @param int $taskId
 * @return array ['queued' => bool, 'task_id' => int, 'pending' => int, 'error' => string|null]
 */
public function prepareTask($taskId)
{
    $taskId = intval($taskId);

    // Template for every answer; individual keys are overridden below.
    $answer = ['queued' => false, 'task_id' => $taskId, 'pending' => 0, 'error' => null];

    $row = Db::name('promotion_task')->where('task_id', $taskId)->find();
    if (!$row) {
        return array_merge($answer, ['error' => 'task_not_found']);
    }
    if ($row['state'] != 0) {
        return array_merge($answer, ['error' => 'task_state_not_draft']);
    }

    $pending = Db::name('promotion_email_log')
        ->where('task_id', $taskId)
        ->where('state', 0)
        ->where('prepared_at', 0)
        ->count();

    if ($pending != 0) {
        $this->enqueuePrepareTask($taskId);
        $this->log("prepareTask task_id={$taskId} queued pending={$pending}");

        return array_merge($answer, ['queued' => true, 'pending' => $pending]);
    }

    // No email left to prepare: flag the task as prepared and report that.
    Db::name('promotion_task')
        ->where('task_id', $taskId)
        ->where('state', 0)
        ->update([
            'state' => 5,
            'utime' => time(),
        ]);
    $this->log("prepareTask task_id={$taskId} no_pending -> state=5");

    return $answer;
}
|
||
|
||
/**
 * Enqueue the preparation of every state-0 task whose send_date equals the
 * given date (meant for a scheduled job, e.g. preparing tomorrow's tasks in
 * the evening).
 *
 * Each task is handed to enqueuePrepareTask(); nothing is prepared inline.
 *
 * @param string $date Y-m-d, e.g. 2026-03-12
 * @return array ['tasks' => int, 'task_ids' => int[]]
 */
public function prepareTasksForDate($date)
{
    $rows = Db::name('promotion_task')
        ->where('send_date', $date)
        ->where('state', 0)
        ->select();

    $queuedIds = [];
    foreach ($rows as $row) {
        $id = $row['task_id'];
        $this->enqueuePrepareTask($id);
        $queuedIds[] = $id;
    }

    $total = count($rows);
    $this->log("prepareTasksForDate date={$date} tasks=" . $total . " queued task_ids=" . implode(',', $queuedIds));

    return [
        'tasks' => $total,
        'task_ids' => $queuedIds,
    ];
}
|
||
|
||
/**
 * Push the task-level prepare job (PromotionPrepareTask) onto the
 * "promotionPrepareTask" queue.
 *
 * @param int $taskId
 * @param int $delay  seconds to wait before the job becomes available;
 *                    values <= 0 push it immediately
 * @return void
 */
public function enqueuePrepareTask($taskId, $delay = 0)
{
    $handler = 'app\api\job\PromotionPrepareTask@fire';
    $queueName = 'promotionPrepareTask';
    $payload = ['task_id' => intval($taskId)];

    if ($delay > 0) {
        Queue::later($delay, $handler, $payload, $queueName);
        return;
    }

    Queue::push($handler, $payload, $queueName);
}
|
||
|
||
/**
 * Push the per-email prepare job (PromotionPrepareEmail) onto its queue.
 *
 * The queue name used by the code is "promotionmmm", so a worker has to
 * listen on that name: php think queue:listen --queue promotionmmm
 *
 * NOTE(review): earlier documentation called this queue "promotion_email";
 * the name below looks like a leftover — confirm which name the deployed
 * workers listen on before renaming it.
 *
 * @param int $logId promotion_email_log.log_id
 * @param int $delay seconds to wait before the job becomes available;
 *                   values <= 0 push it immediately
 * @return void
 */
public function enqueuePrepareEmail($logId, $delay = 0)
{
    $handler = 'app\api\job\PromotionPrepareEmail@fire';
    $queueName = 'promotionmmm';
    $payload = ['log_id' => intval($logId)];

    if ($delay > 0) {
        Queue::later($delay, $handler, $payload, $queueName);
        return;
    }

    Queue::push($handler, $payload, $queueName);
}
|
||
|
||
/**
 * Start sending for every prepared task of the given date (meant for a
 * scheduled job, e.g. starting today's tasks in the morning).
 *
 * First, tasks of that date that are still in state 0 are handed to
 * enqueuePrepareTask() as a catch-up. Then every task of that date in state 5
 * is switched to state 1 and its first send job is enqueued.
 *
 * NOTE(review): the catch-up only enqueues jobs, so unless the queue driver
 * runs them synchronously those tasks are not yet in state 5 when the second
 * query runs and would be started by a later invocation — confirm the
 * scheduler calls this more than once or that this is acceptable.
 *
 * @param string $date Y-m-d
 * @return array ['prepared' => int (tasks sent to catch-up preparation),
 *               'started' => int, 'task_ids' => int[] (started tasks)]
 */
public function startTasksForDate($date)
{
    // Catch-up: tasks for this date that have not been prepared yet.
    $catchUpTasks = Db::name('promotion_task')
        ->where('send_date', $date)
        ->where('state', 0)
        ->select();
    foreach ($catchUpTasks as $t) {
        $this->enqueuePrepareTask($t['task_id']);
    }

    $tasks = Db::name('promotion_task')
        ->where('send_date', $date)
        ->where('state', 5)
        ->select();

    $started = 0;
    $taskIds = [];

    foreach ($tasks as $task) {
        // Claim the task with a conditional update. If another invocation
        // already moved it out of state 5, no row changes and no second send
        // chain is started for the same task.
        $claimed = Db::name('promotion_task')
            ->where('task_id', $task['task_id'])
            ->where('state', 5)
            ->update([
                'state' => 1,
                'utime' => time(),
            ]);
        if (!$claimed) {
            continue;
        }
        $this->enqueueNextEmail($task['task_id'], 0);
        $started++;
        $taskIds[] = $task['task_id'];
    }

    $this->log("startTasksForDate date={$date} started={$started} task_ids=" . implode(',', $taskIds));
    return [
        'prepared' => count($catchUpTasks),
        'started' => $started,
        'task_ids' => $taskIds,
    ];
}
|
||
|
||
// ==================== Queue ====================
|
||
|
||
/**
 * Push the send job (PromotionSend) for a task onto the "promotion" queue.
 *
 * @param int $taskId passed to the job unchanged as 'task_id'
 * @param int $delay  seconds to wait before the job becomes available;
 *                    values <= 0 push it immediately
 * @return void
 */
public function enqueueNextEmail($taskId, $delay = 0)
{
    $handler = 'app\api\job\PromotionSend@fire';
    $queueName = 'promotion';
    $payload = ['task_id' => $taskId];

    if ($delay > 0) {
        Queue::later($delay, $handler, $payload, $queueName);
        return;
    }

    Queue::push($handler, $payload, $queueName);
}
|
||
|
||
// ==================== SMTP ====================
|
||
|
||
/**
 * Choose the SMTP account a task should send its next email with.
 *
 * Candidates are the journal's journal_email rows in state 0, optionally
 * narrowed to the ids listed in task.smtp_ids (comma separated). Among those,
 * the account with the largest remaining quota (daily_limit - today_sent)
 * wins; accounts without remaining quota are ignored.
 *
 * @param array $task promotion_task row (journal_id and smtp_ids are read)
 * @return array|null the chosen journal_email row, or null when no account
 *                    has quota left
 */
public function pickSmtpAccountForTask($task)
{
    $candidates = Db::name('journal_email')
        ->where('journal_id', $task['journal_id'])
        ->where('state', 0);

    // A non-empty smtp_ids value restricts the choice to those accounts.
    if ($task['smtp_ids']) {
        $allowedIds = array_map('intval', explode(',', $task['smtp_ids']));
        $candidates->where('j_email_id', 'in', $allowedIds);
    }

    $rows = $candidates->select();
    if (empty($rows)) {
        return null;
    }

    $chosen = null;
    $mostLeft = 0;
    foreach ($rows as $row) {
        // May zero today_sent on the row (and in the database) for a new day.
        $this->resetDailyCountIfNeeded($row);
        $left = $row['daily_limit'] - $row['today_sent'];
        if ($left > $mostLeft) {
            $chosen = $row;
            $mostLeft = $left;
        }
    }

    return $chosen;
}
|
||
|
||
/**
 * Reset an account's today_sent counter the first time it is seen on a day.
 *
 * The date of the last reset is remembered in the cache under
 * "smtp_reset_<j_email_id>" for 86400 seconds. When the cached value is not
 * today's date the counter is set to 0 in the database and on the passed row.
 *
 * NOTE(review): the marker lives only in the cache, so losing the cache entry
 * during the day zeroes the counter again — confirm this is acceptable.
 *
 * @param array $account journal_email row, modified in place
 * @return void
 */
public function resetDailyCountIfNeeded(&$account)
{
    $today = date('Y-m-d');
    $markerKey = 'smtp_reset_' . $account['j_email_id'];

    if (Cache::get($markerKey) === $today) {
        return;
    }

    Db::name('journal_email')
        ->where('j_email_id', $account['j_email_id'])
        ->update(['today_sent' => 0]);
    $account['today_sent'] = 0;
    Cache::set($markerKey, $today, 86400);
}
|
||
|
||
/**
 * Send one HTML email through the given SMTP account using PHPMailer.
 *
 * The account's smtp_user is used as login, sender address and reply-to
 * address. A plain-text alternative is derived from the HTML with
 * strip_tags(). Exceptions are caught and reported through the return value.
 *
 * @param array  $account     journal_email row (smtp_host, smtp_port, smtp_user,
 *                            smtp_password, smtp_encryption, smtp_from_name)
 * @param string $toEmail     recipient address
 * @param string $subject
 * @param string $htmlContent HTML body
 * @return array ['status' => 1, 'data' => 'success'] on success,
 *               ['status' => 0, 'data' => <exception message>] on failure
 */
public function doSendEmail($account, $toEmail, $subject, $htmlContent)
{
    try {
        $mailer = new PHPMailer(true);
        $mailer->isSMTP();
        $mailer->SMTPDebug = 0;
        $mailer->CharSet = 'UTF-8';
        $mailer->Host = $account['smtp_host'];
        $mailer->Port = intval($account['smtp_port']);
        $mailer->SMTPAuth = true;
        $mailer->Username = $account['smtp_user'];
        $mailer->Password = $account['smtp_password'];

        // 'ssl' and 'tls' are passed through; anything else disables
        // encryption including the automatic TLS upgrade.
        $encryption = $account['smtp_encryption'];
        if ($encryption === 'ssl' || $encryption === 'tls') {
            $mailer->SMTPSecure = $encryption;
        } else {
            $mailer->SMTPSecure = false;
            $mailer->SMTPAutoTLS = false;
        }

        $sender = $account['smtp_user'];
        $displayName = empty($account['smtp_from_name']) ? $sender : $account['smtp_from_name'];
        $mailer->setFrom($sender, $displayName);
        $mailer->addReplyTo($sender, $displayName);
        $mailer->addAddress($toEmail);

        $mailer->isHTML(true);
        $mailer->Subject = $subject;
        $mailer->Body = $htmlContent;
        $mailer->AltBody = strip_tags($htmlContent);

        $mailer->send();
    } catch (\Exception $e) {
        return ['status' => 0, 'data' => $e->getMessage()];
    }

    return ['status' => 1, 'data' => 'success'];
}
|
||
|
||
// ==================== Template Rendering ====================
|
||
|
||
/**
 * Render subject and body of a mail template with the given variables.
 *
 * The template must belong to the journal and be in state 0. When a style id
 * is given and the style exists (state 0), its rendered header and footer are
 * wrapped around the body.
 *
 * @param int    $templateId mail_template.template_id
 * @param int    $journalId  mail_template.journal_id
 * @param string $varsJson   JSON object of placeholder values; anything that
 *                           does not decode to an array counts as "no variables"
 * @param int    $styleId    mail_style.style_id, 0 for no style
 * @return array ['code' => 1, 'msg' => 'Template not found'] or
 *               ['code' => 0, 'msg' => 'success', 'data' => ['subject' => string, 'body' => string]]
 */
public function renderFromTemplate($templateId, $journalId, $varsJson, $styleId = 0)
{
    $template = Db::name('mail_template')
        ->where('template_id', $templateId)
        ->where('journal_id', $journalId)
        ->where('state', 0)
        ->find();
    if (!$template) {
        return ['code' => 1, 'msg' => 'Template not found'];
    }

    $decoded = $varsJson ? json_decode($varsJson, true) : null;
    $values = is_array($decoded) ? $decoded : [];

    $subject = $this->renderVars($template['subject'], $values);
    $content = $this->renderVars($template['body_html'], $values);

    $style = null;
    if ($styleId) {
        $style = Db::name('mail_style')->where('style_id', $styleId)->where('state', 0)->find();
    }
    if ($style) {
        $top = '';
        if ($style['header_html']) {
            $top = $this->renderVars($style['header_html'], $values);
        }
        $bottom = '';
        if ($style['footer_html']) {
            $bottom = $this->renderVars($style['footer_html'], $values);
        }
        $content = $top . $content . $bottom;
    }

    return [
        'code' => 0,
        'msg' => 'success',
        'data' => ['subject' => $subject, 'body' => $content],
    ];
}
|
||
|
||
/**
 * Build the expert-related template variables.
 *
 * Missing keys become empty strings. 'expert_field' prefers $expert['fields']
 * and falls back to $expert['field']. The LLM description and the advised
 * topics are each exposed under two placeholder names.
 *
 * @param array $expert expert row, optionally extended with 'fields',
 *                      'representative_work_title', 'llm_description',
 *                      'ai_advised_topics'
 * @return array placeholder name => value
 */
public function buildExpertVars($expert)
{
    $description = $expert['llm_description'] ?? '';
    $topics = $expert['ai_advised_topics'] ?? '';
    $fieldText = $expert['fields'] ?? ($expert['field'] ?? '');

    $vars = ['expert_title' => "Ph.D"];
    $vars['expert_name'] = $expert['name'] ?? '';
    $vars['expert_email'] = $expert['email'] ?? '';
    $vars['expert_affiliation'] = $expert['affiliation'] ?? '';
    $vars['expert_field'] = $fieldText;
    $vars['representative_work_title'] = $expert['representative_work_title'] ?? '';
    $vars['llm_description'] = $description;
    $vars['ai_content_analysis'] = $description;
    $vars['ai_advised_topics'] = $topics;
    $vars['llm_advised_topics'] = $topics;

    return $vars;
}
|
||
|
||
/**
 * Build the journal-related template variables.
 *
 * 'eic_name' is the realname of the journal's board_to_journal row with
 * state 0 and type 0 (presumably the editor-in-chief — verify against the
 * board_to_journal schema). 'special_support_deadline' is the date 30 days
 * after the moment of the call. Missing journal columns become empty strings.
 *
 * @param array|null $journal journal row; a falsy value yields []
 * @return array placeholder name => value
 */
public function buildJournalVars($journal)
{
    if (!$journal) {
        return [];
    }
    $zb = Db::name("board_to_journal")
        ->where("journal_id", $journal['journal_id'])
        ->where("state", 0)
        ->where('type', 0)
        ->find();

    return [
        'journal_name' => $journal['title'] ?? '',
        'journal_abbr' => $journal['jabbr'] ?? '',
        'journal_url' => $journal['website'] ?? '',
        'journal_email' => $journal['email'] ?? '',
        'indexing_databases' => $journal['databases'] ?? '',
        'submission_url' => "https://submission.tmrjournals.com/",
        'eic_name' => $zb['realname'] ?? '',
        // Same empty-string default as every other column; previously a
        // journal row without this key raised an undefined-index error.
        'editor_name' => $journal['editor_name'] ?? '',
        'special_support_deadline' => date("Y-m-d", strtotime("+30 days")),
    ];
}
|
||
|
||
/**
 * Replace placeholders in a template string.
 *
 * Two placeholder forms are supported:
 *   - {{ name }}  double braces, optional whitespace inside, name made of
 *                 letters, digits, "_", "-" and ".";
 *   - {name}      single braces, exact match of a known variable name.
 * Unknown placeholders are left untouched.
 *
 * The template is scanned exactly once, so text inserted for one placeholder
 * is never scanned again: a value that itself contains "{other_name}" is
 * emitted literally instead of being expanded a second time.
 *
 * @param mixed $tpl  template text; non-strings and '' yield ''
 * @param mixed $vars placeholder name => value; values are cast to string,
 *                    names are trimmed and empty names ignored
 * @return string
 */
public function renderVars($tpl, $vars)
{
    if (!is_string($tpl) || $tpl === '') {
        return '';
    }
    if (!is_array($vars) || empty($vars)) {
        return $tpl;
    }

    $map = [];
    foreach ($vars as $k => $v) {
        $key = trim((string)$k);
        if ($key === '') {
            continue;
        }
        $map[$key] = (string)$v;
    }
    if (empty($map)) {
        return $tpl;
    }

    // Alternation of the known names for the strict single-brace form.
    $quotedKeys = [];
    foreach (array_keys($map) as $key) {
        $quotedKeys[] = preg_quote((string)$key, '/');
    }

    // The double-brace alternative comes first so "{{name}}" is consumed as a
    // whole and not as "{" + "{name}" + "}".
    $pattern = '/\{\{\s*([A-Za-z0-9_\-\.]+)\s*\}\}|\{(' . implode('|', $quotedKeys) . ')\}/';

    $rendered = preg_replace_callback($pattern, function ($m) use ($map) {
        // Group 2 is only present when the single-brace alternative matched,
        // and it can only contain a known name.
        if (isset($m[2]) && $m[2] !== '') {
            return $map[$m[2]];
        }
        return array_key_exists($m[1], $map) ? $map[$m[1]] : $m[0];
    }, $tpl);

    // preg_replace_callback() returns null on a PCRE error; keep the template then.
    return $rendered === null ? $tpl : $rendered;
}
|
||
|
||
// ==================== Logging ====================
|
||
|
||
/**
 * Append one timestamped line ("Y-m-d H:i:s <message>") to the service's log
 * file. Write errors are suppressed, so logging never interrupts the caller.
 *
 * @param string $msg
 * @return void
 */
public function log($msg)
{
    $stamp = date('Y-m-d H:i:s');
    $entry = $stamp . ' ' . $msg . PHP_EOL;

    @file_put_contents($this->logFile, $entry, FILE_APPEND);
}
|
||
}
|