queueJob = new QueueJob; // 初始化日志路径 $this->logPath = ROOT_PATH . 'public/queue_log/ArticleAiCreateContent_' . date('Ymd') . '.log'; } /** * 安全写入日志(带文件锁) */ private function log($message) { $time = date('H:i:s'); $logMsg = "[$time] $message\n"; $fp = fopen($this->logPath, 'w'); if ($fp) { flock($fp, LOCK_EX); // 排他锁防止并发写入冲突 fwrite($fp, $logMsg); flock($fp, LOCK_UN); fclose($fp); } } // 任务日志添加 public function addLog($aParam = []) { //实例化 return $this->queueJob->addLog($aParam); } // 任务日志修改 public function updateLog($aParam = []) { return $this->queueJob->updateLog($aParam); } /** * 根据错误信息获取重试延迟 */ private function getRetryDelay($errorMsg) { $delayMap = [ 'MySQL server has gone away' => 60, 'timeout' => 30, 'OpenAI' => 45, 'network' => 60 ]; foreach ($delayMap as $keyword => $delay) { if (strpos($errorMsg, $keyword) !== false) { return $delay; } } return 10; // 默认延迟 } public function fire(Job $job, $data) { //日志 $this->log("-----------队列任务开始-----------"); // 获取文章ID $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; if (empty($iArticleId)) { $this->log("无效的article_id,删除任务"); $job->delete(); return; } // 生成唯一任务标识 $sClassName = get_class($this); $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; $sRedisValue = uniqid() . '_' . getmypid(); // 增加进程ID确保唯一性 // 尝试获取Redis锁(原子操作) $isLocked = $this->queueJob->setRedisLock($sRedisKey, $sRedisValue, 86400); if (!$isLocked) { $currentValue = $this->queueJob->getRedisValue($sRedisKey); $this->log("任务已被锁定,避免重复执行 | 锁键: {$sRedisKey} | 锁值: {$currentValue}"); // 检查任务是否已超过最大重试次数 if ($job->attempts() >= 2) { $this->log("任务超过最大重试次数,停止重试"); $job->delete(); } else { $delay = $this->getRetryDelay("任务已锁定"); $this->log("{$delay}秒后重试任务 | 重试次数: {$job->attempts()}"); $job->release($delay); } return; } // 任务基础信息 $aParam = [ 'job_id' => $sRedisKey, 'job_class' => $sClassName, 'status' => 0, // 0:处理中 'create_time' => time(), 'params' => json_encode($data, JSON_UNESCAPED_UNICODE) ]; // 创建任务日志记录 $iLogId = $this->addLog($aParam); if(!$iLogId) { $this->log("日志创建失败,释放锁并删除任务:".json_encode($data)); $this->queueJob->releaseRedisLock($sRedisKey, $sRedisValue); $job->delete(); return; } try { // 执行核心任务 //生成内容 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->create($data),true); $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; $sMsg = empty($aResult['msg']) ? '内容生成失败' : $aResult['msg']; //更新任务状态 $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; $this->updateLog($aParam); //删除任务 $job->delete(); $this->log("任务执行成功,已删除任务 | 日志ID: {$iLogId}"); } catch (\Exception $e) { //错误信息 $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); $this->log("任务执行异常: {$sMsg} | 堆栈: {$sTrace}"); // 记录失败日志 $this->updateLog([ 'log_id' => $iLogId, 'status' => 2, 'update_time' => time(), 'error' => $sMsg.':'.$sTrace, ]); // 重试策略 $attempts = $job->attempts(); if ($attempts >= 2) { $this->log("任务已重试{$attempts}次,停止重试"); $job->delete(); } else { $delay = $this->getRetryDelay($sMsg); $this->log("{$delay}秒后重试任务 | 重试次数: {$attempts}"); $job->release($delay); } } finally { // 无论成功失败都释放锁(确保锁值匹配) $releaseResult = $this->queueJob->releaseRedisLock($sRedisKey, $sRedisValue); if (!$releaseResult) { $this->log("释放锁失败 | 锁键: {$sRedisKey} | 锁值: {$sRedisValue}"); } else { $this->log("成功释放锁 | 锁键: {$sRedisKey}"); } gc_collect_cycles(); } } }