From f5993ca9e58870cb73921fdd0d9df1d0e1b27c6c Mon Sep 17 00:00:00 2001 From: chengxl Date: Thu, 25 Sep 2025 17:14:26 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=9C=E8=80=85=E9=82=AE=E7=AE=B1=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/SyncArticleData.php | 78 ++++++++++++++++++++++ application/api/job/SyncArticleUpload.php | 79 +++++++++++++++++++++++ 2 files changed, 157 insertions(+) create mode 100644 application/api/job/SyncArticleData.php create mode 100644 application/api/job/SyncArticleUpload.php diff --git a/application/api/job/SyncArticleData.php b/application/api/job/SyncArticleData.php new file mode 100644 index 0000000..41cf313 --- /dev/null +++ b/application/api/job/SyncArticleData.php @@ -0,0 +1,78 @@ +oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + } + + public function fire(Job $job, $data) + { + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); + + // 获取文章ID + $iJournalStageId = empty($data['journal_stage_id']) ? 0 : $data['journal_stage_id']; + if (empty($iJournalStageId)) { + $this->oQueueJob->log("无效的journal_stage_id,删除任务"); + $job->delete(); + return; + } + try { + + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iJournalStageId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + //生成内容 + $oAireview = new Syncdata; + $response = $oAireview->getJournalStageArticle($data); + // 验证API响应 + if (empty($response)) { + throw new \RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new \RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } + $sMsg = empty($aResult['msg']) ? 'success' : $aResult['msg']; + //更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); + + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (\Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } finally { + $this->oQueueJob->finnal(); + } + } +} \ No newline at end of file diff --git a/application/api/job/SyncArticleUpload.php b/application/api/job/SyncArticleUpload.php new file mode 100644 index 0000000..d6c4b8a --- /dev/null +++ b/application/api/job/SyncArticleUpload.php @@ -0,0 +1,79 @@ +oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + } + + public function fire(Job $job, $data) + { + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); + + // 获取文章ID + $iJournalStageId = empty($data['journal_stage_id']) ? 0 : $data['journal_stage_id']; + if (empty($iJournalStageId)) { + $this->oQueueJob->log("无效的journal_stage_id,删除任务"); + $job->delete(); + return; + } + try { + + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iJournalStageId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + //生成内容 + $oAireview = new Syncdata; + $response = $oAireview->uploadArticle($data); + // 验证API响应 + if (empty($response)) { + throw new \RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new \RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } + $sMsg = empty($aResult['msg']) ? 'success' : $aResult['msg']; + $sData = empty($aResult['data']) ? '' : json_encode($aResult['data']); + //更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg} | 数据:{$sData}"); + + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (\Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } finally { + $this->oQueueJob->finnal(); + } + } +} \ No newline at end of file