diff --git a/application/api/job/WechatDraftPublish.php b/application/api/job/WechatDraftPublish.php index 60b280e..93ef526 100644 --- a/application/api/job/WechatDraftPublish.php +++ b/application/api/job/WechatDraftPublish.php @@ -11,21 +11,14 @@ class WechatDraftPublish private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/WechatDraftPublish_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class WechatDraftPublish $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -72,56 +71,35 @@ class WechatDraftPublish $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } + //发布草稿箱 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->publishDraft($data),true); $sMsg = empty($aResult['msg']) ? '草稿箱发布失败' : $aResult['msg']; - //更新任务状态 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + //更新日志 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } diff --git a/application/api/job/WechatMaterial.php b/application/api/job/WechatMaterial.php index caa098d..2a2cfcf 100644 --- a/application/api/job/WechatMaterial.php +++ b/application/api/job/WechatMaterial.php @@ -11,21 +11,14 @@ class WechatMaterial private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/WechatMaterial_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class WechatMaterial $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -72,56 +71,36 @@ class WechatMaterial $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } + //上传素材 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->uploadMaterial($data),true); $sMsg = empty($aResult['msg']) ? '上传素材失败' : $aResult['msg']; - //更新任务状态 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + //更新日志 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } diff --git a/application/api/job/WechatQueryStatus.php b/application/api/job/WechatQueryStatus.php index aec968f..77d67f9 100644 --- a/application/api/job/WechatQueryStatus.php +++ b/application/api/job/WechatQueryStatus.php @@ -11,21 +11,14 @@ class WechatQueryStatus private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/WechatQueryStatus_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class WechatQueryStatus $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -72,56 +71,37 @@ class WechatQueryStatus $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } + // 查询状态 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->queryStatus($data),true); $sMsg = empty($aResult['msg']) ? '查询草稿箱文章是否发布失败' : $aResult['msg']; - //更新任务状态 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + //更新日志 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } }