文献校对功能 转rabbitMQ

This commit is contained in:
wyn
2026-06-02 09:57:51 +08:00
parent 94b212fe7c
commit f2294b375c
6 changed files with 250 additions and 431 deletions

View File

@@ -1533,7 +1533,7 @@ class References extends Base
* POST/GET: p_refer_id必填 * POST/GET: p_refer_id必填
* p_article_id可选 * p_article_id可选
* *
* 仅重跑 status=3校对失败的记录不改动 refer_text只重置结果字段后入 ReferenceCheck 队列。 * 仅重跑 status=3校对失败的记录不改动 refer_text只重置结果字段后入 RabbitMQ 批次队列。
* 返回p_refer_id、p_article_id、reset、queued、check_ids、queue * 返回p_refer_id、p_article_id、reset、queued、check_ids、queue
*/ */
public function referenceCheckRecheckFailedAI() public function referenceCheckRecheckFailedAI()

View File

@@ -1,114 +0,0 @@
<?php
namespace app\api\job;
use think\Db;
use think\queue\Job;
use app\common\QueueJob;
use app\common\QueueRedis;
use app\common\ReferenceCheckService;
/**
 * Queue consumer that runs one first-pass reference check per job.
 *
 * The job payload carries a `check_id` pointing at a row of
 * `article_reference_check_result`. The worker loads that row, delegates the
 * actual check to ReferenceCheckService::runReferenceCheckOnce(), refreshes the
 * owning section's status, and deletes the job. When the check throws, the job
 * is released for another attempt after 30 seconds; from the third attempt on
 * the row is marked failed and the job is deleted instead.
 */
class ReferenceCheck
{
    /** @var QueueJob Per-job helper used here for logging, locking and exception handling. */
    private $oQueueJob;

    /** @var QueueRedis Redis helper used to flag the job as finished. */
    private $QueueRedis;

    /** @var int Third argument handed to QueueRedis::finishJob() (presumably a TTL in seconds; confirm). */
    private $completedExprie = 3600;

    public function __construct()
    {
        $this->oQueueJob = new QueueJob();
        $this->QueueRedis = QueueRedis::getInstance();
    }

    /**
     * Entry point invoked by the queue worker.
     *
     * @param Job   $job  Queue job handle (attempt counter, delete/release).
     * @param mixed $data Decoded job data; `check_id` is read from it when present.
     * @return void
     */
    public function fire(Job $job, $data)
    {
        $this->oQueueJob->init($job);

        // The raw envelope is only needed for the job id and as a fallback source of check_id.
        $payloadText = $job->getRawBody();
        $envelope = empty($payloadText) ? [] : json_decode($payloadText, true);
        $jobId = empty($envelope['id']) ? 'unknown' : $envelope['id'];

        // Stay empty until the lock is about to be taken; the outer handlers receive them as-is.
        $sRedisKey = '';
        $sRedisValue = '';

        $this->oQueueJob->log("-----------队列任务开始-----------");
        $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}");

        try {
            $checkId = $this->resolveCheckId($data, $envelope);

            // Nothing to do for an invalid id, a vanished row, or a row that is already completed.
            $record = $checkId > 0
                ? Db::name('article_reference_check_result')->where('id', $checkId)->find()
                : null;
            if (empty($record) || intval($record['status']) === ReferenceCheckService::RECORD_COMPLETED) {
                $job->delete();
                return;
            }

            $sClassName = get_class($this);
            $sRedisKey = "queue_job:{$sClassName}:{$checkId}";
            $sRedisValue = uniqid() . '_' . getmypid();

            $service = new ReferenceCheckService();
            // NOTE(review): the lock keys for this check are cleared right before acquireLock().
            // If acquireLock() relies on those keys for mutual exclusion, a concurrent worker's
            // lock would be removed here. Confirm this is intended.
            $service->clearReferenceCheckQueueLock($checkId);
            if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) {
                return;
            }

            try {
                $service->runReferenceCheckOnce($checkId);

                $sectionId = intval(isset($record['am_id']) ? $record['am_id'] : 0);
                if ($sectionId > 0) {
                    $service->syncAmRefCheckStatus($sectionId);
                }

                $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue);
                $job->delete();
                $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey}");
            } catch (\Exception $e) {
                $this->oQueueJob->log('ReferenceCheck error: ' . $e->getMessage());
                if ($job->attempts() < 3) {
                    // Still within the retry budget: hand the job back with a 30 second delay.
                    $job->release(30);
                } else {
                    $this->markFailed($checkId, $e->getMessage());
                    $job->delete();
                }
            }
        } catch (\RuntimeException $e) {
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\LogicException $e) {
            $this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\Exception $e) {
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } finally {
            $this->oQueueJob->finnal();
        }
    }

    /**
     * Read check_id from the decoded job data, falling back to the raw envelope.
     *
     * @param mixed $data     Decoded job data passed to fire().
     * @param mixed $envelope Result of json_decode() on the raw body (or an empty array).
     * @return int Resolved id; a value <= 0 means no usable id was found.
     */
    private function resolveCheckId($data, $envelope)
    {
        $checkId = intval(isset($data['check_id']) ? $data['check_id'] : 0);
        if ($checkId <= 0 && !empty($envelope['data']['check_id'])) {
            $checkId = intval($envelope['data']['check_id']);
        }
        return $checkId;
    }

    /**
     * Flag the check row as failed and refresh the owning section's status.
     *
     * A failure while writing the row is logged and swallowed; the section
     * sync still runs when the row was found and has a positive am_id.
     *
     * @param int    $checkId Primary key of the check row.
     * @param string $msg     Error text stored in error_msg.
     * @return void
     */
    private function markFailed($checkId, $msg)
    {
        $record = Db::name('article_reference_check_result')->where('id', $checkId)->find();
        $sectionId = empty($record) ? 0 : intval(isset($record['am_id']) ? $record['am_id'] : 0);

        try {
            (new ReferenceCheckService())->updateCheckResult($checkId, [
                'status' => ReferenceCheckService::RECORD_FAILED,
                'error_msg' => $msg,
            ]);
        } catch (\Exception $e) {
            \think\Log::error('ReferenceCheck markFailed: ' . $e->getMessage());
        }

        if ($sectionId <= 0) {
            return;
        }
        (new ReferenceCheckService())->syncAmRefCheckStatus($sectionId);
    }
}

View File

