diff --git a/application/api/job/ArticleAiCreateContent.php b/application/api/job/ArticleAiCreateContent.php index 6a59dfa..4e3dc24 100644 --- a/application/api/job/ArticleAiCreateContent.php +++ b/application/api/job/ArticleAiCreateContent.php @@ -1,163 +1,125 @@ queueJob = new QueueJob; - // 初始化日志路径 $this->logPath = ROOT_PATH . 'public/queue_log/ArticleAiCreateContent_' . date('Ymd') . '.log'; - } - - /** - * 安全写入日志(带文件锁) - */ - private function log($message) - { - $time = date('H:i:s'); - $logMsg = "[$time] $message\n"; - $fp = fopen($this->logPath, 'w'); - if ($fp) { - flock($fp, LOCK_EX); // 排他锁防止并发写入冲突 - fwrite($fp, $logMsg); - flock($fp, LOCK_UN); - fclose($fp); - } - } - - // 任务日志添加 - public function addLog($aParam = []) - { - //实例化 - return $this->queueJob->addLog($aParam); - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - return $this->queueJob->updateLog($aParam); - } - - /** - * 根据错误信息获取重试延迟 - */ - private function getRetryDelay($errorMsg) - { - $delayMap = [ - 'MySQL server has gone away' => 60, - 'timeout' => 30, - 'OpenAI' => 45, - 'network' => 60 - ]; - foreach ($delayMap as $keyword => $delay) { - if (strpos($errorMsg, $keyword) !== false) { - return $delay; - } - } - return 10; // 默认延迟 + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) { - //日志 - $this->log("-----------队列任务开始-----------"); + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); + + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); + return; + } // 获取文章ID $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; if (empty($iArticleId)) { - $this->log("无效的article_id,删除任务"); + $this->oQueueJob->log("无效的article_id,删除任务"); $job->delete(); return; } - // 生成唯一任务标识 $sClassName = get_class($this); $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; - $sRedisValue = uniqid() . '_' . getmypid(); // 增加进程ID确保唯一性 + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; - // 尝试获取Redis锁(原子操作) - $isLocked = $this->queueJob->setRedisLock($sRedisKey, $sRedisValue, 86400); + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); if (!$isLocked) { - $currentValue = $this->queueJob->getRedisValue($sRedisKey); - $this->log("任务已被锁定,避免重复执行 | 锁键: {$sRedisKey} | 锁值: {$currentValue}"); - // 检查任务是否已超过最大重试次数 - if ($job->attempts() >= 2) { - $this->log("任务超过最大重试次数,停止重试"); + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); $job->delete(); } else { - $delay = $this->getRetryDelay("任务已锁定"); - $this->log("{$delay}秒后重试任务 | 重试次数: {$job->attempts()}"); - $job->release($delay); + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } } + $this->oQueueJob->flushLog(); return; } - // 任务基础信息 $aParam = [ 'job_id' => $sRedisKey, 'job_class' => $sClassName, - 'status' => 0, // 0:处理中 + 'status' => 0, 'create_time' => time(), - 'params' => json_encode($data, JSON_UNESCAPED_UNICODE) + 'params' => json_encode($data, self::JSON_OPTIONS) ]; - // 创建任务日志记录 - $iLogId = $this->addLog($aParam); - if(!$iLogId) { - $this->log("日志创建失败,释放锁并删除任务:".json_encode($data)); - $this->queueJob->releaseRedisLock($sRedisKey, $sRedisValue); + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); $job->delete(); + $this->oQueueJob->flushLog(); return; } + try { - // 执行核心任务 + //生成内容 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->create($data),true); - $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; $sMsg = empty($aResult['msg']) ? '内容生成失败' : $aResult['msg']; - //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $this->updateLog($aParam); - //删除任务 - $job->delete(); - $this->log("任务执行成功,已删除任务 | 日志ID: {$iLogId}"); - } catch (\Exception $e) { - - //错误信息 - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); - $this->log("任务执行异常: {$sMsg} | 堆栈: {$sTrace}"); - // 记录失败日志 - $this->updateLog([ + $this->oQueueJob->updateLog([ 'log_id' => $iLogId, - 'status' => 2, + 'status' => 1, 'update_time' => time(), - 'error' => $sMsg.':'.$sTrace, + 'error' => $sMsg ]); - // 重试策略 - $attempts = $job->attempts(); - if ($attempts >= 2) { - $this->log("任务已重试{$attempts}次,停止重试"); - $job->delete(); - } else { - $delay = $this->getRetryDelay($sMsg); - $this->log("{$delay}秒后重试任务 | 重试次数: {$attempts}"); - $job->release($delay); - } + + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); + + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\Exception $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); } finally { - // 无论成功失败都释放锁(确保锁值匹配) - $releaseResult = $this->queueJob->releaseRedisLock($sRedisKey, $sRedisValue); - if (!$releaseResult) { - $this->log("释放锁失败 | 锁键: {$sRedisKey} | 锁值: {$sRedisValue}"); - } else { - $this->log("成功释放锁 | 锁键: {$sRedisKey}"); - } + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); gc_collect_cycles(); } } diff --git a/application/api/job/RecommendReviewer.php b/application/api/job/RecommendReviewer.php index 0aeab06..11e4dc6 100644 --- a/application/api/job/RecommendReviewer.php +++ b/application/api/job/RecommendReviewer.php @@ -1,76 +1,106 @@ addLog($aParam); - return $iLogId; - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - //实例化 - $oQueueJob = new QueueJob; - return $oQueueJob->updateLog($aParam); + $this->logPath = ROOT_PATH . 'public/queue_log/RecommendReviewer_' . date('Ymd') . '.log'; + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } - // 推荐审稿人任务 public function fire(Job $job, $data) { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); - $sLogPath = ROOT_PATH.'public/queue_log/RecommendReviewer_'.date('Ymd').'.log'; - $sTime = date('H:i:s'); - file_put_contents($sLogPath,'-----------Queue job started:'.$sTime.'-----------'); - - //获取文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - //获取方法名 - $sClassName = get_class($this); - // 检查任务是否已处理(基于业务唯一标识) - $sRedisKey = $sClassName.'/'.$iArticleId; - $sRedisKey = md5($sRedisKey); - //判断Redis是否存在 - $oQueueJob = new QueueJob; - $result = $oQueueJob->setRedisLabel(['redis_key' => $sRedisKey]); - if($result != 1){ - $job->delete(); - file_put_contents($sLogPath,'-----------Queue job already:'.$result."===".$sRedisKey.'==='.$iArticleId."===".$sTime.'-----------'); + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); return; } - //任务数组 - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, JSON_UNESCAPED_UNICODE) - ]; - //执行任务 - try { - //添加任务日志 - $sMsg = '推荐审稿人任务处理成功'; - $iLogId = $this->addLog($aParam); + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; + } + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + $this->oQueueJob->flushLog(); + return; + } + + $aParam = [ + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, + 'create_time' => time(), + 'params' => json_encode($data, self::JSON_OPTIONS) + ]; + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + $this->oQueueJob->flushLog(); + return; + } + + try { + //获取推荐审稿人信息 $aParam = ['article_id' => $iArticleId,'page' => 1,'size' => empty($data['size']) ? 5 : $data['size']]; $oReviewer = new Reviewer; $aResult = json_decode($oReviewer->recommend($aParam),true); $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; $sMsg = empty($aResult['msg']) ? '' : $aResult['msg']; - //处理数据 $iCount = empty($aResult['data']['total']) ? 0 : $aResult['data']['total'];//数量 $iSize = empty($aResult['data']['size']) ? 0 : $aResult['data']['size'];//推荐数量 @@ -100,33 +130,29 @@ class RecommendReviewer $sMsg .= empty($aResult['msg']) ? 'Reviewer data insertion failed' : $aResult['msg']; } } - - //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $oQueueJob->updateLog($aParam); - - //删除任务 + //更新日志 + $this->oQueueJob->updateLog([ + 'log_id' => $iLogId, + 'status' => 1, + 'update_time' => time(), + 'error' => $sMsg + ]); + + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); - file_put_contents($sLogPath,'-----------Queue job end:'.$sTime.'-----------'); + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); } catch (\Exception $e) { - - // 2. 记录失败日志 - $aParam['status'] = 2; // 标记状态为"失败" - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $aParam['error'] = $sMsg; - $this->addLog($aParam); // 调用日志记录方法 - if ($job->attempts() > $this->tries) { - //如果任务尝试次数超过最大重试次数 - $job->delete(); // 删除任务,不再重试 - } else { - // 3. 如果尝试次数未超过最大重试次数,释放任务回队列 - $job->release(30); // 30秒后重新尝试执行任务 - } - file_put_contents($sLogPath,'-----------Queue job error:'.$sMsg.'-----------'.$sTime); - }finally { - gc_collect_cycles(); // 强制垃圾回收 + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); + gc_collect_cycles(); } } - } \ No newline at end of file diff --git a/application/api/job/RelatedArticle.php b/application/api/job/RelatedArticle.php index faf5ad6..a677871 100644 --- a/application/api/job/RelatedArticle.php +++ b/application/api/job/RelatedArticle.php @@ -1,104 +1,129 @@ addLog($aParam); - return $iLogId; - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - //实例化 - $oQueueJob = new QueueJob; - return $oQueueJob->updateLog($aParam); + $this->logPath = ROOT_PATH . 'public/queue_log/RelatedArticle_' . date('Ymd') . '.log'; + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } - // 相关文章发送邮件任务 public function fire(Job $job, $data) { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); - $sLogPath = ROOT_PATH.'public/queue_log/RelatedArticle_'.date('Ymd').'.log'; - $sTime = date('H:i:s'); - file_put_contents($sLogPath,'-----------Queue job started:'.$sTime.'-----------'); - - //文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - - //获取方法名 - $sClassName = get_class($this); - // 检查任务是否已处理(基于业务唯一标识) - $sRedisKey = $sClassName.'/'.$iArticleId; - $sRedisKey = md5($sRedisKey); - //判断Redis是否存在 - $oQueueJob = new QueueJob; - $result = $oQueueJob->setRedisLabel(['redis_key' => $sRedisKey]); - if($result != 1){ - $job->delete(); - file_put_contents($sLogPath,'-----------Queue job already:'.$result."===".$sRedisKey.'==='.$iArticleId."===".$sTime.'-----------'); + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); return; } - //任务数组 - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => empty($data) ? '暂无参数' : json_encode($data, JSON_UNESCAPED_UNICODE) - ]; - //执行任务 - try { - //添加任务日志 - $sMsg = '关联文章任务处理成功'; - $iLogId = $this->addLog($aParam); + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; + } + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + $this->oQueueJob->flushLog(); + return; + } + + $aParam = [ + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, + 'create_time' => time(), + 'params' => json_encode($data, self::JSON_OPTIONS) + ]; + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + $this->oQueueJob->flushLog(); + return; + } + + try { + //查询文章所关联的文章 $oJournalArticle = new JournalArticle; $aResult = json_decode(JournalArticle::get($data),true); $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; $sMsg = empty($aResult['msg']) ? '获取相关文章信息失败' : $aResult['msg']; - //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $oQueueJob->updateLog($aParam); - - //删除任务 + //更新日志 + $this->oQueueJob->updateLog([ + 'log_id' => $iLogId, + 'status' => 1, + 'update_time' => time(), + 'error' => $sMsg + ]); + + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); - file_put_contents($sLogPath,'-----------Queue job end:'.$sTime.'-----------'); - + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); } catch (\Exception $e) { - - // 2. 记录失败日志 - $aParam['status'] = 2; // 标记状态为"失败" - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $aParam['error'] = $sMsg; - $this->addLog($aParam); // 调用日志记录方法 - if ($job->attempts() > $this->tries) { - //如果任务尝试次数超过最大重试次数 - $job->delete(); // 删除任务,不再重试 - } else { - // 3. 如果尝试次数未超过最大重试次数,释放任务回队列 - $job->release(30); // 30秒后重新尝试执行任务 - } - file_put_contents($sLogPath,'-----------Queue job error:'.$sMsg.'-----------'.$sTime); - }finally { - gc_collect_cycles(); // 强制垃圾回收 + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); + gc_collect_cycles(); } } - } \ No newline at end of file diff --git a/application/api/job/ReviewerScore.php b/application/api/job/ReviewerScore.php index bb106c5..32eabb4 100644 --- a/application/api/job/ReviewerScore.php +++ b/application/api/job/ReviewerScore.php @@ -1,103 +1,133 @@ addLog($aParam); - return $iLogId; - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - //实例化 - $oQueueJob = new QueueJob; - return $oQueueJob->updateLog($aParam); + $this->logPath = ROOT_PATH . 'public/queue_log/ReviewerScore_' . date('Ymd') . '.log'; + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } - // 审稿人评分 public function fire(Job $job, $data) { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); - $sLogPath = ROOT_PATH.'public/queue_log/ReviewerScore_'.date('Ymd').'.log'; - $sTime = date('H:i:s'); - file_put_contents($sLogPath,'-----------Queue job started:'.$sTime.'-----------'); - - //参数 - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id'];//文章ID - $iReviewerId = empty($data['reviewer_id']) ? 0 : $data['reviewer_id'];//审稿人ID - $iArtRevId = empty($data['art_rev_id']) ? 0 : $data['art_rev_id'];//主键ID - //获取方法名 - $sClassName = get_class($this); - // 检查任务是否已处理(基于业务唯一标识) - $sRedisKey = $sClassName.'/'.$iArticleId.'/'.$iReviewerId.'/'.$iArtRevId; - $sRedisKey = md5($sRedisKey); - //判断Redis是否存在 - $oQueueJob = new QueueJob; - $result = $oQueueJob->setRedisLabel(['redis_key' => $sRedisKey]); - if($result != 1){ - $job->delete(); - file_put_contents($sLogPath,'-----------Queue job already:'.$result."===".$sRedisKey.'==='.$iArticleId."===".$sTime.'-----------'); + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); return; } - //任务数组 - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, JSON_UNESCAPED_UNICODE) - ]; - //执行任务 - try { - //添加任务日志 - $sMsg = '审稿人评分任务处理成功'; - $iLogId = $this->addLog($aParam); + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + //审稿人ID + $iReviewerId = empty($data['reviewer_id']) ? 0 : $data['reviewer_id']; + //主键ID + $iArtRevId = empty($data['art_rev_id']) ? 0 : $data['art_rev_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; + } + // 生成唯一任务标识 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$iReviewerId}:{$iArtRevId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + $this->oQueueJob->flushLog(); + return; + } + + $aParam = [ + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, + 'create_time' => time(), + 'params' => json_encode($data, self::JSON_OPTIONS) + ]; + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + $this->oQueueJob->flushLog(); + return; + } + + try { + + // 执行核心任务 $aParam = ['article_id' => $iArticleId,'reviewer_id' => $iReviewerId,'art_rev_id' => $iArtRevId]; $oReviewer = new Reviewer; $aResult = json_decode($oReviewer->score($aParam),true); $sMsg = empty($aResult['msg']) ? '给审稿人评分处理失败' : $aResult['msg']; - - //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $oQueueJob->updateLog($aParam); - - //删除任务 - $job->delete(); - - file_put_contents($sLogPath,'-----------Queue job end:'.$sTime.'-----------'); - - } catch (\Exception $e) { - - // 2. 记录失败日志 - $aParam['status'] = 2; // 标记状态为"失败" - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $aParam['error'] = $sMsg; - $this->addLog($aParam); // 调用日志记录方法 - if ($job->attempts() > $this->tries) { - //如果任务尝试次数超过最大重试次数 - $job->delete(); // 删除任务,不再重试 - } else { - // 3. 如果尝试次数未超过最大重试次数,释放任务回队列 - $job->release(30); // 30秒后重新尝试执行任务 - } - file_put_contents($sLogPath,'-----------Queue job error:'.$sMsg.'-----------'.$sTime); - }finally { - gc_collect_cycles(); // 强制垃圾回收 + //更新日志 + $this->oQueueJob->updateLog([ + 'log_id' => $iLogId, + 'status' => 1, + 'update_time' => time(), + 'error' => $sMsg + ]); + + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); + + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\Exception $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); + gc_collect_cycles(); } } - } \ No newline at end of file diff --git a/application/api/job/RevisionReviewer.php b/application/api/job/RevisionReviewer.php index e1deb52..78e29d2 100644 --- a/application/api/job/RevisionReviewer.php +++ b/application/api/job/RevisionReviewer.php @@ -1,100 +1,129 @@ addLog($aParam); - return $iLogId; - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - //实例化 - $oQueueJob = new QueueJob; - return $oQueueJob->updateLog($aParam); + $this->logPath = ROOT_PATH . 'public/queue_log/RevisionReviewer_' . date('Ymd') . '.log'; + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } - // 文章退修任务 public function fire(Job $job, $data) { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); - $sLogPath = ROOT_PATH.'public/queue_log/RevisionReviewer_'.date('Ymd').'.log'; - $sTime = date('H:i:s'); - file_put_contents($sLogPath,'-----------Queue job started:'.$sTime.'-----------'); - - //参数 - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id'];//文章ID - //获取方法名 - $sClassName = get_class($this); - // 检查任务是否已处理(基于业务唯一标识) - $sRedisKey = $sClassName.'/'.$iArticleId; - $sRedisKey = md5($sRedisKey); - //判断Redis是否存在 - $oQueueJob = new QueueJob; - $result = $oQueueJob->setRedisLabel(['redis_key' => $sRedisKey]); - if($result != 1){ - $job->delete(); - file_put_contents($sLogPath,'-----------Queue job already:'.$result."===".$sRedisKey.'==='.$iArticleId."===".$sTime.'-----------'); + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); return; } - //任务数组 - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, JSON_UNESCAPED_UNICODE) - ]; - //执行任务 - try { - //添加任务日志 - $sMsg = '文章退修任务处理成功'; - $iLogId = $this->addLog($aParam); + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; + } + // 生成唯一任务标识 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + $this->oQueueJob->flushLog(); + return; + } + + $aParam = [ + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, + 'create_time' => time(), + 'params' => json_encode($data, self::JSON_OPTIONS) + ]; + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + $this->oQueueJob->flushLog(); + return; + } + + try { + //获取符合条件的文章审稿人信息 $aParam = ['article_id' => $iArticleId]; $oReviewer = new Reviewer; $aResult = json_decode($oReviewer->revisionForReviewer($aParam),true); $sMsg = empty($aResult['msg']) ? '审稿人同意审稿但超时未审的数据失败' : $aResult['msg']; - - //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $oQueueJob->updateLog($aParam); - - //删除任务 + + //更新日志 + $this->oQueueJob->updateLog([ + 'log_id' => $iLogId, + 'status' => 1, + 'update_time' => time(), + 'error' => $sMsg + ]); + + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); - file_put_contents($sLogPath,'-----------Queue job end:'.$sTime.'-----------'); - + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); } catch (\Exception $e) { - - // 2. 记录失败日志 - $aParam['status'] = 2; // 标记状态为"失败" - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $aParam['error'] = $sMsg; - $this->addLog($aParam); // 调用日志记录方法 - if ($job->attempts() > $this->tries) { - //如果任务尝试次数超过最大重试次数 - $job->delete(); // 删除任务,不再重试 - } else { - // 3. 如果尝试次数未超过最大重试次数,释放任务回队列 - $job->release(30); // 30秒后重新尝试执行任务 - } - file_put_contents($sLogPath,'-----------Queue job error:'.$sMsg.'-----------'.$sTime); - }finally { - gc_collect_cycles(); // 强制垃圾回收 + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); + gc_collect_cycles(); } } } \ No newline at end of file diff --git a/application/api/job/SendRelatedArticleEmail.php b/application/api/job/SendRelatedArticleEmail.php index 7f7103c..c94b861 100644 --- a/application/api/job/SendRelatedArticleEmail.php +++ b/application/api/job/SendRelatedArticleEmail.php @@ -1,41 +1,47 @@ addLog($aParam); - return $iLogId; - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - //实例化 - $oQueueJob = new QueueJob; - return $oQueueJob->updateLog($aParam); + $this->logPath = ROOT_PATH . 'public/queue_log/SendRelatedArticleEmail_' . date('Ymd') . '.log'; + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } - // 关联文章任务 public function fire(Job $job, $data) { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); - $sLogPath = ROOT_PATH.'public/queue_log/SendRelatedArticleEmail_'.date('Ymd').'.log'; - $sTime = date('H:i:s'); - file_put_contents($sLogPath,'-----------Queue job started:'.$sTime.'-----------'); + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); + return; + } - //文章ID + // 获取文章ID $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; //作者邮箱 $email = empty($data['email']) ? '' : $data['email']; @@ -57,35 +63,58 @@ class SendRelatedArticleEmail $journal_id = empty($data['journal_id']) ? '' : $data['journal_id']; //期刊issn $journal_issn = empty($data['journal_issn']) ? '' : $data['journal_issn']; - - //获取方法名 - $sClassName = get_class($this); - // 检查任务是否已处理(基于业务唯一标识) - $sRedisKey = $sClassName.'/'.$iArticleId.'/'.$related_article_id.'/'.$article_author_id.'/'.$email; - $sRedisKey = md5($sRedisKey); - //判断Redis是否存在 - $oQueueJob = new QueueJob; - $result = $oQueueJob->setRedisLabel(['redis_key' => $sRedisKey]); - if($result != 1){ + if (empty($iArticleId) || empty($email)) { + $this->oQueueJob->log("无效的article_id,删除任务"); $job->delete(); - file_put_contents($sLogPath,'-----------Queue job already:'.$result."===".$sRedisKey.'==='.$iArticleId."===".$sTime.'-----------'); + return; + } + // 生成唯一任务标识 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$related_article_id}:{$article_author_id}:{$email}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + $this->oQueueJob->flushLog(); return; } - //任务数组 $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, 'create_time' => time(), - 'params' => empty($data) ? '暂无参数' : json_encode($data, JSON_UNESCAPED_UNICODE) + 'params' => json_encode($data, self::JSON_OPTIONS) ]; - //执行任务 - try { - //添加任务日志 - $sMsg = '关联文章任务处理成功'; - $iLogId = $this->addLog($aParam); + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + $this->oQueueJob->flushLog(); + return; + } + try { + //查询是否发送过邮件 $oJournalArticle = new JournalArticle; $aLog = json_decode($oJournalArticle::getLog(['article_id' => $iArticleId,'article_author_id' => $article_author_id,'related_article_id' => $related_article_id,'is_success' => 1]),true); @@ -105,34 +134,30 @@ class SendRelatedArticleEmail //添加邮件发送日志 $iId = JournalArticle::addLog($aEmailLog); } - - //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $oQueueJob->updateLog($aParam); - - //删除任务 + + //更新日志 + $this->oQueueJob->updateLog([ + 'log_id' => $iLogId, + 'status' => 1, + 'update_time' => time(), + 'error' => $sMsg + ]); + + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); - file_put_contents($sLogPath,'-----------Queue job end:'.$sTime.'-----------'); - + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); } catch (\Exception $e) { - - // 2. 记录失败日志 - $aParam['status'] = 2; // 标记状态为"失败" - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $aParam['error'] = $sMsg; - $this->addLog($aParam); // 调用日志记录方法 - if ($job->attempts() > $this->tries) { - //如果任务尝试次数超过最大重试次数 - $job->delete(); // 删除任务,不再重试 - } else { - // 3. 如果尝试次数未超过最大重试次数,释放任务回队列 - $job->release(30); // 30秒后重新尝试执行任务 - } - file_put_contents($sLogPath,'-----------Queue job error:'.$sMsg.'-----------'.$sTime); - }finally { - gc_collect_cycles(); // 强制垃圾回收 + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); + gc_collect_cycles(); } } - } \ No newline at end of file diff --git a/application/api/job/SendReviewEmail.php b/application/api/job/SendReviewEmail.php index b8958c8..c98deda 100644 --- a/application/api/job/SendReviewEmail.php +++ b/application/api/job/SendReviewEmail.php @@ -1,40 +1,48 @@ addLog($aParam); - return $iLogId; - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - //实例化 - $oQueueJob = new QueueJob; - return $oQueueJob->updateLog($aParam); + $this->logPath = ROOT_PATH . 'public/queue_log/SendReviewEmail_' . date('Ymd') . '.log'; + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } - // 发送审稿邀请邮件任务 public function fire(Job $job, $data) { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); - $sLogPath = ROOT_PATH.'public/queue_log/SendReviewEmail_'.date('Ymd').'.log'; - $sTime = date('H:i:s'); - file_put_contents($sLogPath,'-----------Queue job started:'.$sTime.'-----------'); + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); + return; + } - //文章ID + // 获取文章ID + // 获取文章ID $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; //作者邮箱 $email = empty($data['email']) ? '' : $data['email']; @@ -54,35 +62,59 @@ class SendReviewEmail $reviewer_id = empty($data['reviewer_id']) ? 0 : $data['reviewer_id']; //邮件类型 $type = empty($data['type']) ? 1 : $data['type']; - - //获取方法名 - $sClassName = get_class($this); - // 检查任务是否已处理(基于业务唯一标识) - $sRedisKey = $sClassName.'/'.$iArticleId.'/'.$art_rev_id.'/'.$reviewer_id.'/'.$email; - $sRedisKey = md5($sRedisKey); - //判断Redis是否存在 - $oQueueJob = new QueueJob; - $result = $oQueueJob->setRedisLabel(['redis_key' => $sRedisKey]); - if($result != 1){ + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); $job->delete(); - file_put_contents($sLogPath,'-----------Queue job already:'.$result."===".$sRedisKey.'==='.$iArticleId."===".$sTime.'-----------'); return; } - //任务数组 + // 生成唯一任务标识 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$art_rev_id}:{$reviewer_id}:{$email}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + $this->oQueueJob->flushLog(); + return; + } + $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, 'create_time' => time(), - 'params' => empty($data) ? '暂无参数' : json_encode($data, JSON_UNESCAPED_UNICODE) + 'params' => json_encode($data, self::JSON_OPTIONS) ]; - //执行任务 + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + $this->oQueueJob->flushLog(); + return; + } + try { - //添加任务日志 - $sMsg = '发送审稿邀请邮件任务处理成功'; - $iLogId = $this->addLog($aParam); - //发送邮件 + //查询是否发送过邮件 $oReviewer = new Reviewer; if($type != 3){ @@ -104,34 +136,29 @@ class SendReviewEmail //添加邮件发送日志 $iId = $oReviewer->addLog($aEmailLog); } + //更新日志 + $this->oQueueJob->updateLog([ + 'log_id' => $iLogId, + 'status' => 1, + 'update_time' => time(), + 'error' => $sMsg + ]); - //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $oQueueJob->updateLog($aParam); - - //删除任务 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); - file_put_contents($sLogPath,'-----------Queue job end:'.$sTime.'-----------'); - + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); } catch (\Exception $e) { - - // 2. 记录失败日志 - $aParam['status'] = 2; // 标记状态为"失败" - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $aParam['error'] = $sMsg; - $this->addLog($aParam); // 调用日志记录方法 - if ($job->attempts() > $this->tries) { - //如果任务尝试次数超过最大重试次数 - $job->delete(); // 删除任务,不再重试 - } else { - // 3. 如果尝试次数未超过最大重试次数,释放任务回队列 - $job->release(30); // 30秒后重新尝试执行任务 - } - file_put_contents($sLogPath,'-----------Queue job error:'.$sMsg.'-----------'.$sTime); - }finally { - gc_collect_cycles(); // 强制垃圾回收 + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); + gc_collect_cycles(); } } - } \ No newline at end of file diff --git a/application/api/job/WechatDraft.php b/application/api/job/WechatDraft.php index ca6f737..3e84e78 100644 --- a/application/api/job/WechatDraft.php +++ b/application/api/job/WechatDraft.php @@ -1,102 +1,128 @@ addLog($aParam); - return $iLogId; - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - //实例化 - $oQueueJob = new QueueJob; - return $oQueueJob->updateLog($aParam); + $this->logPath = ROOT_PATH . 'public/queue_log/WechatDraft_' . date('Ymd') . '.log'; + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } - // 上传文章到草稿箱任务入口 public function fire(Job $job, $data) { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); - $sLogPath = ROOT_PATH.'public/queue_log/WechatDraft_'.date('Ymd').'.log'; - $sTime = date('H:i:s'); - file_put_contents($sLogPath,'-----------Queue job started:'.$sTime.'-----------'); - - //文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - - //获取方法名 - $sClassName = get_class($this); - // 检查任务是否已处理(基于业务唯一标识) - $sRedisKey = $sClassName.'/'.$iArticleId; - $sRedisKey = md5($sRedisKey); - //判断Redis是否存在 - $oQueueJob = new QueueJob; - $result = $oQueueJob->setRedisLabel(['redis_key' => $sRedisKey]); - if($result != 1){ - $job->delete(); - file_put_contents($sLogPath,'-----------Queue job already:'.$result."===".$sRedisKey.'==='.$iArticleId."===".$sTime.'-----------'); + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); + return; + } + + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; + } + + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + $this->oQueueJob->flushLog(); return; } - //任务数组 $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, 'create_time' => time(), - 'params' => empty($data) ? '暂无参数' : json_encode($data, JSON_UNESCAPED_UNICODE) + 'params' => json_encode($data, self::JSON_OPTIONS) ]; - //执行任务 + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + $this->oQueueJob->flushLog(); + return; + } + try { - //添加任务日志 - $sMsg = '上传草稿箱处理成功'; - $iLogId = $this->addLog($aParam); //上传草稿箱 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->syncWechat($data),true); - $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; $sMsg = empty($aResult['msg']) ? '上传草稿箱失败' : $aResult['msg']; - - //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $oQueueJob->updateLog($aParam); + + //更新日志 + $this->oQueueJob->updateLog([ + 'log_id' => $iLogId, + 'status' => 1, + 'update_time' => time(), + 'error' => $sMsg + ]); - //删除任务 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); - file_put_contents($sLogPath,'-----------Queue job end:'.$sTime.'-----------'); - + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); } catch (\Exception $e) { - - // 2. 记录失败日志 - $aParam['status'] = 2; // 标记状态为"失败" - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $aParam['error'] = $sMsg; - $this->addLog($aParam); // 调用日志记录方法 - if ($job->attempts() > $this->tries) { - //如果任务尝试次数超过最大重试次数 - $job->delete(); // 删除任务,不再重试 - } else { - // 3. 如果尝试次数未超过最大重试次数,释放任务回队列 - $job->release(30); // 30秒后重新尝试执行任务 - } - file_put_contents($sLogPath,'-----------Queue job error:'.$sMsg.'-----------'.$sTime); - }finally { - gc_collect_cycles(); // 强制垃圾回收 + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); + gc_collect_cycles(); } } - } \ No newline at end of file diff --git a/application/api/job/WechatDraftPublish.php b/application/api/job/WechatDraftPublish.php index b5da1a0..77177e8 100644 --- a/application/api/job/WechatDraftPublish.php +++ b/application/api/job/WechatDraftPublish.php @@ -1,104 +1,128 @@ addLog($aParam); - return $iLogId; - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - //实例化 - $oQueueJob = new QueueJob; - return $oQueueJob->updateLog($aParam); + $this->logPath = ROOT_PATH . 'public/queue_log/WechatDraftPublish_' . date('Ymd') . '.log'; + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } - // 发布草稿箱任务入口 public function fire(Job $job, $data) { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); - $sLogPath = ROOT_PATH.'public/queue_log/WechatDraftPublish_'.date('Ymd').'.log'; - $sTime = date('H:i:s'); - file_put_contents($sLogPath,'-----------Queue job started:'.$sTime.'-----------'); - - //文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - - //获取方法名 - $sClassName = get_class($this); - // 检查任务是否已处理(基于业务唯一标识) - $sRedisKey = $sClassName.'/'.$iArticleId; - $sRedisKey = md5($sRedisKey); - //判断Redis是否存在 - $oQueueJob = new QueueJob; - $result = $oQueueJob->setRedisLabel(['redis_key' => $sRedisKey]); - if($result != 1){ - $job->delete(); - file_put_contents($sLogPath,'-----------Queue job already:'.$result."===".$sRedisKey.'==='.$iArticleId."===".$sTime.'-----------'); + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); + return; + } + + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; + } + + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + $this->oQueueJob->flushLog(); return; } - //任务数组 $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, 'create_time' => time(), - 'params' => empty($data) ? '暂无参数' : json_encode($data, JSON_UNESCAPED_UNICODE) + 'params' => json_encode($data, self::JSON_OPTIONS) ]; - //执行任务 + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + $this->oQueueJob->flushLog(); + return; + } + try { - //添加任务日志 - $sMsg = '草稿箱发布任务处理成功'; - $iLogId = $this->addLog($aParam); - //发布草稿箱 $oAiarticle = new Aiarticle; - $aResult = json_decode($oAiarticle->publishDraft($data),true); - $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; + $aResult = json_decode($oAiarticle->publishDraft($data),true); $sMsg = empty($aResult['msg']) ? '草稿箱发布失败' : $aResult['msg']; - + //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $oQueueJob->updateLog($aParam); - - //删除任务 + $this->oQueueJob->updateLog([ + 'log_id' => $iLogId, + 'status' => 1, + 'update_time' => time(), + 'error' => $sMsg + ]); + + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); - file_put_contents($sLogPath,'-----------Queue job end:'.$sTime.'-----------'); - + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); } catch (\Exception $e) { - - // 2. 记录失败日志 - $aParam['status'] = 2; // 标记状态为"失败" - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $aParam['error'] = $sMsg; - $this->addLog($aParam); // 调用日志记录方法 - if ($job->attempts() > $this->tries) { - //如果任务尝试次数超过最大重试次数 - $job->delete(); // 删除任务,不再重试 - } else { - // 3. 如果尝试次数未超过最大重试次数,释放任务回队列 - $job->release(30); // 30秒后重新尝试执行任务 - } - file_put_contents($sLogPath,'-----------Queue job error:'.$sMsg.'-----------'.$sTime); - }finally { - gc_collect_cycles(); // 强制垃圾回收 + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); + gc_collect_cycles(); } } - } \ No newline at end of file diff --git a/application/api/job/WechatMaterial.php b/application/api/job/WechatMaterial.php index 8342174..9d8fe8a 100644 --- a/application/api/job/WechatMaterial.php +++ b/application/api/job/WechatMaterial.php @@ -1,102 +1,128 @@ addLog($aParam); - return $iLogId; - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - //实例化 - $oQueueJob = new QueueJob; - return $oQueueJob->updateLog($aParam); + $this->logPath = ROOT_PATH . 'public/queue_log/WechatMaterial_' . date('Ymd') . '.log'; + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } - // 上传素材任务入口 public function fire(Job $job, $data) { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); - $sLogPath = ROOT_PATH.'public/queue_log/WechatMaterial_'.date('Ymd').'.log'; - $sTime = date('H:i:s'); - file_put_contents($sLogPath,'-----------Queue job started:'.$sTime.'-----------'); - - //文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - - //获取方法名 - $sClassName = get_class($this); - // 检查任务是否已处理(基于业务唯一标识) - $sRedisKey = $sClassName.'/'.$iArticleId; - $sRedisKey = md5($sRedisKey); - //判断Redis是否存在 - $oQueueJob = new QueueJob; - $result = $oQueueJob->setRedisLabel(['redis_key' => $sRedisKey]); - if($result != 1){ - $job->delete(); - file_put_contents($sLogPath,'-----------Queue job already:'.$result."===".$sRedisKey.'==='.$iArticleId."===".$sTime.'-----------'); + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); + return; + } + + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; + } + + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + $this->oQueueJob->flushLog(); return; } - //任务数组 $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, 'create_time' => time(), - 'params' => empty($data) ? '暂无参数' : json_encode($data, JSON_UNESCAPED_UNICODE) + 'params' => json_encode($data, self::JSON_OPTIONS) ]; - //执行任务 + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + $this->oQueueJob->flushLog(); + return; + } + try { - //添加任务日志 - $sMsg = '上传素材任务处理成功'; - $iLogId = $this->addLog($aParam); //上传素材 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->uploadMaterial($data),true); - $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; $sMsg = empty($aResult['msg']) ? '上传素材失败' : $aResult['msg']; //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $oQueueJob->updateLog($aParam); - - //删除任务 + $this->oQueueJob->updateLog([ + 'log_id' => $iLogId, + 'status' => 1, + 'update_time' => time(), + 'error' => $sMsg + ]); + + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); - file_put_contents($sLogPath,'-----------Queue job end:'.$sTime.'-----------'); - + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); } catch (\Exception $e) { - - // 2. 记录失败日志 - $aParam['status'] = 2; // 标记状态为"失败" - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $aParam['error'] = $sMsg; - $this->addLog($aParam); // 调用日志记录方法 - if ($job->attempts() > $this->tries) { - //如果任务尝试次数超过最大重试次数 - $job->delete(); // 删除任务,不再重试 - } else { - // 3. 如果尝试次数未超过最大重试次数,释放任务回队列 - $job->release(30); // 30秒后重新尝试执行任务 - } - file_put_contents($sLogPath,'-----------Queue job error:'.$sMsg.'-----------'.$sTime); - }finally { - gc_collect_cycles(); // 强制垃圾回收 + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); + gc_collect_cycles(); } } } \ No newline at end of file diff --git a/application/api/job/WechatQueryStatus.php b/application/api/job/WechatQueryStatus.php index ca41059..4003eda 100644 --- a/application/api/job/WechatQueryStatus.php +++ b/application/api/job/WechatQueryStatus.php @@ -1,103 +1,128 @@ addLog($aParam); - return $iLogId; - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - //实例化 - $oQueueJob = new QueueJob; - return $oQueueJob->updateLog($aParam); + $this->logPath = ROOT_PATH . 'public/queue_log/WechatQueryStatus_' . date('Ymd') . '.log'; + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } - // 草稿箱文章发布状态任务入口 public function fire(Job $job, $data) { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); - $sLogPath = ROOT_PATH.'public/queue_log/WechatQueryStatus_'.date('Ymd').'.log'; - $sTime = date('H:i:s'); - file_put_contents($sLogPath,'-----------Queue job started:'.$sTime.'-----------'); - - //文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - - //获取方法名 - $sClassName = get_class($this); - // 检查任务是否已处理(基于业务唯一标识) - $sRedisKey = $sClassName.'/'.$iArticleId; - $sRedisKey = md5($sRedisKey); - //判断Redis是否存在 - $oQueueJob = new QueueJob; - $result = $oQueueJob->setRedisLabel(['redis_key' => $sRedisKey]); - if($result != 1){ - $job->delete(); - file_put_contents($sLogPath,'-----------Queue job already:'.$result."===".$sRedisKey.'==='.$iArticleId."===".$sTime.'-----------'); + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); + return; + } + + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; + } + + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + $this->oQueueJob->flushLog(); return; } - //任务数组 $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, + 'job_id' => $sRedisKey, + 'job_class' => $sClassName, + 'status' => 0, 'create_time' => time(), - 'params' => empty($data) ? '暂无参数' : json_encode($data, JSON_UNESCAPED_UNICODE) + 'params' => json_encode($data, self::JSON_OPTIONS) ]; - //执行任务 + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); + $job->delete(); + $this->oQueueJob->flushLog(); + return; + } + try { - //添加任务日志 - $sMsg = '查询草稿箱文章发布处理成功'; - $iLogId = $this->addLog($aParam); // 查询状态 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->queryStatus($data),true); - $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; $sMsg = empty($aResult['msg']) ? '查询草稿箱文章是否发布失败' : $aResult['msg']; //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $oQueueJob->updateLog($aParam); - - //删除任务 + $this->oQueueJob->updateLog([ + 'log_id' => $iLogId, + 'status' => 1, + 'update_time' => time(), + 'error' => $sMsg + ]); + + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); - file_put_contents($sLogPath,'-----------Queue job end:'.$sTime.'-----------'); - + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); } catch (\Exception $e) { - - // 2. 记录失败日志 - $aParam['status'] = 2; // 标记状态为"失败" - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $aParam['error'] = $sMsg; - $this->addLog($aParam); // 调用日志记录方法 - if ($job->attempts() > $this->tries) { - //如果任务尝试次数超过最大重试次数 - $job->delete(); // 删除任务,不再重试 - } else { - // 3. 如果尝试次数未超过最大重试次数,释放任务回队列 - $job->release(30); // 30秒后重新尝试执行任务 - } - file_put_contents($sLogPath,'-----------Queue job error:'.$sMsg.'-----------'.$sTime); - }finally { - gc_collect_cycles(); // 强制垃圾回收 + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); + gc_collect_cycles(); } } - } \ No newline at end of file diff --git a/application/api/job/createFieldForQueue.php b/application/api/job/createFieldForQueue.php index 734ff92..a35b8b0 100644 --- a/application/api/job/createFieldForQueue.php +++ b/application/api/job/createFieldForQueue.php @@ -1,162 +1,126 @@ queueJob = new QueueJob; - // 初始化日志路径 $this->logPath = ROOT_PATH . 'public/queue_log/createFieldForQueue_' . date('Ymd') . '.log'; - } - - /** - * 安全写入日志(带文件锁) - */ - private function log($message) - { - $time = date('H:i:s'); - $logMsg = "[$time] $message\n"; - $fp = fopen($this->logPath, 'w'); - if ($fp) { - flock($fp, LOCK_EX); // 排他锁防止并发写入冲突 - fwrite($fp, $logMsg); - flock($fp, LOCK_UN); - fclose($fp); - } - } - - // 任务日志添加 - public function addLog($aParam = []) - { - //实例化 - return $this->queueJob->addLog($aParam); - } - - // 任务日志修改 - public function updateLog($aParam = []) - { - return $this->queueJob->updateLog($aParam); - } - - /** - * 根据错误信息获取重试延迟 - */ - private function getRetryDelay($errorMsg) - { - $delayMap = [ - 'MySQL server has gone away' => 60, - 'timeout' => 30, - 'OpenAI' => 45, - 'network' => 60 - ]; - foreach ($delayMap as $keyword => $delay) { - if (strpos($errorMsg, $keyword) !== false) { - return $delay; - } - } - return 10; // 默认延迟 + $this->oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); + // 确保日志目录存在 + $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) { - //日志 - $this->log("-----------队列任务开始-----------"); + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); - // 获取文章ID - $iRedisId = empty($data['redis_id']) ? 0 : $data['redis_id']; - if (empty($iRedisId)) { - $this->log("无效的redis_id,删除任务"); - $job->delete(); + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + $this->oQueueJob->flushLog(); return; } - // 生成唯一任务标识 - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iRedisId}"; - $sRedisValue = uniqid() . '_' . getmypid(); // 增加进程ID确保唯一性 + $iRedisId = empty($data['redis_id']) ? 0 : $data['redis_id']; + $sChunkIndex = empty($data['chunkIndex']) ? 0 : $data['chunkIndex']; + if (empty($iRedisId)) { + $this->oQueueJob->log("无效的redis_id,删除任务"); + $job->delete(); + $this->oQueueJob->flushLog(); + return; + } - // 尝试获取Redis锁(原子操作) - $isLocked = $this->queueJob->setRedisLock($sRedisKey, $sRedisValue, 86400); + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iRedisId}:{$sChunkIndex}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); if (!$isLocked) { - $currentValue = $this->queueJob->getRedisValue($sRedisKey); - $this->log("任务已被锁定,避免重复执行 | 锁键: {$sRedisKey} | 锁值: {$currentValue}"); - // 检查任务是否已超过最大重试次数 - if ($job->attempts() >= 2) { - $this->log("任务超过最大重试次数,停止重试"); + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); $job->delete(); } else { - $delay = $this->getRetryDelay("任务已锁定"); - $this->log("{$delay}秒后重试任务 | 重试次数: {$job->attempts()}"); - $job->release($delay); + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } } + $this->oQueueJob->flushLog(); return; } - // 任务基础信息 $aParam = [ 'job_id' => $sRedisKey, 'job_class' => $sClassName, - 'status' => 0, // 0:处理中 + 'status' => 0, 'create_time' => time(), - 'params' => json_encode($data, JSON_UNESCAPED_UNICODE) + 'params' => json_encode($data, self::JSON_OPTIONS) ]; - // 创建任务日志记录 - $iLogId = $this->addLog($aParam); - if(!$iLogId) { - $this->log("日志创建失败,释放锁并删除任务:".json_encode($data)); - $this->queueJob->releaseRedisLock($sRedisKey, $sRedisValue); + + $iLogId = $this->oQueueJob->addLog($aParam); + if (!$iLogId) { + $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); + $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); $job->delete(); + $this->oQueueJob->flushLog(); return; } + try { - // 执行核心任务 $oOpenAi = new OpenAi; $aResult = json_decode($oOpenAi->createFieldForQueue($data), true); - $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; - $sMsg = empty($aResult['msg']) ? '内容生成失败' : $aResult['msg']; - //更新任务状态 - $aParam = ['log_id' => $iLogId,'status' => 1,'update_time' => time(),'error' => $sMsg]; - $this->updateLog($aParam); - //删除任务 - $job->delete(); - $this->log("任务执行成功,已删除任务 | 日志ID: {$iLogId}"); - } catch (\Exception $e) { - - //错误信息 - $sMsg = empty($e->getMessage()) ? '任务出错' : $e->getMessage(); // 错误信息 - $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); - $this->log("任务执行异常: {$sMsg} | 堆栈: {$sTrace}"); - // 记录失败日志 - $this->updateLog([ + $sMsg = empty($aResult['msg']) ? '内容生成成功' : $aResult['msg']; + + $this->oQueueJob->updateLog([ 'log_id' => $iLogId, - 'status' => 2, + 'status' => 1, 'update_time' => time(), - 'error' => $sMsg.':'.$sTrace, + 'error' => $sMsg ]); - // 重试策略 - $attempts = $job->attempts(); - if ($attempts >= 2) { - $this->log("任务已重试{$attempts}次,停止重试"); - $job->delete(); - } else { - $delay = $this->getRetryDelay($sMsg); - $this->log("{$delay}秒后重试任务 | 重试次数: {$attempts}"); - $job->release($delay); - } + + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}"); + + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + } catch (\Exception $e) { + $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); } finally { - // 无论成功失败都释放锁(确保锁值匹配) - $releaseResult = $this->queueJob->releaseRedisLock($sRedisKey, $sRedisValue); - if (!$releaseResult) { - $this->log("释放锁失败 | 锁键: {$sRedisKey} | 锁值: {$sRedisValue}"); - } else { - $this->log("成功释放锁 | 锁键: {$sRedisKey}"); - } + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + $this->oQueueJob->flushLog(); gc_collect_cycles(); } }