diff --git a/application/api/job/ArticleAiCreateContent.php b/application/api/job/ArticleAiCreateContent.php index b9c8a72..6a59dfa 100644 --- a/application/api/job/ArticleAiCreateContent.php +++ b/application/api/job/ArticleAiCreateContent.php @@ -6,95 +6,159 @@ use app\common\QueueJob; class ArticleAiCreateContent { + private $logPath; // 日志路径 + private $queueJob; + public function __construct() + { + $this->queueJob = new QueueJob; + // 初始化日志路径 + $this->logPath = ROOT_PATH . 'public/queue_log/ArticleAiCreateContent_' . date('Ymd') . '.log'; + } + + /** + * 安全写入日志(带文件锁) + */ + private function log($message) + { + $time = date('H:i:s'); + $logMsg = "[$time] $message\n"; + $fp = fopen($this->logPath, 'w'); + if ($fp) { + flock($fp, LOCK_EX); // 排他锁防止并发写入冲突 + fwrite($fp, $logMsg); + flock($fp, LOCK_UN); + fclose($fp); + } + } - // 最多重试1次 - public $tries = 1; - // 任务日志添加 public function addLog($aParam = []) { //实例化 - $oQueueJob = new QueueJob; - $iLogId = $oQueueJob->addLog($aParam); - return $iLogId; + return $this->queueJob->addLog($aParam); } // 任务日志修改 public function updateLog($aParam = []) { - //实例化 - $oQueueJob = new QueueJob; - return $oQueueJob->updateLog($aParam); + return $this->queueJob->updateLog($aParam); + } + + /** + * 根据错误信息获取重试延迟 + */ + private function getRetryDelay($errorMsg) + { + $delayMap = [ + 'MySQL server has gone away' => 60, + 'timeout' => 30, + 'OpenAI' => 45, + 'network' => 60 + ]; + foreach ($delayMap as $keyword => $delay) { + if (strpos($errorMsg, $keyword) !== false) { + return $delay; + } + } + return 10; // 默认延迟 } - // 文章AI内容生成 public function fire(Job $job, $data) { + //日志 + $this->log("-----------队列任务开始-----------"); - $sLogPath = ROOT_PATH.'public/queue_log/ArticleAiCreateContent_'.date('Ymd').'.log'; - $sTime = date('H:i:s'); - file_put_contents($sLogPath,'-----------Queue job started:'.$sTime.'-----------'); - - //获取文章ID + // 获取文章ID $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - //获取方法名 - $sClassName = get_class($this); - // 检查任务是否已处理(基于业务唯一标识) - $sRedisKey = $sClassName.'/'.$iArticleId; - $sRedisKey = md5($sRedisKey); - //判断Redis是否存在 - $oQueueJob = new QueueJob; - $result = $oQueueJob->setRedisLabel(['redis_key' => $sRedisKey]); - if($result != 1){ + if (empty($iArticleId)) { + $this->log("无效的article_id,删除任务"); $job->delete(); - file_put_contents($sLogPath,'-----------Queue job already:'.$result."===".$sRedisKey.'==='.$iArticleId."===".$sTime.'-----------'); return; } - //任务数组 + // 生成唯一任务标识 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); // 增加进程ID确保唯一性 + + // 尝试获取Redis锁(原子操作) + $isLocked = $this->queueJob->setRedisLock($sRedisKey, $sRedisValue, 86400); + if (!$isLocked) { + $currentValue = $this->queueJob->getRedisValue($sRedisKey); + $this->log("任务已被锁定,避免重复执行 | 锁键: {$sRedisKey} | 锁值: {$currentValue}"); + // 检查任务是否已超过最大重试次数 + if ($job->attempts() >= 2) { + $this->log("任务超过最大重试次数,停止重试"); + $job->delete(); + } else { + $delay = $this->getRetryDelay("任务已锁定"); + $this->log("{$delay}秒后重试任务 | 重试次数: {$job->attempts()}"); + $job->release($delay); + } + return; + } + + // 任务基础信息 $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, // 0:处理中 'create_time' => time(), - 'params' => json_encode($data, JSON_UNESCAPED_UNICODE) + 'params' => json_encode($data, JSON_UNESCAPED_UNICODE) ]; - //执行任务 + // 创建任务日志记录 + $iLogId = $this->addLog($aParam); + if(!$iLogId) { + $this->log("日志创建失败,释放锁并删除任务:".json_encode($data)); + $this->queueJob->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + return; + } try { - // 任务逻辑 - $sMsg = '文章AI内容生成成功'; - $iLogId = $this->addLog($aParam); + // 执行核心任务 //生成内容 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->create($data),true); $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; - $sMsg = empty($aResult['msg']) ? '文章AI内容生成失败' : $aResult['msg']; + $sMsg = empty($aResult['msg']) ? '内容生成失败' : $aResult['msg']; //更新任务状态 $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $this->updateLog($aParam); + $this->updateLog($aParam); //删除任务 $job->delete(); - - file_put_contents($sLogPath,'-----------Queue job end:'.$sTime.'-----------'); - + $this->log("任务执行成功,已删除任务 | 日志ID: {$iLogId}"); } catch (\Exception $e) { - // 2. 记录失败日志 - $aParam['status'] = 2; // 标记状态为"失败" + //错误信息 $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $aParam['error'] = $sMsg; - $this->addLog($aParam); // 调用日志记录方法 - if ($job->attempts() > $this->tries) { - //如果任务尝试次数超过最大重试次数 - $job->delete(); // 删除任务,不再重试 + $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); + $this->log("任务执行异常: {$sMsg} | 堆栈: {$sTrace}"); + // 记录失败日志 + $this->updateLog([ + 'log_id' => $iLogId, + 'status' => 2, + 'update_time' => time(), + 'error' => $sMsg.':'.$sTrace, + ]); + // 重试策略 + $attempts = $job->attempts(); + if ($attempts >= 2) { + $this->log("任务已重试{$attempts}次,停止重试"); + $job->delete(); } else { - // 3. 如果尝试次数未超过最大重试次数,释放任务回队列 - $job->release(30); // 30秒后重新尝试执行任务 + $delay = $this->getRetryDelay($sMsg); + $this->log("{$delay}秒后重试任务 | 重试次数: {$attempts}"); + $job->release($delay); } - file_put_contents($sLogPath,'-----------Queue job error:'.$sMsg.'-----------'.$sTime); - }finally { - gc_collect_cycles(); // 强制垃圾回收 + } finally { + // 无论成功失败都释放锁(确保锁值匹配) + $releaseResult = $this->queueJob->releaseRedisLock($sRedisKey, $sRedisValue); + if (!$releaseResult) { + $this->log("释放锁失败 | 锁键: {$sRedisKey} | 锁值: {$sRedisValue}"); + } else { + $this->log("成功释放锁 | 锁键: {$sRedisKey}"); + } + gc_collect_cycles(); } } - } \ No newline at end of file