@@ -1,162 +0,0 @@
<?php
namespace app\api\job;
use think\Db;
use think\queue\Job;
use app\common\QueueJob;
use app\common\QueueRedis;
use app\common\ReferenceCheckService;
use app\common\service\LLMService;
/**
 * Queue consumer for the second-pass ("Crossref recheck") reference check.
 *
 * The job payload carries a `check_id` pointing at a row of
 * `article_reference_check_result`. The worker rebuilds the recheck payload
 * through ReferenceCheckService::prepareRecheckPayload(), asks the LLM again
 * with the DOI block attached, and overwrites the row with the new verdict.
 * When processing throws, the job is released for another attempt after
 * RETRY_DELAY seconds; from attempt MAX_ATTEMPTS on the row is marked failed
 * and the job is deleted instead.
 */
class ReferenceCheckTwo
{
    /** Attempt number from which a failing job is given up instead of released. */
    const MAX_ATTEMPTS = 3;

    /** Delay in seconds passed to Job::release() between attempts. */
    const RETRY_DELAY = 30;

    /** @var QueueJob Per-job helper used here for logging, locking and exception handling. */
    private $oQueueJob;

    /** @var QueueRedis Redis helper used to flag the job as finished. */
    private $QueueRedis;

    /** @var int Third argument handed to QueueRedis::finishJob() (presumably a TTL in seconds; confirm). */
    private $completedExpire = 3600;

    public function __construct()
    {
        $this->oQueueJob = new QueueJob();
        $this->QueueRedis = QueueRedis::getInstance();
    }

    /**
     * Entry point invoked by the queue worker.
     *
     * @param Job   $job  Queue job handle (attempt counter, delete/release).
     * @param mixed $data Decoded job data; `check_id` is read from it when present.
     * @return void
     */
    public function fire(Job $job, $data)
    {
        $this->oQueueJob->init($job);

        // The raw envelope is only needed for the job id and as a fallback source of check_id.
        $rawBody = $job->getRawBody();
        $jobData = empty($rawBody) ? [] : json_decode($rawBody, true);
        $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id'];

        // Stay empty until the lock is about to be taken; the outer handlers receive them as-is.
        $sRedisKey = '';
        $sRedisValue = '';

        $this->oQueueJob->log("-----------队列任务开始-----------");
        $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}");

        try {
            $checkId = intval(isset($data['check_id']) ? $data['check_id'] : 0);
            if ($checkId <= 0 && !empty($jobData['data']['check_id'])) {
                $checkId = intval($jobData['data']['check_id']);
            }

            // Validate BEFORE taking the lock (same order as ReferenceCheck): otherwise an
            // invalid id would lock the "...:0" key and the early exits below would delete
            // the job while leaving that lock behind.
            if ($checkId <= 0) {
                $job->delete();
                return;
            }
            $row = Db::name('article_reference_check_result')->where('id', $checkId)->find();
            if (empty($row)) {
                $job->delete();
                return;
            }
            // Unlike ReferenceCheck, rows already in RECORD_COMPLETED are deliberately NOT
            // skipped: this pass re-evaluates a record that has a first-pass result.

            $sClassName = get_class($this);
            $sRedisKey = "queue_job_two:{$sClassName}:{$checkId}";
            $sRedisValue = uniqid() . '_' . getmypid();
            if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) {
                return;
            }

            try {
                $svc = new ReferenceCheckService();
                $contentA = $svc->resolveMainContentForJob($row);
                $referText = trim((string)(isset($row['refer_text']) ? $row['refer_text'] : ''));

                // Bail out before the refer lookup / payload preparation: without both texts
                // there is nothing to send to the LLM.
                // NOTE(review): this exit (like the failure exits below) does not call
                // finishJob(); confirm how acquireLock() expires the key.
                if ($contentA === '' || $referText === '') {
                    $this->markFailed($checkId, 'Missing article_main.content or refer_text');
                    $job->delete();
                    return;
                }

                $refer = null;
                $pReferId = intval(isset($row['p_refer_id']) ? $row['p_refer_id'] : 0);
                if ($pReferId > 0) {
                    $refer = Db::name('production_article_refer')
                        ->where('p_refer_id', $pReferId)
                        ->where('state', 0)
                        ->find();
                }
                $payload = $svc->prepareRecheckPayload(is_array($refer) ? $refer : [], $referText);
                $doiBlock = $payload['doi_block'];

                $llm = new LLMService();
                $llmResult = $llm->checkReference($contentA, $referText, true, $doiBlock);
                $requestFailed = !empty($llmResult['request_failed']);
                $canSupport = $svc->parseLlmCanSupport($llmResult);
                // A failed request may come back without a confidence value; default to 0
                // instead of raising an undefined-index error before the failure is recorded.
                $confidence = floatval(isset($llmResult['confidence']) ? $llmResult['confidence'] : 0);
                $llmReason = isset($llmResult['reason']) ? $llmResult['reason'] : '';

                $tag = $payload['has_abstract']
                    ? ('[Crossref复核' . ($payload['doi_used'] !== '' ? ' ' . $payload['doi_used'] : '') . ']')
                    : '[Crossref复核-无摘要]';
                $reason = $tag . ' ' . $llmReason;

                // LLM transport failure: record RECORD_FAILED, then throw so the retry
                // handling in the catch below takes over.
                if ($requestFailed) {
                    $failureMsg = isset($llmResult['reason']) ? $llmResult['reason'] : 'LLM request failed';
                    $svc->updateCheckResult($checkId, [
                        'confidence' => $confidence,
                        'reason' => $reason,
                        'status' => ReferenceCheckService::RECORD_FAILED,
                        'error_msg' => $failureMsg,
                    ]);
                    throw new \RuntimeException($failureMsg);
                }

                $affected = $svc->updateCheckResult($checkId, [
                    'can_support' => $canSupport ? 1 : 0,
                    'is_match' => $canSupport ? 1 : 0,
                    'confidence' => $confidence,
                    'reason' => $reason,
                    'status' => ReferenceCheckService::RECORD_COMPLETED,
                    'error_msg' => '',
                ]);
                $this->oQueueJob->log("Crossref复核写入 id={$checkId} affected={$affected} can_support=" . ($canSupport ? 1 : 0) . " confidence=" . $confidence);

                $amId = intval(isset($row['am_id']) ? $row['am_id'] : 0);
                if ($amId > 0) {
                    $svc->syncAmRefCheckStatus($amId);
                }

                $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExpire, $sRedisValue);
                $job->delete();
                $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey}");
            } catch (\Exception $e) {
                $this->oQueueJob->log('ReferenceCheckTwo error: ' . $e->getMessage());
                if ($job->attempts() >= self::MAX_ATTEMPTS) {
                    $this->markFailed($checkId, $e->getMessage());
                    $job->delete();
                    return;
                }
                // NOTE(review): the lock taken above is not released before the retry;
                // verify that acquireLock() lets the released job back in.
                $job->release(self::RETRY_DELAY);
            }
        } catch (\RuntimeException $e) {
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\LogicException $e) {
            $this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\Exception $e) {
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } finally {
            $this->oQueueJob->finnal();
        }
    }

    /**
     * Flag the check row as failed and refresh the owning section's status.
     *
     * A failure while writing the row is logged and swallowed; the section
     * sync still runs when the row was found and has a positive am_id.
     *
     * @param int    $checkId Primary key of the check row.
     * @param string $msg     Error text stored in error_msg.
     * @return void
     */
    private function markFailed($checkId, $msg)
    {
        $row = Db::name('article_reference_check_result')->where('id', $checkId)->find();
        try {
            (new ReferenceCheckService())->updateCheckResult($checkId, [
                'status' => ReferenceCheckService::RECORD_FAILED,
                'error_msg' => $msg,
            ]);
        } catch (\Exception $e) {
            \think\Log::error('ReferenceCheckTwo markFailed: ' . $e->getMessage());
        }
        $amId = empty($row) ? 0 : intval(isset($row['am_id']) ? $row['am_id'] : 0);
        if ($amId > 0) {
            (new ReferenceCheckService())->syncAmRefCheckStatus($amId);
        }
    }
}

