From 783c32ae2ba3db38bfb0db6117854c9559a5c538 Mon Sep 17 00:00:00 2001 From: chengxl Date: Fri, 15 Aug 2025 15:44:13 +0800 Subject: [PATCH] =?UTF-8?q?job=E4=BB=BB=E5=8A=A1=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/job/ArticleAiCreateContent.php | 82 ++++------ application/api/job/RecommendArticleField.php | 123 ++++++--------- application/api/job/RecommendReviewer.php | 119 ++++++-------- application/api/job/RelatedArticle.php | 120 ++++++-------- application/api/job/ReviewerScore.php | 125 ++++++--------- application/api/job/RevisionReviewer.php | 119 ++++++-------- .../api/job/SendRelatedArticleEmail.php | 143 +++++++---------- application/api/job/SendReviewEmail.php | 149 +++++++----------- application/api/job/WechatDraft.php | 117 ++++++-------- application/api/job/WechatDraftPublish.php | 117 ++++++-------- application/api/job/WechatMaterial.php | 118 ++++++-------- application/api/job/WechatQueryStatus.php | 119 ++++++-------- application/api/job/createFieldForQueue.php | 119 ++++++-------- application/api/job/uploadMaterialStep.php | 126 ++++++--------- 14 files changed, 664 insertions(+), 1032 deletions(-) diff --git a/application/api/job/ArticleAiCreateContent.php b/application/api/job/ArticleAiCreateContent.php index 0ad128e..7b16d68 100644 --- a/application/api/job/ArticleAiCreateContent.php +++ b/application/api/job/ArticleAiCreateContent.php @@ -9,10 +9,7 @@ class ArticleAiCreateContent { private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { @@ -22,22 +19,16 @@ class ArticleAiCreateContent public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); - - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); // 获取文章ID $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; @@ -46,42 +37,29 @@ class ArticleAiCreateContent $job->delete(); return; } - - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } try { - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 } + //生成内容 $oAiarticle = new Aiarticle; - $aResult = json_decode($oAiarticle->create($data),true); - $sMsg = empty($aResult['msg']) ? '内容生成失败' : $aResult['msg']; + $response = $oAiarticle->create($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } + $sMsg = empty($aResult['msg']) ? 'success' : $aResult['msg']; //更新完成标识 $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); @@ -94,9 +72,7 @@ class ArticleAiCreateContent } catch (\Exception $e) { $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/RecommendArticleField.php b/application/api/job/RecommendArticleField.php index c2c7749..4226db1 100644 --- a/application/api/job/RecommendArticleField.php +++ b/application/api/job/RecommendArticleField.php @@ -9,11 +9,7 @@ class RecommendArticleField { private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -22,79 +18,62 @@ class RecommendArticleField public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - if (empty($iArticleId)) { - $this->oQueueJob->log("无效的article_id,删除任务"); - $job->delete(); - return; - } - - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } try { - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 验证任务数据完整性 + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; } - $oArticle = new Article; - $aResult = json_decode($oArticle->getAiField($data), true); - $sMsg = empty($aResult['msg']) ? '内容生成成功' : $aResult['msg']; - //更新完成标识 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); - $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey}|执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue,$job); + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 + $oArticle = new Article; + $response = $oArticle->getAiField($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } + $sMsg = empty($aResult['msg']) ? 'success' : $aResult['msg']; + + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); + + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/RecommendReviewer.php b/application/api/job/RecommendReviewer.php index 965c384..da8ee74 100644 --- a/application/api/job/RecommendReviewer.php +++ b/application/api/job/RecommendReviewer.php @@ -7,14 +7,9 @@ use app\common\QueueRedis; use app\common\Reviewer; class RecommendReviewer { - private $logPath; private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,68 +18,49 @@ class RecommendReviewer public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); - - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - // 获取文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - if (empty($iArticleId)) { - $this->oQueueJob->log("无效的article_id,删除任务"); - $job->delete(); - return; - } - - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); try { - - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 验证任务数据完整性 + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; } + + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 //获取推荐审稿人信息 $aParam = ['article_id' => $iArticleId,'page' => 1,'size' => empty($data['size']) ? 5 : $data['size']]; $oReviewer = new Reviewer; - $aResult = json_decode($oReviewer->recommend($aParam),true); + $response = $oReviewer->recommend($aParam); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; $sMsg = empty($aResult['msg']) ? '' : $aResult['msg']; //处理数据 @@ -116,21 +92,20 @@ class RecommendReviewer $sMsg .= empty($aResult['msg']) ? 'Reviewer data insertion failed' : $aResult['msg']; } } - //更新完成标识 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/RelatedArticle.php b/application/api/job/RelatedArticle.php index f35b877..59e4c1b 100644 --- a/application/api/job/RelatedArticle.php +++ b/application/api/job/RelatedArticle.php @@ -7,14 +7,9 @@ use app\common\QueueRedis; use app\common\JournalArticle; class RelatedArticle { - private $logPath; private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,85 +18,62 @@ class RelatedArticle public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - // 获取文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - if (empty($iArticleId)) { - $this->oQueueJob->log("无效的article_id,删除任务"); - $job->delete(); - return; - } - - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } try { - - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 验证任务数据完整性 + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; } - //查询文章所关联的文章 + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 $oJournalArticle = new JournalArticle; - $aResult = json_decode(JournalArticle::get($data),true); - $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; + $response = JournalArticle::get($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } $sMsg = empty($aResult['msg']) ? '获取相关文章信息失败' : $aResult['msg']; - - //更新日志 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/ReviewerScore.php b/application/api/job/ReviewerScore.php index 5d672d0..cbd28ee 100644 --- a/application/api/job/ReviewerScore.php +++ b/application/api/job/ReviewerScore.php @@ -7,14 +7,9 @@ use app\common\QueueRedis; use app\common\Reviewer; class ReviewerScore { - private $logPath; private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,87 +18,67 @@ class ReviewerScore public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); - - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - // 获取文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - //审稿人ID - $iReviewerId = empty($data['reviewer_id']) ? 0 : $data['reviewer_id']; - //主键ID - $iArtRevId = empty($data['art_rev_id']) ? 0 : $data['art_rev_id']; - if (empty($iArticleId)) { - $this->oQueueJob->log("无效的article_id,删除任务"); - $job->delete(); - return; - } - // 生成唯一任务标识 - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$iReviewerId}:{$iArtRevId}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); try { - - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + //审稿人ID + $iReviewerId = empty($data['reviewer_id']) ? 0 : $data['reviewer_id']; + //主键ID + $iArtRevId = empty($data['art_rev_id']) ? 0 : $data['art_rev_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; } + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$iReviewerId}:{$iArtRevId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 $aParam = ['article_id' => $iArticleId,'reviewer_id' => $iReviewerId,'art_rev_id' => $iArtRevId]; $oReviewer = new Reviewer; - $aResult = json_decode($oReviewer->score($aParam),true); + $response = $oReviewer->score($aParam); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } $sMsg = empty($aResult['msg']) ? '给审稿人评分处理失败' : $aResult['msg']; - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/RevisionReviewer.php b/application/api/job/RevisionReviewer.php index 93529d7..ecc1e52 100644 --- a/application/api/job/RevisionReviewer.php +++ b/application/api/job/RevisionReviewer.php @@ -7,14 +7,9 @@ use app\common\QueueRedis; use app\common\Reviewer; class RevisionReviewer { - private $logPath; private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,84 +18,64 @@ class RevisionReviewer public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - // 获取文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - if (empty($iArticleId)) { - $this->oQueueJob->log("无效的article_id,删除任务"); - $job->delete(); - return; - } - // 生成唯一任务标识 - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } try { - - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 验证任务数据完整性 + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; } + + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 //获取符合条件的文章审稿人信息 $aParam = ['article_id' => $iArticleId]; $oReviewer = new Reviewer; - $aResult = json_decode($oReviewer->revisionForReviewer($aParam),true); + $response = $oReviewer->revisionForReviewer($aParam); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } $sMsg = empty($aResult['msg']) ? '审稿人同意审稿但超时未审的数据失败' : $aResult['msg']; - - //更新日志 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/SendRelatedArticleEmail.php b/application/api/job/SendRelatedArticleEmail.php index 90a68e0..813001d 100644 --- a/application/api/job/SendRelatedArticleEmail.php +++ b/application/api/job/SendRelatedArticleEmail.php @@ -7,14 +7,9 @@ use app\common\QueueRedis; use app\common\JournalArticle; class SendRelatedArticleEmail { - private $logPath; private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,83 +18,57 @@ class SendRelatedArticleEmail public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - // 获取文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - //作者邮箱 - $email = empty($data['email']) ? '' : $data['email']; - //邮件主题 - $title = empty($data['title']) ? '' : $data['title']; - //发送来源 - $from_name = empty($data['from_name']) ? '' : $data['from_name']; - //邮件内容 - $content = empty($data['content']) ? '' : $data['content']; - //邮箱 - $memail = empty($data['memail']) ? '' : $data['memail']; - //密码 - $mpassword = empty($data['mpassword']) ? '' : $data['mpassword']; - //文章作者 - $article_author_id = empty($data['article_author_id']) ? 0 : $data['article_author_id']; - //关联文章ID - $related_article_id = empty($data['related_article_id']) ? 0 : $data['related_article_id']; - //期刊ID - $journal_id = empty($data['journal_id']) ? '' : $data['journal_id']; - //期刊issn - $journal_issn = empty($data['journal_issn']) ? '' : $data['journal_issn']; - if (empty($iArticleId) || empty($email)) { - $this->oQueueJob->log("无效的article_id,删除任务"); - $job->delete(); - return; - } - // 生成唯一任务标识 - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$related_article_id}:{$article_author_id}:{$email}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } try { - - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + //作者邮箱 + $email = empty($data['email']) ? '' : $data['email']; + //邮件主题 + $title = empty($data['title']) ? '' : $data['title']; + //发送来源 + $from_name = empty($data['from_name']) ? '' : $data['from_name']; + //邮件内容 + $content = empty($data['content']) ? '' : $data['content']; + //邮箱 + $memail = empty($data['memail']) ? '' : $data['memail']; + //密码 + $mpassword = empty($data['mpassword']) ? '' : $data['mpassword']; + //文章作者 + $article_author_id = empty($data['article_author_id']) ? 0 : $data['article_author_id']; + //关联文章ID + $related_article_id = empty($data['related_article_id']) ? 0 : $data['related_article_id']; + //期刊ID + $journal_id = empty($data['journal_id']) ? '' : $data['journal_id']; + //期刊issn + $journal_issn = empty($data['journal_issn']) ? '' : $data['journal_issn']; + if (empty($iArticleId) || empty($email)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; } + + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$related_article_id}:{$article_author_id}:{$email}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 + $oArticle = new Article; //查询是否发送过邮件 $oJournalArticle = new JournalArticle; $aLog = json_decode($oJournalArticle::getLog(['article_id' => $iArticleId,'article_author_id' => $article_author_id,'related_article_id' => $related_article_id,'is_success' => 1]),true); @@ -119,22 +88,20 @@ class SendRelatedArticleEmail //添加邮件发送日志 $iId = JournalArticle::addLog($aEmailLog); } - - //更新日志 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { + } catch (RuntimeException $e) { $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); - } catch (\LogicException $e) { + } catch (LogicException $e) { $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); - } catch (\Exception $e) { + } catch (Exception $e) { $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/SendReviewEmail.php b/application/api/job/SendReviewEmail.php index 11a1bb2..41220a4 100644 --- a/application/api/job/SendReviewEmail.php +++ b/application/api/job/SendReviewEmail.php @@ -7,14 +7,9 @@ use app\common\QueueRedis; use app\common\Reviewer; class SendReviewEmail { - private $logPath; private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,82 +18,55 @@ class SendReviewEmail public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - // 获取文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - //作者邮箱 - $email = empty($data['email']) ? '' : $data['email']; - //邮件主题 - $title = empty($data['title']) ? '' : $data['title']; - //发送来源 - $from_name = empty($data['from_name']) ? '' : $data['from_name']; - //邮件内容 - $content = empty($data['content']) ? '' : $data['content']; - //邮箱 - $memail = empty($data['memail']) ? '' : $data['memail']; - //密码 - $mpassword = empty($data['mpassword']) ? '' : $data['mpassword']; - //审稿记录表主键ID - $art_rev_id = empty($data['art_rev_id']) ? 0 : $data['art_rev_id']; - //审稿人ID - $reviewer_id = empty($data['reviewer_id']) ? 0 : $data['reviewer_id']; - //邮件类型 - $type = empty($data['type']) ? 1 : $data['type']; - if (empty($iArticleId) || empty($email)) { - $this->oQueueJob->log("无效的article_id/email,删除任务"); - $job->delete(); - return; - } - - // 生成唯一任务标识 - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$art_rev_id}:{$reviewer_id}:{$email}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } try { - - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 验证任务数据完整性 + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + //作者邮箱 + $email = empty($data['email']) ? '' : $data['email']; + //邮件主题 + $title = empty($data['title']) ? '' : $data['title']; + //发送来源 + $from_name = empty($data['from_name']) ? '' : $data['from_name']; + //邮件内容 + $content = empty($data['content']) ? '' : $data['content']; + //邮箱 + $memail = empty($data['memail']) ? '' : $data['memail']; + //密码 + $mpassword = empty($data['mpassword']) ? '' : $data['mpassword']; + //审稿记录表主键ID + $art_rev_id = empty($data['art_rev_id']) ? 0 : $data['art_rev_id']; + //审稿人ID + $reviewer_id = empty($data['reviewer_id']) ? 0 : $data['reviewer_id']; + //邮件类型 + $type = empty($data['type']) ? 1 : $data['type']; + if (empty($iArticleId) || empty($email)) { + $this->oQueueJob->log("无效的article_id/email,删除任务"); + $job->delete(); + return; } + + // 生成唯一任务标识 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$art_rev_id}:{$reviewer_id}:{$email}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 //查询是否发送过邮件 $oReviewer = new Reviewer; if($type != 3){ @@ -120,21 +88,20 @@ class SendReviewEmail //添加邮件发送日志 $iId = $oReviewer->addLog($aEmailLog); } - //更新日志 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); - $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); + + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/WechatDraft.php b/application/api/job/WechatDraft.php index 031caf0..fe2de53 100644 --- a/application/api/job/WechatDraft.php +++ b/application/api/job/WechatDraft.php @@ -7,14 +7,9 @@ use app\common\QueueRedis; use app\api\controller\Aiarticle; class WechatDraft { - private $logPath; private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,84 +18,62 @@ class WechatDraft public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - // 获取文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - if (empty($iArticleId)) { - $this->oQueueJob->log("无效的article_id,删除任务"); - $job->delete(); - return; - } - - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } try { - - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 验证任务数据完整性 + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; } - //上传草稿箱 + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 $oAiarticle = new Aiarticle; - $aResult = json_decode($oAiarticle->syncWechat($data),true); + $response = $oAiarticle->syncWechat($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } $sMsg = empty($aResult['msg']) ? '上传草稿箱失败' : $aResult['msg']; - //更新日志 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/WechatDraftPublish.php b/application/api/job/WechatDraftPublish.php index 93ef526..f71563e 100644 --- a/application/api/job/WechatDraftPublish.php +++ b/application/api/job/WechatDraftPublish.php @@ -7,14 +7,9 @@ use app\common\QueueRedis; use app\api\controller\Aiarticle; class WechatDraftPublish { - private $logPath; private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,84 +18,62 @@ class WechatDraftPublish public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - // 获取文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - if (empty($iArticleId)) { - $this->oQueueJob->log("无效的article_id,删除任务"); - $job->delete(); - return; - } - - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } try { - - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 验证任务数据完整性 + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; } - //发布草稿箱 + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 $oAiarticle = new Aiarticle; - $aResult = json_decode($oAiarticle->publishDraft($data),true); + $response = $oAiarticle->publishDraft($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } $sMsg = empty($aResult['msg']) ? '草稿箱发布失败' : $aResult['msg']; - //更新日志 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/WechatMaterial.php b/application/api/job/WechatMaterial.php index 2a2cfcf..90a3e5d 100644 --- a/application/api/job/WechatMaterial.php +++ b/application/api/job/WechatMaterial.php @@ -7,14 +7,9 @@ use app\common\QueueRedis; use app\api\controller\Aiarticle; class WechatMaterial { - private $logPath; private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,85 +18,62 @@ class WechatMaterial public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - // 获取文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - if (empty($iArticleId)) { - $this->oQueueJob->log("无效的article_id,删除任务"); - $job->delete(); - return; - } - - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } try { - - - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 验证任务数据完整性 + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; } - //上传素材 + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 $oAiarticle = new Aiarticle; - $aResult = json_decode($oAiarticle->uploadMaterial($data),true); + $response = $oAiarticle->uploadMaterial($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } $sMsg = empty($aResult['msg']) ? '上传素材失败' : $aResult['msg']; - //更新日志 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/WechatQueryStatus.php b/application/api/job/WechatQueryStatus.php index 77d67f9..ba1291b 100644 --- a/application/api/job/WechatQueryStatus.php +++ b/application/api/job/WechatQueryStatus.php @@ -7,14 +7,9 @@ use app\common\QueueRedis; use app\api\controller\Aiarticle; class WechatQueryStatus { - private $logPath; private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,86 +18,62 @@ class WechatQueryStatus public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - // 获取文章ID - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - if (empty($iArticleId)) { - $this->oQueueJob->log("无效的article_id,删除任务"); - $job->delete(); - return; - } - - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } try { - - - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 验证任务数据完整性 + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; } - // 查询状态 + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 $oAiarticle = new Aiarticle; - $aResult = json_decode($oAiarticle->queryStatus($data),true); + $response = $oAiarticle->queryStatus($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } $sMsg = empty($aResult['msg']) ? '查询草稿箱文章是否发布失败' : $aResult['msg']; - - //更新日志 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/createFieldForQueue.php b/application/api/job/createFieldForQueue.php index 1c38e7b..5458880 100644 --- a/application/api/job/createFieldForQueue.php +++ b/application/api/job/createFieldForQueue.php @@ -6,15 +6,11 @@ use app\common\OpenAi; use app\common\QueueJob; use app\common\QueueRedis; -class createFieldForQueue +class CreateFieldForQueue { private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,80 +19,63 @@ class createFieldForQueue public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); - - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - $iRedisId = empty($data['redis_id']) ? 0 : $data['redis_id']; - $sChunkIndex = empty($data['chunkIndex']) ? 0 : $data['chunkIndex']; - if (empty($iRedisId)) { - $this->oQueueJob->log("无效的redis_id,删除任务"); - $job->delete(); - return; - } - - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iRedisId}:{$sChunkIndex}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); try { - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 验证任务数据完整性 + $iRedisId = empty($data['redis_id']) ? 0 : $data['redis_id']; + $sChunkIndex = empty($data['chunkIndex']) ? 0 : $data['chunkIndex']; + if (empty($iRedisId)) { + $this->oQueueJob->log("无效的redis_id,删除任务"); + $job->delete(); + return; } + + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iRedisId}:{$sChunkIndex}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 $oOpenAi = new OpenAi; - $aResult = json_decode($oOpenAi->createFieldForQueue($data), true); + $response = $oOpenAi->createFieldForQueue($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } $sMsg = empty($aResult['msg']) ? '内容生成成功' : $aResult['msg']; - //更新完成标识 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $sRedisKey,$sRedisValue,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job); + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file diff --git a/application/api/job/uploadMaterialStep.php b/application/api/job/uploadMaterialStep.php index 025d0da..98bbba9 100644 --- a/application/api/job/uploadMaterialStep.php +++ b/application/api/job/uploadMaterialStep.php @@ -2,19 +2,14 @@ namespace app\api\job; use think\queue\Job; -use app\common\Material; use app\common\QueueJob; use app\common\QueueRedis; - +use app\common\Material; class uploadMaterialStep { private $oQueueJob; private $QueueRedis; - private $maxRetries = 2; - private $lockExpire = 1800; - private $completedExprie = 3600; - const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + private $completedExprie = 3600; // 完成状态过期时间 public function __construct() { $this->oQueueJob = new QueueJob; @@ -23,83 +18,66 @@ class uploadMaterialStep public function fire(Job $job, $data) { - $startTime = microtime(true); + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + $this->oQueueJob->log("-----------队列任务开始-----------"); - - // 检查数据库连接 - if (!$this->oQueueJob->checkDbConnection(true)) { - $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); - $job->release(10); - return; - } - // 检查Redis连接状态 - if (!$this->QueueRedis->getConnectionStatus()) { - $this->oQueueJob->log("Redis连接失败,10秒后重试"); - $job->release(10); - return; - } - - $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; - if(empty($iArticleId)){ - $iArticleId = empty($data['journal_id']) ? 0 : $data['journal_id']; - } - $sChunkIndex = empty($data['chunkIndex']) ? 0 : $data['chunkIndex']; - if (empty($iArticleId)) { - $this->oQueueJob->log("无效的article_id/journal_id,删除任务"); - $job->delete(); - return; - } - - $sClassName = get_class($this); - $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$sChunkIndex}"; - $sRedisValue = uniqid() . '_' . getmypid(); - $lockExpire = $this->lockExpire; - - $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); - if (!$isLocked) { - $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); - if (in_array($jobStatus, ['completed', 'failed'])) { - $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); - $job->delete(); - } else { - $attempts = $job->attempts(); - if ($attempts >= $this->maxRetries) { - $this->oQueueJob->log("超过最大重试次数,停止重试"); - $job->delete(); - } else { - $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); - $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; - $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); - $job->release($delay); - } - } - return; - } + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); try { - // 执行核心任务前再次检查连接 - $result = $this->oQueueJob->checkDbConnection(); - if (!$result) { - throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + + // 验证任务数据完整性 + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if(empty($iArticleId)){ + $iArticleId = empty($data['journal_id']) ? 0 : $data['journal_id']; } + $sChunkIndex = empty($data['chunkIndex']) ? 0 : $data['chunkIndex']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id/journal_id,删除任务"); + $job->delete(); + return; + } + + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$sChunkIndex}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 $oMaterial = new Material; - $aResult = json_decode($oMaterial->uploadMaterialStep($data), true); + $response = $oMaterial->uploadMaterialStep($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } $sMsg = empty($aResult['msg']) ? '素材上传失败' : $aResult['msg']; - //更新完成标识 - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); - } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job); - } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $sRedisKey,$sRedisValue,$job); - } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job); + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { - $executionTime = microtime(true) - $startTime; - $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - gc_collect_cycles(); + $this->oQueueJob->finnal(); } } } \ No newline at end of file