oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); } public function fire(Job $job, $data) { // 通用初始化(进程超时 / Redis / DB 自检) $this->oQueueJob->init($job); // 校验参数 $pReferId = empty($data['p_refer_id']) ? 0 : intval($data['p_refer_id']); if (empty($pReferId)) { $this->oQueueJob->log("无效的 p_refer_id,删除任务"); $job->delete(); return; } // 生成 Redis 键:一个参考文献只允许一个队列在跑 $sClassName = get_class($this); $sRedisKey = "queue_job:{$sClassName}:{$pReferId}"; $sRedisValue = uniqid() . '_' . getmypid(); // 加锁并标记为 processing(幂等控制) $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $this->lockExpire); if (!$isLocked) { $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); if (in_array($jobStatus, ['completed', 'failed'])) { $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); $job->delete(); } else { $attempts = $job->attempts(); if ($attempts >= 3) { $this->oQueueJob->log("超过最大重试次数,停止重试"); $job->delete(); } else { $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/3)"); $job->release($delay); } } return; } try { // 实际调用 References 控制器里的同步方法,拿到完整 JSON 结果 $ctrl = new References(); $resp = $ctrl->checkCitationRelevance(['p_refer_id' => $pReferId]); $decoded = is_string($resp) ? json_decode($resp, true) : $resp; if (!is_array($decoded)) { $decoded = ['status' => 2, 'msg' => 'Unexpected response from checkCitationRelevance', 'raw' => $resp]; } // 结果写入 Redis,供 HTTP 轮询读取 $this->QueueRedis->setRedisValue($sRedisKey . ':result', json_encode($decoded, JSON_UNESCAPED_UNICODE)); // 标记完成并释放锁 $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); $job->delete(); $this->oQueueJob->log("CitationRelevanceQueue 任务执行成功 | p_refer_id: {$pReferId}"); } catch (\RuntimeException $e) { $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job); } catch (\LogicException $e) { $this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue, $job); } catch (\Exception $e) { $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job); } finally { $this->oQueueJob->finnal(); } } }