oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); }

    /**
     * Queue entry point: recommend reviewers for an article, store them and
     * send the invitation emails.
     *
     * Flow, as implemented below:
     *  1. Release the job for 10 seconds when the database or Redis
     *     connection check fails.
     *  2. Delete the job when the payload carries no usable `article_id`.
     *  3. Try to claim the job through QueueRedis::startJob() under a key built
     *     from the class name and the article ID. When the claim is refused the
     *     job is deleted (status already completed/failed, or the retry limit
     *     is reached) or released for a later attempt.
     *  4. Call Reviewer::recommend(); when fewer reviewers exist than were
     *     requested, also call Reviewer::emailForEditor().
     *  5. When reviewers were returned, call Reviewer::add() and, if that
     *     reports status 1, Reviewer::email().
     *  6. Record the job as 'completed' in Redis, delete it and log the
     *     accumulated messages.
     *
     * Exceptions raised inside step 4-6 are delegated to the QueueJob helper:
     * \LogicException goes to handleNonRetryableException(), every other
     * \Exception goes to handleRetryableException().
     *
     * @param Job   $job  The queue job being processed.
     * @param mixed $data Job payload; `article_id` is required, `size`
     *                    (number of reviewers to recommend) defaults to 5.
     *
     * @return void
     */
    public function fire(Job $job, $data)
    {
        $startTime = microtime(true);
        $this->oQueueJob->log("-----------队列任务开始-----------");

        // Check the database connection
        if (!$this->oQueueJob->checkDbConnection(true)) {
            $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
            $job->release(10);
            return;
        }

        // Check the Redis connection status
        if (!$this->QueueRedis->getConnectionStatus()) {
            $this->oQueueJob->log("Redis连接失败,10秒后重试");
            $job->release(10);
            return;
        }

        // Resolve the article ID; a job without one can never succeed, so drop it
        $iArticleId = empty($data['article_id']) ? 0 : $data['article_id'];
        if (empty($iArticleId)) {
            $this->oQueueJob->log("无效的article_id,删除任务");
            $job->delete();
            return;
        }

        // Key identifies "this job class + this article"; the value is unique
        // to this attempt (uniqid + PID) and is passed back when finishing.
        $sClassName = get_class($this);
        $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}";
        $sRedisValue = uniqid() . '_' . getmypid();

        $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $this->lockExpire);
        if (!$isLocked) {
            $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey);

            // Strict comparison: only the exact status strings count as finished.
            if (in_array($jobStatus, ['completed', 'failed'], true)) {
                $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}");
                $job->delete();
                return;
            }

            $attempts = $job->attempts();
            if ($attempts >= $this->maxRetries) {
                $this->oQueueJob->log("超过最大重试次数,停止重试");
                $job->delete();
                return;
            }

            // Retry shortly after the reported TTL elapses; fall back to 30s
            // when no positive TTL is reported.
            $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey);
            $delay = $lockTtl > 0 ? $lockTtl + 5 : 30;
            $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})");
            $job->release($delay);
            return;
        }

        try {
            // Re-check the connection right before the core work
            if (!$this->oQueueJob->checkDbConnection()) {
                throw new \RuntimeException("数据库连接异常,无法执行核心任务");
            }

            // Fetch the recommended reviewers
            $aRecommendParam = [
                'article_id' => $iArticleId,
                'page' => 1,
                'size' => empty($data['size']) ? 5 : $data['size'],
            ];
            $oReviewer = new Reviewer;
            $aRecommend = json_decode($oReviewer->recommend($aRecommendParam), true);
            $sMsg = empty($aRecommend['msg']) ? '' : $aRecommend['msg'];

            $iCount = empty($aRecommend['data']['total']) ? 0 : $aRecommend['data']['total']; // available count
            $iSize = empty($aRecommend['data']['size']) ? 0 : $aRecommend['data']['size'];    // requested count

            // Notify the journal manager when fewer reviewers exist than requested
            if ($iCount < $iSize) {
                $aSendEmailResult = json_decode($oReviewer->emailForEditor($aRecommendParam), true);
                $sMsg .= ';';
                $sMsg .= empty($aSendEmailResult['msg']) ? '发送邮件入队成功' : $aSendEmailResult['msg'];
            }

            // Recommended reviewer rows
            $aReviewers = empty($aRecommend['data']['lists']) ? [] : $aRecommend['data']['lists'];
            if (empty($aReviewers)) {
                $sMsg .= ';';
                $sMsg .= 'No qualified reviewers were selected';
            } else {
                $aReviewerParam = [
                    'article_id' => $iArticleId,
                    'reviewer_id' => array_column($aReviewers, 'reviewer_id'),
                ];

                $aAddResult = json_decode($oReviewer->add($aReviewerParam), true);
                $iStatus = empty($aAddResult['status']) ? 0 : $aAddResult['status'];
                $sMsg .= ';';
                $sMsg .= empty($aAddResult['msg']) ? 'Reviewer data insertion failed' : $aAddResult['msg'];

                // Send the review invitation emails only when insertion succeeded.
                // Loose comparison on purpose: decoded JSON may carry 1 or "1".
                if ($iStatus == 1) {
                    $aEmailResult = json_decode($oReviewer->email($aReviewerParam), true);
                    $sMsg .= ';';
                    // Distinct fallback so the log does not blame the insertion step
                    // when it is the email step that returned no message.
                    $sMsg .= empty($aEmailResult['msg'])
                        ? 'Reviewer invitation email step returned no message'
                        : $aEmailResult['msg'];
                }
            }

            // Record completion, then remove the job from the queue
            $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue);
            $job->delete();
            $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
        } catch (\RuntimeException $e) {
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\LogicException $e) {
            $this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\Exception $e) {
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } finally {
            // Runs for every outcome of the try block above (not for the
            // early returns before it): log elapsed time and collect cycles.
            $executionTime = microtime(true) - $startTime;
            $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
            gc_collect_cycles();
        }
    }
}