From 80a113116ce525749807e6ab063cf229d15a32aa Mon Sep 17 00:00:00 2001 From: chengxl Date: Wed, 6 Aug 2025 17:27:26 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/uploadMaterialStep.php | 105 +++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 application/api/job/uploadMaterialStep.php diff --git a/application/api/job/uploadMaterialStep.php b/application/api/job/uploadMaterialStep.php new file mode 100644 index 0000000..025d0da --- /dev/null +++ b/application/api/job/uploadMaterialStep.php @@ -0,0 +1,105 @@ +oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + } + + public function fire(Job $job, $data) + { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); + + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + return; + } + + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if(empty($iArticleId)){ + $iArticleId = empty($data['journal_id']) ? 0 : $data['journal_id']; + } + $sChunkIndex = empty($data['chunkIndex']) ? 0 : $data['chunkIndex']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id/journal_id,删除任务"); + $job->delete(); + return; + } + + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$sChunkIndex}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + return; + } + + try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } + $oMaterial = new Material; + $aResult = json_decode($oMaterial->uploadMaterialStep($data), true); + $sMsg = empty($aResult['msg']) ? '素材上传失败' : $aResult['msg']; + //更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); + + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $sRedisKey,$sRedisValue,$job); + } catch (\Exception $e) { + $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + gc_collect_cycles(); + } + } +} \ No newline at end of file