Files
tougao/application/api/job/CitationRelevanceQueue.php
wangjinlei d92df3e103 1
2026-04-09 11:04:11 +08:00

91 lines
3.5 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\api\job;
use think\queue\Job;
use app\common\QueueJob;
use app\common\QueueRedis;
use app\api\controller\References;
/**
 * Queue job that runs the citation relevance check for a single reference
 * (identified by `p_refer_id`) and stores the JSON result in Redis.
 *
 * Flow of {@see fire()}:
 *  1. validate the payload;
 *  2. take a per-reference job lock through QueueRedis::startJob();
 *  3. call References::checkCitationRelevance() synchronously;
 *  4. write the result under "<job key>:result" and mark the job completed.
 *
 * Compared with the previous revision this version also handles PHP 7+
 * engine errors (\Error), guards against json_encode() failures, and compares
 * the stored job status strictly.
 */
class CitationRelevanceQueue
{
    /** @var QueueJob Shared helper used here for init, logging and exception handling. */
    private $oQueueJob;

    /** @var QueueRedis Redis helper used here for the job lock, job status and result storage. */
    private $QueueRedis;

    /** @var int Expiry passed to QueueRedis::startJob() for the per-reference lock (900 = 15 minutes). */
    private $lockExpire = 900;

    /** @var int Expiry passed to QueueRedis::finishJob() for the completed marker (86400 = 1 day). */
    private $completedExpire = 86400;

    /** @var int Attempts after which a job that keeps losing the lock is dropped. */
    private $maxAttempts = 3;

    public function __construct()
    {
        $this->oQueueJob = new QueueJob;
        $this->QueueRedis = QueueRedis::getInstance();
    }

    /**
     * Queue entry point.
     *
     * @param Job   $job  The queue job being processed.
     * @param mixed $data Job payload; expected to carry a positive integer `p_refer_id`.
     * @return void
     */
    public function fire(Job $job, $data)
    {
        // Common initialisation (original note: process timeout / Redis / DB self-check).
        $this->oQueueJob->init($job);

        // Validate the payload: a missing, empty or zero id cannot be processed.
        $pReferId = empty($data['p_refer_id']) ? 0 : intval($data['p_refer_id']);
        if (empty($pReferId)) {
            $this->oQueueJob->log("无效的 p_refer_id删除任务");
            $job->delete();
            return;
        }

        // One Redis key per reference so that only one job per reference runs at a time.
        $sClassName = get_class($this);
        $sRedisKey = "queue_job:{$sClassName}:{$pReferId}";
        // Lock owner token, handed back to finishJob() / the exception handlers.
        $sRedisValue = uniqid() . '_' . getmypid();

        // Acquire the lock and mark the job as processing (idempotency control).
        $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $this->lockExpire);
        if (!$isLocked) {
            $this->handleLockContention($job, $sRedisKey);
            return;
        }

        try {
            // Call the synchronous controller method to obtain the full result.
            $ctrl = new References();
            $resp = $ctrl->checkCitationRelevance(['p_refer_id' => $pReferId]);

            $decoded = is_string($resp) ? json_decode($resp, true) : $resp;
            if (!is_array($decoded)) {
                $decoded = ['status' => 2, 'msg' => 'Unexpected response from checkCitationRelevance', 'raw' => $resp];
            }

            // Store the result in Redis (original note: read back by HTTP polling).
            $this->QueueRedis->setRedisValue($sRedisKey . ':result', $this->encodeResult($decoded));

            // Mark the job completed and release the lock.
            $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExpire, $sRedisValue);
            $job->delete();
            $this->oQueueJob->log("CitationRelevanceQueue 任务执行成功 | p_refer_id: {$pReferId}");
        } catch (\LogicException $e) {
            // Programming / argument errors: retrying would not help.
            $this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\Exception $e) {
            // Everything else, including \RuntimeException, is treated as retryable.
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\Throwable $e) {
            // PHP 7+ engine errors (\Error, \TypeError, ...) do not extend \Exception.
            // Without this branch they would bypass both handlers and leave the lock
            // in place until it expires. The error is wrapped so that the handler
            // receives an \Exception whatever its parameter type is.
            // On PHP 5 this branch simply never matches.
            $wrapped = new \RuntimeException($e->getMessage(), (int) $e->getCode(), $e);
            $this->oQueueJob->handleRetryableException($wrapped, $sRedisKey, $sRedisValue, $job);
        } finally {
            $this->oQueueJob->finnal();
        }
    }

    /**
     * Decide what to do with a job that could not obtain the per-reference lock.
     *
     * - stored status is 'completed' or 'failed': delete this job;
     * - attempt limit reached: delete this job;
     * - otherwise: release it back to the queue with a delay.
     *
     * @param Job    $job       The queue job being processed.
     * @param string $sRedisKey Job key used for the lock / status.
     * @return void
     */
    private function handleLockContention(Job $job, $sRedisKey)
    {
        $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey);

        // Strict comparison: a non-string status (e.g. true or 0) must not be
        // mistaken for a finished job.
        if (in_array($jobStatus, ['completed', 'failed'], true)) {
            $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}");
            $job->delete();
            return;
        }

        $attempts = $job->attempts();
        if ($attempts >= $this->maxAttempts) {
            $this->oQueueJob->log("超过最大重试次数,停止重试");
            $job->delete();
            return;
        }

        // Retry shortly after the reported lock TTL; fall back to 30 seconds
        // when getLockTtl() does not return a positive value.
        $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey);
        $delay = $lockTtl > 0 ? $lockTtl + 5 : 30;
        $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxAttempts})");
        $job->release($delay);
    }

    /**
     * JSON-encode the result payload, never returning false.
     *
     * json_encode() returns false on failure (for example invalid UTF-8 inside
     * the payload). In that case a small error payload is returned instead, using
     * the same `status => 2` shape as the "unexpected response" fallback in fire().
     *
     * @param array $decoded Result payload to store.
     * @return string JSON text.
     */
    private function encodeResult(array $decoded)
    {
        $json = json_encode($decoded, JSON_UNESCAPED_UNICODE);
        if ($json !== false) {
            return $json;
        }

        return json_encode(
            [
                'status' => 2,
                'msg' => 'Failed to encode checkCitationRelevance result: ' . json_last_error_msg(),
            ],
            JSON_UNESCAPED_UNICODE
        );
    }
}