Files
tougao/application/api/job/SendReviewEmail.php
chengxl fcf4498544 队列
2025-07-24 17:24:48 +08:00

164 lines
6.7 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\api\job;
use think\queue\Job;
use app\common\QueueJob;
use app\common\QueueRedis;
use app\common\Reviewer;
class SendReviewEmail
{
private $logPath;
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
private $logBuffer = [];
private $lastLogTime = 0;
private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
$this->logPath = ROOT_PATH . 'public/queue_log/SendReviewEmail_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
$this->lastLogTime = time();
// 确保日志目录存在
$this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
{
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败10秒后重试");
$job->release(10);
$this->oQueueJob->flushLog();
return;
}
// 获取文章ID
// 获取文章ID
$iArticleId = empty($data['article_id']) ? 0 : $data['article_id'];
//作者邮箱
$email = empty($data['email']) ? '' : $data['email'];
//邮件主题
$title = empty($data['title']) ? '' : $data['title'];
//发送来源
$from_name = empty($data['from_name']) ? '' : $data['from_name'];
//邮件内容
$content = empty($data['content']) ? '' : $data['content'];
//邮箱
$memail = empty($data['memail']) ? '' : $data['memail'];
//密码
$mpassword = empty($data['mpassword']) ? '' : $data['mpassword'];
//审稿记录表主键ID
$art_rev_id = empty($data['art_rev_id']) ? 0 : $data['art_rev_id'];
//审稿人ID
$reviewer_id = empty($data['reviewer_id']) ? 0 : $data['reviewer_id'];
//邮件类型
$type = empty($data['type']) ? 1 : $data['type'];
if (empty($iArticleId)) {
$this->oQueueJob->log("无效的article_id删除任务");
$job->delete();
return;
}
// 生成唯一任务标识
$sClassName = get_class($this);
$sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$art_rev_id}:{$reviewer_id}:{$email}";
$sRedisValue = uniqid() . '_' . getmypid();
$lockExpire = $this->lockExpire;
$isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire);
if (!$isLocked) {
$jobStatus = $this->QueueRedis->getJobStatus($sRedisKey);
if (in_array($jobStatus, ['completed', 'failed'])) {
$this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}");
$job->delete();
} else {
$attempts = $job->attempts();
if ($attempts >= $this->maxRetries) {
$this->oQueueJob->log("超过最大重试次数,停止重试");
$job->delete();
} else {
$lockTtl = $this->QueueRedis->getLockTtl($sRedisKey);
$delay = $lockTtl > 0 ? $lockTtl + 5 : 30;
$this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})");
$job->release($delay);
}
}
$this->oQueueJob->flushLog();
return;
}
$aParam = [
'job_id' => $sRedisKey,
'job_class' => $sClassName,
'status' => 0,
'create_time' => time(),
'params' => json_encode($data, self::JSON_OPTIONS)
];
$iLogId = $this->oQueueJob->addLog($aParam);
if (!$iLogId) {
$this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
$this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
$job->delete();
$this->oQueueJob->flushLog();
return;
}
try {
//查询是否发送过邮件
$oReviewer = new Reviewer;
if($type != 3){
$aLog = json_decode($oReviewer->getLog(['article_id' => $iArticleId,'art_rev_id' => $art_rev_id,'reviewer_id' => $reviewer_id,'is_success' => 1,'type' => $type]),true);
$aLog = empty($aLog['data']) ? [] : $aLog['data'];
$sMsg = '邮件已发送:'.json_encode($aLog);
}
if(empty($aLog)){
$aResult = sendEmail($email,$title,$from_name,$content,$memail,$mpassword);
$iStatus = empty($aResult['status']) ? 1 : $aResult['status'];
$iIsSuccess = 2;
$sMsg = empty($aResult['data']) ? '失败' : $aResult['data'];
if($iStatus == 1){
$iIsSuccess = 1;
$sMsg = '成功';
}
//记录邮件发送日志
$aEmailLog = ['article_id' => $iArticleId,'art_rev_id' => $art_rev_id,'reviewer_id' => $reviewer_id,'type' => $type,'email' => $email,'content' => $content,'create_time' => time(),'is_success' => $iIsSuccess,'msg' => $sMsg];
//添加邮件发送日志
$iId = $oReviewer->addLog($aEmailLog);
}
//更新日志
$this->oQueueJob->updateLog([
'log_id' => $iLogId,
'status' => 1,
'update_time' => time(),
'error' => $sMsg
]);
$this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
$job->delete();
$this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
} catch (\LogicException $e) {
$this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
} catch (\Exception $e) {
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "");
$this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
}