diff --git a/application/api/job/createFieldForQueue.php b/application/api/job/createFieldForQueue.php new file mode 100644 index 0000000..734ff92 --- /dev/null +++ b/application/api/job/createFieldForQueue.php @@ -0,0 +1,163 @@ +queueJob = new QueueJob; + // 初始化日志路径 + $this->logPath = ROOT_PATH . 'public/queue_log/createFieldForQueue_' . date('Ymd') . '.log'; + } + + /** + * 安全写入日志(带文件锁) + */ + private function log($message) + { + $time = date('H:i:s'); + $logMsg = "[$time] $message\n"; + $fp = fopen($this->logPath, 'w'); + if ($fp) { + flock($fp, LOCK_EX); // 排他锁防止并发写入冲突 + fwrite($fp, $logMsg); + flock($fp, LOCK_UN); + fclose($fp); + } + } + + // 任务日志添加 + public function addLog($aParam = []) + { + //实例化 + return $this->queueJob->addLog($aParam); + } + + // 任务日志修改 + public function updateLog($aParam = []) + { + return $this->queueJob->updateLog($aParam); + } + + /** + * 根据错误信息获取重试延迟 + */ + private function getRetryDelay($errorMsg) + { + $delayMap = [ + 'MySQL server has gone away' => 60, + 'timeout' => 30, + 'OpenAI' => 45, + 'network' => 60 + ]; + foreach ($delayMap as $keyword => $delay) { + if (strpos($errorMsg, $keyword) !== false) { + return $delay; + } + } + return 10; // 默认延迟 + } + + public function fire(Job $job, $data) + { + //日志 + $this->log("-----------队列任务开始-----------"); + + // 获取文章ID + $iRedisId = empty($data['redis_id']) ? 0 : $data['redis_id']; + if (empty($iRedisId)) { + $this->log("无效的redis_id,删除任务"); + $job->delete(); + return; + } + + // 生成唯一任务标识 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iRedisId}"; + $sRedisValue = uniqid() . '_' . getmypid(); // 增加进程ID确保唯一性 + + // 尝试获取Redis锁(原子操作) + $isLocked = $this->queueJob->setRedisLock($sRedisKey, $sRedisValue, 86400); + if (!$isLocked) { + $currentValue = $this->queueJob->getRedisValue($sRedisKey); + $this->log("任务已被锁定,避免重复执行 | 锁键: {$sRedisKey} | 锁值: {$currentValue}"); + // 检查任务是否已超过最大重试次数 + if ($job->attempts() >= 2) { + $this->log("任务超过最大重试次数,停止重试"); + $job->delete(); + } else { + $delay = $this->getRetryDelay("任务已锁定"); + $this->log("{$delay}秒后重试任务 | 重试次数: {$job->attempts()}"); + $job->release($delay); + } + return; + } + + // 任务基础信息 + $aParam = [ + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, // 0:处理中 + 'create_time' => time(), + 'params' => json_encode($data, JSON_UNESCAPED_UNICODE) + ]; + // 创建任务日志记录 + $iLogId = $this->addLog($aParam); + if(!$iLogId) { + $this->log("日志创建失败,释放锁并删除任务:".json_encode($data)); + $this->queueJob->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + return; + } + try { + // 执行核心任务 + $oOpenAi = new OpenAi; + $aResult = json_decode($oOpenAi->createFieldForQueue($data), true); + $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; + $sMsg = empty($aResult['msg']) ? '内容生成失败' : $aResult['msg']; + //更新任务状态 + $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; + $this->updateLog($aParam); + //删除任务 + $job->delete(); + $this->log("任务执行成功,已删除任务 | 日志ID: {$iLogId}"); + } catch (\Exception $e) { + + //错误信息 + $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 + $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); + $this->log("任务执行异常: {$sMsg} | 堆栈: {$sTrace}"); + // 记录失败日志 + $this->updateLog([ + 'log_id' => $iLogId, + 'status' => 2, + 'update_time' => time(), + 'error' => $sMsg.':'.$sTrace, + ]); + // 重试策略 + $attempts = $job->attempts(); + if ($attempts >= 2) { + $this->log("任务已重试{$attempts}次,停止重试"); + $job->delete(); + } else { + $delay = $this->getRetryDelay($sMsg); + $this->log("{$delay}秒后重试任务 | 重试次数: {$attempts}"); + $job->release($delay); + } + } finally { + // 无论成功失败都释放锁(确保锁值匹配) + $releaseResult = $this->queueJob->releaseRedisLock($sRedisKey, $sRedisValue); + if (!$releaseResult) { + $this->log("释放锁失败 | 锁键: {$sRedisKey} | 锁值: {$sRedisValue}"); + } else { + $this->log("成功释放锁 | 锁键: {$sRedisKey}"); + } + gc_collect_cycles(); + } + } +} \ No newline at end of file