View File

@@ -9,4 +9,6 @@
// | Author: yunwuxin <448901948@qq.com> // | Author: yunwuxin <448901948@qq.com>
// +---------------------------------------------------------------------- // +----------------------------------------------------------------------
return []; return [
'app\\command\\ReferenceCheckMqConsume',
];

View File

@@ -4,16 +4,17 @@ namespace app\common;
use think\Db; use think\Db;
use think\Env; use think\Env;
use think\Queue;
use app\common\service\LLMService; use app\common\service\LLMService;
use app\common\mq\ReferenceCheckMqPublisher;
/** /**
* 正文 &lt;blue&gt;[n]&lt;/blue&gt; 引用与 t_production_article_referindex+1=n相关性校对。 * 正文 &lt;blue&gt;[n]&lt;/blue&gt; 引用与 t_production_article_referindex+1=n相关性校对。
* LLM 配置与 PromotionLlmService 相同;单条任务走 ReferenceCheck 队列 * LLM 配置与 PromotionLlmService 相同;异步任务走 RabbitMQ一篇一条消息
*/ */
class ReferenceCheckService class ReferenceCheckService
{ {
const QUEUE_NAME = 'ReferenceCheck'; /** API 返回异步传输方式RabbitMQ 文章批次) */
const TRANSPORT_RABBITMQ = 'rabbitmq';
/** t_article_main.type */ /** t_article_main.type */
const MAIN_TYPE_TEXT = 0; const MAIN_TYPE_TEXT = 0;
@@ -29,6 +30,9 @@ class ReferenceCheckService
/** @var bool|null t_article_main 是否已有 ref_check_status 列 */ /** @var bool|null t_article_main 是否已有 ref_check_status 列 */
private static $amRefCheckStatusColumnExists = null; private static $amRefCheckStatusColumnExists = null;
/** 单条任务最多重试次数(不含首次执行) */
const QUEUE_MAX_RETRY = 1;
/** /**
* 引用校对状态生命周期顺序0→1→2→3 = 待→进行→完成→失败) * 引用校对状态生命周期顺序0→1→2→3 = 待→进行→完成→失败)
* *
@@ -56,6 +60,12 @@ class ReferenceCheckService
const RECORD_COMPLETED = 2; // 校对完成 const RECORD_COMPLETED = 2; // 校对完成
const RECORD_FAILED = 3; // 校对失败 const RECORD_FAILED = 3; // 校对失败
/** 队列执行状态queue_status */
const QUEUE_PENDING = 0; // 已入队待执行
const QUEUE_RUNNING = 1; // worker 正在执行
const QUEUE_COMPLETED = 2; // 执行完成
const QUEUE_FAILED = 3; // 最终失败(重试耗尽)
/** LLM 评分confidence通过阈值>= 该值视为"通过" */ /** LLM 评分confidence通过阈值>= 该值视为"通过" */
const PASS_CONFIDENCE_THRESHOLD = 0.65; const PASS_CONFIDENCE_THRESHOLD = 0.65;
@@ -69,6 +79,12 @@ class ReferenceCheckService
const BLUE_TAG_REGEX = '/<blue>\[([\d,\-\x{2013}\x{2014}\x{2212}\x{2010}\x{2011}\s]+)\]<\/blue>/u'; const BLUE_TAG_REGEX = '/<blue>\[([\d,\-\x{2013}\x{2014}\x{2212}\x{2010}\x{2011}\s]+)\]<\/blue>/u';
const BLUE_TAG_REGEX_BRACKET_OUTSIDE = '/\[<blue>([\d,\-\x{2013}\x{2014}\x{2212}\x{2010}\x{2011}\s]+)<\/blue>\]/u'; const BLUE_TAG_REGEX_BRACKET_OUTSIDE = '/\[<blue>([\d,\-\x{2013}\x{2014}\x{2212}\x{2010}\x{2011}\s]+)<\/blue>\]/u';
private $logFile;
public function __construct()
{
$this->logFile = ROOT_PATH . 'runtime' . DS . 'plagiarism_task.log';
}
/** /**
* 兼容无 ?? 的 PHP 版本 * 兼容无 ?? 的 PHP 版本
*/ */
@@ -77,6 +93,27 @@ class ReferenceCheckService
return isset($arr[$key]) ? $arr[$key] : $default; return isset($arr[$key]) ? $arr[$key] : $default;
} }
/** 新建/重置校对明细时的队列初始字段 */
private function newCheckRecordFields(array $fields, $queueStatus = self::QUEUE_PENDING, $retryCount = 0)
{
$fields['queue_status'] = intval($queueStatus);
$fields['retry_count'] = max(0, intval($retryCount));
return $fields;
}
public function markQueueRuntime($checkId, $queueStatus, $retryCount = null)
{
$checkId = intval($checkId);
if ($checkId <= 0) {
return 0;
}
$fields = ['queue_status' => intval($queueStatus)];
if ($retryCount !== null) {
$fields['retry_count'] = max(0, intval($retryCount));
}
return Db::name('article_reference_check_result')->where('id', $checkId)->update($fields);
}
/** /**
* 合并匹配两种 blue 引用排版,按在正文中的起始位置排序。 * 合并匹配两种 blue 引用排版,按在正文中的起始位置排序。
* *
@@ -128,7 +165,7 @@ class ReferenceCheckService
} }
$now = date('Y-m-d H:i:s'); $now = date('Y-m-d H:i:s');
$checkId = Db::name('article_reference_check_result')->insertGetId([ $checkId = Db::name('article_reference_check_result')->insertGetId($this->newCheckRecordFields([
'article_id' => intval($this->arrGet($extra, 'article_id', 0)), 'article_id' => intval($this->arrGet($extra, 'article_id', 0)),
'am_id' => intval($this->arrGet($extra, 'am_id', 0)), 'am_id' => intval($this->arrGet($extra, 'am_id', 0)),
'p_article_id' => intval($this->arrGet($extra, 'p_article_id', 0)), 'p_article_id' => intval($this->arrGet($extra, 'p_article_id', 0)),
@@ -145,14 +182,14 @@ class ReferenceCheckService
'status' => 0, 'status' => 0,
'created_at' => $now, 'created_at' => $now,
'updated_at' => $now, 'updated_at' => $now,
]); ]));
$amId = intval($this->arrGet($extra, 'am_id', 0)); $amId = intval($this->arrGet($extra, 'am_id', 0));
if ($amId > 0) { if ($amId > 0) {
$this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING); $this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING);
} }
$this->pushJob(intval($checkId), intval($this->arrGet($extra, 'queue_delay', 0))); $this->startArticleCheckQueue([intval($checkId)], intval($this->arrGet($extra, 'p_article_id', 0)), 'enqueue');
return ['check_id' => $checkId, 'queued' => 1]; return ['check_id' => $checkId, 'queued' => 1];
} }
@@ -190,7 +227,8 @@ class ReferenceCheckService
} }
$skipped = 0; $skipped = 0;
$delay = 0; $pendingJobs = [];
$now = date('Y-m-d H:i:s');
foreach ($citations as $cite) { foreach ($citations as $cite) {
foreach ($cite['reference_numbers'] as $refNo) { foreach ($cite['reference_numbers'] as $refNo) {
$referIndex = $refNo - 1; $referIndex = $refNo - 1;
@@ -201,9 +239,7 @@ class ReferenceCheckService
$refer = $referMap[$referIndex]; $refer = $referMap[$referIndex];
$referText = $this->formatReferForLlm($refer); $referText = $this->formatReferForLlm($refer);
$now = date('Y-m-d H:i:s'); $checkId = Db::name('article_reference_check_result')->insertGetId($this->newCheckRecordFields([
// [70-73] 展开为 reference_no=70,71,72,73 共 4 条记录
$checkId = Db::name('article_reference_check_result')->insertGetId([
'article_id' => $main['article_id'], 'article_id' => $main['article_id'],
'p_article_id' => $pArticleId, 'p_article_id' => $pArticleId,
'am_id' => intval($main['am_id']), 'am_id' => intval($main['am_id']),
@@ -211,22 +247,27 @@ class ReferenceCheckService
'refer_index' => $refNo, 'refer_index' => $refNo,
'origin_text' => $cite['original_text'], 'origin_text' => $cite['original_text'],
'refer_text' => $referText, 'refer_text' => $referText,
'p_refer_id' => $referMap[$referIndex]['p_refer_id'], 'p_refer_id' => $referMap[$referIndex]['p_refer_id'],
'text_start' => $cite['text_start'], 'text_start' => $cite['text_start'],
'text_end' => $cite['text_end'], 'text_end' => $cite['text_end'],
'status' => self::RECORD_PENDING,
'created_at' => $now, 'created_at' => $now,
'updated_at' => $now, 'updated_at' => $now,
]); ]));
$this->pushJob(intval($checkId), $delay); $pendingJobs[] = [
$checkIds[] = $checkId; 'check_id' => intval($checkId),
$delay += 1; 'reference_no' => intval($refNo),
'am_id' => intval($main['am_id']),
'text_start' => intval($cite['text_start']),
];
} }
} }
$this->enqueueChecksSortedByReferenceNo($pendingJobs, $pArticleId, 'enqueue');
$this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING); $this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING);
} }
/** /**
* 手工触发:对已完成且 confidence<=0.65 的记录入队 DOI 第二轮复核 * 手工触发:对已完成且 confidence<=0.65 的记录同步执行 Crossref 二轮复核
*/ */
public function enqueueSecondPassByArticle($articleId) public function enqueueSecondPassByArticle($articleId)
{ {
@@ -247,7 +288,7 @@ class ReferenceCheckService
$delay2 = 0; $delay2 = 0;
foreach ($rows as $checkLog) { foreach ($rows as $checkLog) {
$rowId = $this->resolveCheckRowId($checkLog); $rowId = $this->resolveCheckRowId($checkLog);
if ($this->maybeEnqueueSecondPass($rowId, floatval($checkLog['confidence']))) { if ($this->runSecondPassIfNeeded($rowId, floatval($checkLog['confidence']))) {
$checkIds2[] = $rowId; $checkIds2[] = $rowId;
$delay2 += 1; $delay2 += 1;
} }
@@ -299,7 +340,7 @@ class ReferenceCheckService
$referText = $this->formatReferForLlm($refer); $referText = $this->formatReferForLlm($refer);
// [70-73] 展开为 reference_no=70,71,72,73 共 4 条记录;先入队表,再按文献号正序校对 // [70-73] 展开为 reference_no=70,71,72,73 共 4 条记录;先入队表,再按文献号正序校对
$checkId = Db::name('article_reference_check_result')->insertGetId([ $checkId = Db::name('article_reference_check_result')->insertGetId($this->newCheckRecordFields([
'article_id' => $main['article_id'], 'article_id' => $main['article_id'],
'p_article_id' => $pArticleId, 'p_article_id' => $pArticleId,
'am_id' => $amId, 'am_id' => $amId,
@@ -310,9 +351,10 @@ class ReferenceCheckService
'p_refer_id' => $referMap[$referIndex]['p_refer_id'], 'p_refer_id' => $referMap[$referIndex]['p_refer_id'],
'text_start' => $cite['text_start'], 'text_start' => $cite['text_start'],
'text_end' => $cite['text_end'], 'text_end' => $cite['text_end'],
'status' => self::RECORD_PENDING,
'created_at' => $now, 'created_at' => $now,
'updated_at' => $now, 'updated_at' => $now,
]); ]));
$pendingJobs[] = [ $pendingJobs[] = [
'check_id' => intval($checkId), 'check_id' => intval($checkId),
@@ -325,8 +367,7 @@ class ReferenceCheckService
} }
} }
} }
$checkIds = $this->enqueueChecksSortedByReferenceNo($pendingJobs, $pArticleId, 'enqueue');
$checkIds = $this->pushJobsSortedByReferenceNo($pendingJobs);
foreach (array_keys($amIdsWithJobs) as $amId) { foreach (array_keys($amIdsWithJobs) as $amId) {
$this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING); $this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING);
} }
@@ -337,7 +378,7 @@ class ReferenceCheckService
'queued' => $queued, 'queued' => $queued,
'skipped' => $skipped, 'skipped' => $skipped,
'check_ids' => $checkIds, 'check_ids' => $checkIds,
'queue' => self::QUEUE_NAME, 'queue' => self::TRANSPORT_RABBITMQ,
]; ];
} }
public function enqueueByArticle($articleId){ public function enqueueByArticle($articleId){
@@ -386,7 +427,7 @@ class ReferenceCheckService
$referText = $this->formatReferForLlm($refer); $referText = $this->formatReferForLlm($refer);
// [70-73] 展开为 reference_no=70,71,72,73 共 4 条记录;先入队表,再按文献号正序校对 // [70-73] 展开为 reference_no=70,71,72,73 共 4 条记录;先入队表,再按文献号正序校对
$checkId = Db::name('article_reference_check_result')->insertGetId([ $checkId = Db::name('article_reference_check_result')->insertGetId($this->newCheckRecordFields([
'article_id' => $main['article_id'], 'article_id' => $main['article_id'],
'p_article_id' => $pArticleId, 'p_article_id' => $pArticleId,
'am_id' => $amId, 'am_id' => $amId,
@@ -397,9 +438,10 @@ class ReferenceCheckService
'p_refer_id' => $referMap[$referIndex]['p_refer_id'], 'p_refer_id' => $referMap[$referIndex]['p_refer_id'],
'text_start' => $cite['text_start'], 'text_start' => $cite['text_start'],
'text_end' => $cite['text_end'], 'text_end' => $cite['text_end'],
'status' => self::RECORD_PENDING,
'created_at' => $now, 'created_at' => $now,
'updated_at' => $now, 'updated_at' => $now,
]); ]));
$pendingJobs[] = [ $pendingJobs[] = [
'check_id' => intval($checkId), 'check_id' => intval($checkId),
@@ -413,7 +455,7 @@ class ReferenceCheckService
} }
} }
$checkIds = $this->pushJobsSortedByReferenceNo($pendingJobs); $checkIds = $this->enqueueChecksSortedByReferenceNo($pendingJobs, $pArticleId, 'enqueue');
foreach (array_keys($amIdsWithJobs) as $amId) { foreach (array_keys($amIdsWithJobs) as $amId) {
$this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING); $this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING);
} }
@@ -424,7 +466,7 @@ class ReferenceCheckService
'queued' => $queued, 'queued' => $queued,
'skipped' => $skipped, 'skipped' => $skipped,
'check_ids' => $checkIds, 'check_ids' => $checkIds,
'queue' => self::QUEUE_NAME, 'queue' => self::TRANSPORT_RABBITMQ,
]; ];
} }
@@ -524,14 +566,6 @@ class ReferenceCheckService
->whereIn('state', [0, 2]) ->whereIn('state', [0, 2])
->value('article_id')); ->value('article_id'));
// 先清掉旧记录对应的队列 Redis 锁,避免在途 worker 写回数据
$oldIds = Db::name('article_reference_check_result')
->where('p_article_id', $pArticleId)
->column('id');
foreach ($oldIds as $oldId) {
$this->clearReferenceCheckQueueLock(intval($oldId));
}
$deleted = Db::name('article_reference_check_result') $deleted = Db::name('article_reference_check_result')
->where('p_article_id', $pArticleId) ->where('p_article_id', $pArticleId)
->delete(); ->delete();
@@ -553,14 +587,6 @@ class ReferenceCheckService
return 0; return 0;
} }
// 先清掉旧记录对应的队列 Redis 锁,否则同 check_id 在 TTL 内不会再次执行
$oldIds = Db::name('article_reference_check_result')
->where('article_id', $articleId)
->column('id');
foreach ($oldIds as $oldId) {
$this->clearReferenceCheckQueueLock(intval($oldId));
}
$deleted = Db::name('article_reference_check_result')->where('article_id', $articleId)->delete(); $deleted = Db::name('article_reference_check_result')->where('article_id', $articleId)->delete();
if ($this->hasAmRefCheckStatusColumn()) { if ($this->hasAmRefCheckStatusColumn()) {
Db::name('article_main') Db::name('article_main')
@@ -1518,7 +1544,7 @@ class ReferenceCheckService
* 编辑某条文献内容后,按 p_refer_id 异步重新校对该文献对应的全部 check 明细 * 编辑某条文献内容后,按 p_refer_id 异步重新校对该文献对应的全部 check 明细
* *
* 流程:刷新 refer_text/refer_index → 重置 status/is_match/confidence/reason * 流程:刷新 refer_text/refer_index → 重置 status/is_match/confidence/reason
* → 设节级 ref_check_status=RUNNING → 投递 ReferenceCheck 队列 * → 设节级 ref_check_status=RUNNING → 投递 RabbitMQ 文章批次
* *
* 与 recheckByRefer 的差异:本方法**不**在请求内同步跑 LLM仅入队立即返回。 * 与 recheckByRefer 的差异:本方法**不**在请求内同步跑 LLM仅入队立即返回。
* 前端可调 getProgressByPArticleId 轮询进度。 * 前端可调 getProgressByPArticleId 轮询进度。
@@ -1567,11 +1593,11 @@ class ReferenceCheckService
'reset' => 0, 'reset' => 0,
'queued' => 0, 'queued' => 0,
'check_ids' => [], 'check_ids' => [],
'queue' => self::QUEUE_NAME, 'queue' => self::TRANSPORT_RABBITMQ,
]; ];
} }
$resetFields = [ $resetFields = $this->newCheckRecordFields([
'refer_text' => $referText, 'refer_text' => $referText,
'refer_index' => $referenceNo, 'refer_index' => $referenceNo,
'reference_no' => $referenceNo, 'reference_no' => $referenceNo,
@@ -1582,14 +1608,13 @@ class ReferenceCheckService
'reason' => '', 'reason' => '',
'error_msg' => '', 'error_msg' => '',
'updated_at' => $now, 'updated_at' => $now,
]; ], self::QUEUE_PENDING, 0);
$pendingJobs = []; $pendingJobs = [];
$amIds = []; $amIds = [];
foreach ($rows as $row) { foreach ($rows as $row) {
$checkId = $this->resolveCheckRowId($row); $checkId = $this->resolveCheckRowId($row);
Db::name('article_reference_check_result')->where('id', $checkId)->update($resetFields); Db::name('article_reference_check_result')->where('id', $checkId)->update($resetFields);
$this->clearReferenceCheckQueueLock($checkId);
$pendingJobs[] = [ $pendingJobs[] = [
'check_id' => $checkId, 'check_id' => $checkId,
'reference_no' => $referenceNo, 'reference_no' => $referenceNo,
@@ -1606,7 +1631,7 @@ class ReferenceCheckService
$this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING); $this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING);
} }
$checkIds = $this->pushJobsSortedByReferenceNo($pendingJobs); $checkIds = $this->enqueueChecksSortedByReferenceNo($pendingJobs, $pArticleId, 'enqueue');
return [ return [
'p_refer_id' => $pReferId, 'p_refer_id' => $pReferId,
@@ -1615,7 +1640,7 @@ class ReferenceCheckService
'reset' => count($rows), 'reset' => count($rows),
'queued' => count($checkIds), 'queued' => count($checkIds),
'check_ids' => $checkIds, 'check_ids' => $checkIds,
'queue' => self::QUEUE_NAME, 'queue' => self::TRANSPORT_RABBITMQ,
]; ];
} }
@@ -1652,7 +1677,7 @@ class ReferenceCheckService
'reset' => 0, 'reset' => 0,
'queued' => 0, 'queued' => 0,
'check_ids' => [], 'check_ids' => [],
'queue' => self::QUEUE_NAME, 'queue' => self::TRANSPORT_RABBITMQ,
]; ];
} }
@@ -1661,7 +1686,7 @@ class ReferenceCheckService
} }
$now = date('Y-m-d H:i:s'); $now = date('Y-m-d H:i:s');
$resetFields = [ $resetFields = $this->newCheckRecordFields([
'status' => self::RECORD_PENDING, 'status' => self::RECORD_PENDING,
'is_match' => 0, 'is_match' => 0,
'can_support' => 0, 'can_support' => 0,
@@ -1669,14 +1694,13 @@ class ReferenceCheckService
'reason' => '', 'reason' => '',
'error_msg' => '', 'error_msg' => '',
'updated_at' => $now, 'updated_at' => $now,
]; ], self::QUEUE_PENDING, 0);
$pendingJobs = []; $pendingJobs = [];
$amIds = []; $amIds = [];
foreach ($rows as $row) { foreach ($rows as $row) {
$checkId = $this->resolveCheckRowId($row); $checkId = $this->resolveCheckRowId($row);
Db::name('article_reference_check_result')->where('id', $checkId)->update($resetFields); Db::name('article_reference_check_result')->where('id', $checkId)->update($resetFields);
$this->clearReferenceCheckQueueLock($checkId);
$pendingJobs[] = [ $pendingJobs[] = [
'check_id' => $checkId, 'check_id' => $checkId,
'reference_no' => intval($this->arrGet($row, 'reference_no', 0)), 'reference_no' => intval($this->arrGet($row, 'reference_no', 0)),
@@ -1693,7 +1717,7 @@ class ReferenceCheckService
$this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING); $this->setAmRefCheckStatus($amId, self::AM_STATUS_RUNNING);
} }
$checkIds = $this->pushJobsSortedByReferenceNo($pendingJobs); $checkIds = $this->enqueueChecksSortedByReferenceNo($pendingJobs, $pArticleId, 'recheck_failed');
return [ return [
'p_refer_id' => $pReferId, 'p_refer_id' => $pReferId,
@@ -1701,7 +1725,7 @@ class ReferenceCheckService
'reset' => count($rows), 'reset' => count($rows),
'queued' => count($checkIds), 'queued' => count($checkIds),
'check_ids' => $checkIds, 'check_ids' => $checkIds,
'queue' => self::QUEUE_NAME, 'queue' => self::TRANSPORT_RABBITMQ,
]; ];
} }
@@ -1735,11 +1759,11 @@ class ReferenceCheckService
'reset' => 0, 'reset' => 0,
'queued' => 0, 'queued' => 0,
'check_ids' => [], 'check_ids' => [],
'queue' => self::QUEUE_NAME, 'queue' => self::TRANSPORT_RABBITMQ,
]; ];
} }
$resetFields = [ $resetFields = $this->newCheckRecordFields([
'refer_text' => $referText, 'refer_text' => $referText,
'p_refer_id' => $pReferId, 'p_refer_id' => $pReferId,
'p_article_id' => $pArticleId, 'p_article_id' => $pArticleId,
@@ -1751,7 +1775,7 @@ class ReferenceCheckService
'reason' => '', 'reason' => '',
'error_msg' => '', 'error_msg' => '',
'updated_at' => $now, 'updated_at' => $now,
]; ], self::QUEUE_PENDING, 0);
$pendingJobs = []; $pendingJobs = [];
$amIds = []; $amIds = [];
@@ -1790,7 +1814,6 @@ class ReferenceCheckService
foreach ($pendingJobs as $job) { foreach ($pendingJobs as $job) {
$checkId = intval($job['check_id']); $checkId = intval($job['check_id']);
$checkIds[] = $checkId; $checkIds[] = $checkId;
$this->clearReferenceCheckQueueLock($checkId);
try { try {
$results[] = $this->runReferenceCheckOnce($checkId); $results[] = $this->runReferenceCheckOnce($checkId);
} catch (\Exception $e) { } catch (\Exception $e) {
@@ -1819,31 +1842,6 @@ class ReferenceCheckService
]; ];
} }
/**
* 清除队列 Redis 完成标记,避免重检任务被 acquireLock 静默丢弃
*/
public function clearReferenceCheckQueueLock($checkId)
{
$checkId = intval($checkId);
if ($checkId <= 0) {
return;
}
try {
$keys = [];
foreach (['queue_job', 'queue_job_two'] as $prefix) {
$class = $prefix === 'queue_job_two'
? 'app\\api\\job\\ReferenceCheckTwo'
: 'app\\api\\job\\ReferenceCheck';
$base = $prefix . ':' . $class . ':' . $checkId;
$keys[] = $base;
$keys[] = $base . ':status';
}
QueueRedis::getInstance()->deleteRedisKeys($keys);
} catch (\Exception $e) {
\think\Log::warning('clearReferenceCheckQueueLock id=' . $checkId . ' ' . $e->getMessage());
}
}
/** /**
* 执行一次引用 LLM 校对(同步,写回 article_reference_check_result * 执行一次引用 LLM 校对(同步,写回 article_reference_check_result
*/ */
@@ -1884,8 +1882,7 @@ class ReferenceCheckService
$confidence = floatval(isset($llmResult['confidence']) ? $llmResult['confidence'] : 0); $confidence = floatval(isset($llmResult['confidence']) ? $llmResult['confidence'] : 0);
$reason = isset($llmResult['reason']) ? $llmResult['reason'] : ''; $reason = isset($llmResult['reason']) ? $llmResult['reason'] : '';
// LLM 通讯失败:写 status=RECORD_FAILED(3) + error_msg抛异常让队列 worker 走 release(30) 重试 // LLM 通讯失败:写 status=RECORD_FAILED(3) + error_msg抛异常由 MQ worker 重试
// 重试 3 次后 ReferenceCheck::markFailed 会保持 status=3 收尾
if ($requestFailed) { if ($requestFailed) {
$this->updateCheckResult($checkId, [ $this->updateCheckResult($checkId, [
'confidence' => $confidence, 'confidence' => $confidence,
@@ -1893,7 +1890,6 @@ class ReferenceCheckService
'status' => self::RECORD_FAILED, 'status' => self::RECORD_FAILED,
'error_msg' => $reason, 'error_msg' => $reason,
]); ]);
$this->clearReferenceCheckQueueLock($checkId);
throw new \RuntimeException($reason !== '' ? $reason : 'LLM request failed'); throw new \RuntimeException($reason !== '' ? $reason : 'LLM request failed');
} }
@@ -1906,8 +1902,9 @@ class ReferenceCheckService
'error_msg' => '', 'error_msg' => '',
]); ]);
$this->clearReferenceCheckQueueLock($checkId); if ($confidence <= self::PASS_CONFIDENCE_THRESHOLD) {
$this->maybeEnqueueSecondPass($checkId, $confidence); $this->runSecondPassBlocking($checkId, $row, $contentA, $refer, $contentB);
}
return [ return [
'check_id' => $checkId, 'check_id' => $checkId,
@@ -1918,6 +1915,82 @@ class ReferenceCheckService
]; ];
} }
/**
* 低分结果的二轮 DOI 复核(同步阻塞执行;失败重试一次)
*/
public function runSecondPassBlocking($checkId, array $row, $contentA, $refer, $referText)
{
$checkId = intval($checkId);
if ($checkId <= 0) {
return false;
}
$payload = $this->prepareRecheckPayload(is_array($refer) ? $refer : [], trim((string)$referText));
if (empty($payload['has_abstract']) || trim((string)$payload['doi_block']) === '') {
return false;
}
$lastError = '';
for ($attempt = 0; $attempt < 2; $attempt++) {
try {
$llmResult = (new LLMService())->checkReference($contentA, trim((string)$referText), true, $payload['doi_block']);
$requestFailed = !empty($llmResult['request_failed']);
$canSupport = $this->parseLlmCanSupport($llmResult);
$confidence = floatval(isset($llmResult['confidence']) ? $llmResult['confidence'] : 0);
$tag = '[Crossref复核' . (trim((string)$payload['doi_used']) !== '' ? (' ' . trim((string)$payload['doi_used'])) : '') . ']';
$reason = $tag . ' ' . (isset($llmResult['reason']) ? $llmResult['reason'] : '');
if ($requestFailed) {
$lastError = isset($llmResult['reason']) ? (string)$llmResult['reason'] : 'LLM request failed';
if ($attempt < 1) {
continue;
}
$this->updateCheckResult($checkId, [
'confidence' => $confidence,
'reason' => $reason,
'status' => self::RECORD_FAILED,
'error_msg' => $lastError,
]);
$amId = intval(isset($row['am_id']) ? $row['am_id'] : 0);
if ($amId > 0) {
$this->syncAmRefCheckStatus($amId);
}
return false;
}
$this->updateCheckResult($checkId, [
'can_support' => $canSupport ? 1 : 0,
'is_match' => $canSupport ? 1 : 0,
'confidence' => $confidence,
'reason' => $reason,
'status' => self::RECORD_COMPLETED,
'error_msg' => '',
]);
$amId = intval(isset($row['am_id']) ? $row['am_id'] : 0);
if ($amId > 0) {
$this->syncAmRefCheckStatus($amId);
}
return true;
} catch (\Exception $e) {
$lastError = $e->getMessage();
if ($attempt < 1) {
continue;
}
$this->updateCheckResult($checkId, [
'status' => self::RECORD_FAILED,
'error_msg' => $lastError,
]);
$amId = intval(isset($row['am_id']) ? $row['am_id'] : 0);
if ($amId > 0) {
$this->syncAmRefCheckStatus($amId);
}
return false;
}
}
return false;
}
/** /**
* @return array{refer: array, p_article_id: int, p_refer_id: int, reference_no: int} * @return array{refer: array, p_article_id: int, p_refer_id: int, reference_no: int}
*/ */
@@ -2622,18 +2695,13 @@ class ReferenceCheckService
} }
/** /**
* 第一轮 confidence<=0.65 且能抓到 DOI 真实内容时,延迟入队第二轮复核 * 对已完成且低分的记录尝试同步 Crossref 二轮(供 enqueueSecondPassByArticle 等手工入口)
*
* 跳过条件(避免无意义重跑得到相同结果):
* - check_id 不合法 / 一次置信度高于阈值
* - refer 行不存在
* - refer_doi 为空或 Crossref 未返回摘要
*/ */
public function maybeEnqueueSecondPass($checkId, $confidence) public function runSecondPassIfNeeded($checkId, $confidence)
{ {
$checkId = intval($checkId); $checkId = intval($checkId);
$confidence = floatval($confidence); $confidence = floatval($confidence);
if ($checkId <= 0 || $confidence > 0.65) { if ($checkId <= 0 || $confidence > self::PASS_CONFIDENCE_THRESHOLD) {
return false; return false;
} }
@@ -2658,9 +2726,13 @@ class ReferenceCheckService
return false; return false;
} }
$this->clearReferenceCheckQueueLock($checkId); $contentA = $this->resolveMainContentForJob($row);
$this->pushJob2($checkId, 5); $referText = trim((string)$this->arrGet($row, 'refer_text', ''));
return true; if ($referText === '' && is_array($refer)) {
$referText = $this->formatReferForLlm($refer);
}
return $this->runSecondPassBlocking($checkId, $row, $contentA, $refer, $referText);
} }
/** /**
@@ -3047,72 +3119,93 @@ class ReferenceCheckService
} }
/** /**
* 已入库记录按文献编号正序入队(同号按 am_id、正文位置稳定排序 * 批量记录已入库后创建文章批次并投递 RabbitMQ
* *
* @param array $rows 元素含 check_id、reference_no可选 am_id、text_start * @param array $rows 元素含 check_id
* @param int $pArticleId
* @param string $trigger enqueue|recheck_failed|manual
* @return int[] check_id 列表
*/ */
private function pushJobsSortedByReferenceNo(array $rows) private function enqueueChecksSortedByReferenceNo(array $rows, $pArticleId = 0, $trigger = 'enqueue')
{ {
if (empty($rows)) { $checkIds = [];
foreach ($rows as $row) {
$checkId = intval($row['check_id']);
if ($checkId > 0) {
$checkIds[] = $checkId;
}
}
if (!empty($checkIds)) {
$this->startArticleCheckQueue($checkIds, intval($pArticleId), $trigger);
}
return $checkIds;
}
/**
* 创建文章批次;队首批次立即发 MQ其余批次等待前序完成
*
* @param int[] $checkIds
* @param int $pArticleId
* @param string $trigger
* @return int[]
*/
public function startArticleCheckQueue(array $checkIds, $pArticleId = 0, $trigger = 'enqueue')
{
$checkIds = array_values(array_filter(array_map('intval', $checkIds)));
if (empty($checkIds)) {
return []; return [];
} }
usort($rows, function ($a, $b) { $pArticleId = intval($pArticleId);
if ($a['reference_no'] !== $b['reference_no']) { if ($pArticleId <= 0) {
return $a['reference_no'] - $b['reference_no']; $firstRow = Db::name('article_reference_check_result')->where('id', $checkIds[0])->find();
} $pArticleId = empty($firstRow) ? 0 : intval($this->arrGet($firstRow, 'p_article_id', 0));
$amA = isset($a['am_id']) ? intval($a['am_id']) : 0; }
$amB = isset($b['am_id']) ? intval($b['am_id']) : 0; if ($pArticleId <= 0) {
if ($amA !== $amB) { throw new \RuntimeException('p_article_id is required for reference check queue');
return $amA - $amB; }
}
$posA = isset($a['text_start']) ? intval($a['text_start']) : 0;
$posB = isset($b['text_start']) ? intval($b['text_start']) : 0;
return $posA - $posB;
});
$checkIds = []; $now = date('Y-m-d H:i:s');
$delay = 0; $batchId = Db::name('article_reference_check_batch')->insertGetId([
foreach ($rows as $row) { 'p_article_id' => $pArticleId,
$checkId = intval($row['check_id']); 'batch_status' => 0,
$checkIds[] = $checkId; 'total_count' => count($checkIds),
$this->pushJob($checkId, $delay); 'done_count' => 0,
$delay++; 'failed_count' => 0,
'trigger' => (string)$trigger,
'created_at' => $now,
'updated_at' => $now,
]);
$shouldPublish = !$this->hasEarlierWaitingBatch($batchId) && !$this->hasRunningReferenceCheckBatch();
if ($shouldPublish) {
(new ReferenceCheckMqPublisher())->publishArticleStart($pArticleId, intval($batchId), $trigger);
$this->log('startArticleCheckQueue publish p_article_id=' . $pArticleId . ' batch_id=' . $batchId);
} else {
$this->log('startArticleCheckQueue queued batch_id=' . $batchId . ' p_article_id=' . $pArticleId);
} }
return $checkIds; return $checkIds;
} }
private function pushJob($checkId, $delaySeconds = 0) private function hasRunningReferenceCheckBatch()
{ {
$checkId = intval($checkId); return Db::name('article_reference_check_batch')
$this->clearReferenceCheckQueueLock($checkId); ->where('batch_status', 1)
$jobClass = 'app\api\job\ReferenceCheck@fire'; ->count() > 0;
$data = ['check_id' => $checkId];
try {
if ($delaySeconds > 0) {
$jobId = Queue::later($delaySeconds, $jobClass, $data, self::QUEUE_NAME);
} else {
$jobId = Queue::push($jobClass, $data, self::QUEUE_NAME);
}
} catch (\Exception $e) {
\think\Log::error('ReferenceCheck pushJob failed check_id=' . $checkId . ' ' . $e->getMessage());
throw $e;
}
} }
private function pushJob2($checkId, $delaySeconds = 0)
private function hasEarlierWaitingBatch($batchId)
{ {
$jobClass = 'app\api\job\ReferenceCheckTwo@fire'; return Db::name('article_reference_check_batch')
$data = ['check_id' => $checkId]; ->where('batch_status', 0)
try { ->where('id', '<', intval($batchId))
if ($delaySeconds > 0) { ->count() > 0;
$jobId = Queue::later($delaySeconds, $jobClass, $data, self::QUEUE_NAME); }
} else {
$jobId = Queue::push($jobClass, $data, self::QUEUE_NAME); public function log($msg)
} {
} catch (\Exception $e) { $line = date('Y-m-d H:i:s') . ' ' . $msg . PHP_EOL;
\think\Log::error('ReferenceCheckTwo pushJob failed check_id=' . $checkId . ' ' . $e->getMessage()); @file_put_contents($this->logFile, $line, FILE_APPEND);
throw $e;
}
} }
} }

View File

@@ -33,7 +33,7 @@ class LLMService
public function checkReference($contextText, $referText, $isAgain = false, $doiBlock = null) public function checkReference($contextText, $referText, $isAgain = false, $doiBlock = null)
{ {
// request_failed=true 表示"LLM 通讯/解析层面的失败"(可重试,区别于业务上的"未命中" // request_failed=true 表示"LLM 通讯/解析层面的失败"(可重试,区别于业务上的"未命中"
// 上游 runReferenceCheckOnce 会据此把 DB.status 置为 2(失败) 并抛异常触发队列重试 // 上游 runReferenceCheckOnce 会据此把 DB.status 置为 3(失败) 并抛异常触发 MQ worker 重试
$fallback = [ $fallback = [
'can_support' => false, 'can_support' => false,
'is_match' => false, 'is_match' => false,