Files
tougao/application/common/mq/ReferenceCheckArticleWorker.php

214 lines
7.3 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\common\mq;
use think\Db;
use app\common\DbReconnectHelper;
use app\common\ReferenceRelevanceCheckService;
/**
 * RabbitMQ consumer (queue reference_check / ref_check.article).
 *
 * Articles are processed serially across the whole system; within one article
 * the pending rows are checked one at a time for "topic relevance", in
 * ascending reference_no order.
 */
class ReferenceCheckArticleWorker
{
    /** Batch has been created but no worker has started it yet. */
    const BATCH_WAITING = 0;
    /** Batch is currently being processed by a worker. */
    const BATCH_RUNNING = 1;
    /** Batch finished and no row was counted as failed in the final run. */
    const BATCH_DONE = 2;
    /** Batch finished but at least one row was counted as failed. */
    const BATCH_PARTIAL_FAILED = 3;

    /** @var ReferenceRelevanceCheckService */
    private $svc;

    public function __construct()
    {
        $this->svc = new ReferenceRelevanceCheckService();
    }

    /**
     * Handle one "article start" message.
     *
     * Expected payload keys: p_article_id, batch_id and, optionally, trigger
     * (defaults to 'enqueue').
     *
     * Flow: validate the payload, defer (re-publish) when another batch is
     * running, claim the batch, process every pending row of the article,
     * write the final batch status and finally publish the next waiting batch.
     *
     * @param array $payload Decoded message body.
     * @return void
     */
    public function handleMessage(array $payload)
    {
        DbReconnectHelper::ensure();

        $pArticleId = intval(isset($payload['p_article_id']) ? $payload['p_article_id'] : 0);
        $batchId = intval(isset($payload['batch_id']) ? $payload['batch_id'] : 0);
        $trigger = isset($payload['trigger']) ? (string)$payload['trigger'] : 'enqueue';

        if ($pArticleId <= 0 || $batchId <= 0) {
            $this->svc->log('ReferenceCheckArticleWorker invalid payload');
            return;
        }

        // Only one batch may run at a time: when another one is running,
        // put this message back on the queue and pause briefly before returning.
        if (!$this->canStartArticleWork($batchId)) {
            $this->svc->log('ReferenceCheckArticleWorker defer batch_id=' . $batchId . ' other article running');
            (new ReferenceCheckMqPublisher())->publishArticleStart(
                $pArticleId,
                $batchId,
                isset($payload['trigger']) ? $payload['trigger'] : 'enqueue'
            );
            sleep(3);
            return;
        }

        // A failed claim is fatal only when the batch is gone or already DONE;
        // in every other state processing continues.
        if (!$this->claimBatch($batchId)) {
            $batch = $this->getBatch($batchId);
            if (empty($batch) || intval($batch['batch_status']) === self::BATCH_DONE) {
                return;
            }
        }

        $this->svc->recoverQueueRowsForArticle($pArticleId);

        $recheckPendingOnly = ($trigger === 'recheck_pending_only');
        if (!$recheckPendingOnly && ReferenceRelevanceCheckService::PREPARE_LITERATURE_BEFORE_CHECK) {
            $this->svc->prepareLiteratureContentByArticle($pArticleId);
        }

        $this->svc->log('ReferenceCheckArticleWorker start p_article_id=' . $pArticleId . ' batch_id=' . $batchId);

        $done = 0;
        $failed = 0;
        // Ids of rows that were fetched but made no progress (no usable check
        // id, or the claim was lost). They are excluded from later fetches;
        // otherwise the same still-pending row would be returned forever.
        $stalledRowIds = [];

        while (true) {
            $row = $this->fetchNextPendingRow($pArticleId, $stalledRowIds);
            if (empty($row)) {
                break;
            }

            $checkId = $this->svc->resolveCheckRowId($row);
            $result = 'skip';
            if ($checkId > 0) {
                $result = $this->processOneRow($checkId, $row, $recheckPendingOnly);
            }

            if ($result === 'ok') {
                $done++;
                continue;
            }
            if ($result === 'failed') {
                $failed++;
                continue;
            }

            // No progress on this row: remember it so the next fetch moves on.
            $rowId = intval(isset($row['id']) ? $row['id'] : 0);
            if ($rowId <= 0) {
                // Without an id the row cannot be excluded; stop instead of spinning.
                $this->svc->log('ReferenceCheckArticleWorker stop p_article_id=' . $pArticleId . ' pending row without usable id');
                break;
            }
            $stalledRowIds[] = $rowId;
            $this->svc->log('ReferenceCheckArticleWorker skip row id=' . $rowId . ' p_article_id=' . $pArticleId);
        }

        $this->finalizeBatch($batchId, $done, $failed);
        $this->svc->log('ReferenceCheckArticleWorker done p_article_id=' . $pArticleId . ' batch_id=' . $batchId . ' done=' . $done . ' failed=' . $failed);
        $this->publishNextWaitingBatch();
    }

    /**
     * Tell whether no batch other than the given one is in RUNNING state.
     *
     * @param int $batchId Batch that wants to start.
     * @return bool True when no other batch is running.
     */
    private function canStartArticleWork($batchId)
    {
        $running = Db::name('article_reference_relevance_check_batch')
            ->where('batch_status', self::BATCH_RUNNING)
            ->where('id', '<>', intval($batchId))
            ->count();

        return intval($running) === 0;
    }

    /**
     * Move a WAITING or RUNNING batch to RUNNING and refresh updated_at.
     *
     * @param int $batchId
     * @return bool True when the UPDATE reported at least one affected row.
     *              NOTE(review): an already-RUNNING batch touched within the
     *              same second may report 0 affected rows; the caller
     *              re-reads the batch in that case - confirm driver behaviour.
     */
    private function claimBatch($batchId)
    {
        $now = date('Y-m-d H:i:s');
        $affected = Db::name('article_reference_relevance_check_batch')
            ->where('id', intval($batchId))
            ->whereIn('batch_status', [self::BATCH_WAITING, self::BATCH_RUNNING])
            ->update([
                'batch_status' => self::BATCH_RUNNING,
                'updated_at' => $now,
            ]);

        return intval($affected) > 0;
    }

    /**
     * Load one batch row by primary key.
     *
     * @param int $batchId
     * @return array|null The batch row, or an empty value when it does not exist.
     */
    private function getBatch($batchId)
    {
        return Db::name('article_reference_relevance_check_batch')->where('id', intval($batchId))->find();
    }

    /**
     * Fetch the next row of the article that is pending both in the queue and
     * as a record, ordered by reference_no, am_id, text_start, id (ascending).
     *
     * @param int   $pArticleId Article whose rows are being processed.
     * @param int[] $excludeIds Row ids to leave out of the search (rows that
     *                          were already fetched in this run without progress).
     * @return array|null The next row, or an empty value when none is left.
     */
    private function fetchNextPendingRow($pArticleId, array $excludeIds = [])
    {
        $query = Db::name('article_reference_relevance_check_result')
            ->where('p_article_id', intval($pArticleId))
            ->where('queue_status', ReferenceRelevanceCheckService::QUEUE_PENDING)
            ->where('status', ReferenceRelevanceCheckService::RECORD_PENDING);

        if (!empty($excludeIds)) {
            $query->where('id', 'not in', array_map('intval', $excludeIds));
        }

        return $query
            ->order('reference_no asc,am_id asc,text_start asc,id asc')
            ->find();
    }

    /**
     * Claim one check row, run the check and record the outcome.
     *
     * On an exception the row is put back to QUEUE_PENDING with an incremented
     * retry count and processed again (recursively) until QUEUE_MAX_RETRY is
     * reached; after that the row, or its whole citation group when one is
     * found, is marked as failed.
     *
     * @param int   $checkId             Id of the check row to process.
     * @param array $row                 Row snapshot; only retry_count is read here.
     * @param bool  $skipLiteratureFetch Passed through to runCheckOnce().
     * @return string ok|failed|skip ('skip' = the row was no longer QUEUE_PENDING).
     */
    private function processOneRow($checkId, array $row, $skipLiteratureFetch = false)
    {
        DbReconnectHelper::ensure();

        // Conditional update acts as the claim: it only succeeds while the row
        // is still QUEUE_PENDING.
        $claimed = Db::name('article_reference_relevance_check_result')
            ->where('id', intval($checkId))
            ->where('queue_status', ReferenceRelevanceCheckService::QUEUE_PENDING)
            ->update(['queue_status' => ReferenceRelevanceCheckService::QUEUE_RUNNING]);
        if (intval($claimed) <= 0) {
            return 'skip';
        }

        // The retry counter comes from the snapshot passed in, not from a fresh read.
        $retryCount = intval(isset($row['retry_count']) ? $row['retry_count'] : 0);

        try {
            $this->svc->runCheckOnce($checkId, $skipLiteratureFetch);
            $this->svc->markQueueRuntime($checkId, ReferenceRelevanceCheckService::QUEUE_COMPLETED, $retryCount);
            return 'ok';
        } catch (\Exception $e) {
            $this->svc->log('ReferenceCheckArticleWorker check_id=' . $checkId . ' err=' . $e->getMessage());
            DbReconnectHelper::ensure();

            if ($retryCount < ReferenceRelevanceCheckService::QUEUE_MAX_RETRY) {
                // Release the row back to PENDING so the recursive call can claim it again.
                $this->svc->markQueueRuntime($checkId, ReferenceRelevanceCheckService::QUEUE_PENDING, $retryCount + 1);
                return $this->processOneRow($checkId, array_merge($row, ['retry_count' => $retryCount + 1]), $skipLiteratureFetch);
            }

            // Retries exhausted: persist the failure. Errors while doing so are
            // only logged, so the row is still reported as 'failed'.
            try {
                $fresh = Db::name('article_reference_relevance_check_result')->where('id', intval($checkId))->find();
                $groupRows = !empty($fresh) ? $this->svc->findCitationGroupRowsForWorker($fresh) : [];
                if (!empty($groupRows)) {
                    $this->svc->failGroupWithQueue($groupRows, $e->getMessage(), $retryCount);
                } else {
                    $this->svc->updateCheckResult($checkId, [
                        'status' => ReferenceRelevanceCheckService::RECORD_FAILED,
                        'error_msg' => $e->getMessage(),
                    ]);
                    $this->svc->markQueueRuntime($checkId, ReferenceRelevanceCheckService::QUEUE_FAILED, $retryCount);
                }
            } catch (\Exception $e2) {
                \think\Log::error('ReferenceCheckArticleWorker markFailed: ' . $e2->getMessage());
            }

            return 'failed';
        }
    }

    /**
     * Write the final status and counters of a batch.
     *
     * The status becomes BATCH_PARTIAL_FAILED when $failed > 0, otherwise
     * BATCH_DONE. When fewer rows were handled than total_count, this is only
     * logged; the status is not changed because of it.
     *
     * @param int $batchId
     * @param int $done   Rows processed successfully in this run.
     * @param int $failed Rows that ended as failed in this run.
     * @return void
     */
    private function finalizeBatch($batchId, $done, $failed)
    {
        $batch = $this->getBatch($batchId);
        if (empty($batch)) {
            return;
        }

        $total = intval($batch['total_count']);
        $status = ($failed > 0) ? self::BATCH_PARTIAL_FAILED : self::BATCH_DONE;

        Db::name('article_reference_relevance_check_batch')->where('id', intval($batchId))->update([
            'batch_status' => $status,
            'done_count' => intval($done),
            'failed_count' => intval($failed),
            'updated_at' => date('Y-m-d H:i:s'),
        ]);

        if ($total > 0 && ($done + $failed) < $total) {
            $this->svc->log('ReferenceCheckArticleWorker batch_id=' . $batchId . ' incomplete total=' . $total);
        }
    }

    /**
     * Publish an "article start" message for the oldest WAITING batch, if any.
     *
     * Publishing errors are logged and swallowed so that they do not affect
     * the batch that has just been finalized.
     *
     * @return void
     */
    private function publishNextWaitingBatch()
    {
        $next = Db::name('article_reference_relevance_check_batch')
            ->where('batch_status', self::BATCH_WAITING)
            ->order('id asc')
            ->find();
        if (empty($next)) {
            return;
        }

        try {
            (new ReferenceCheckMqPublisher())->publishArticleStart(
                intval($next['p_article_id']),
                intval($next['id']),
                isset($next['trigger']) ? $next['trigger'] : 'enqueue'
            );
        } catch (\Exception $e) {
            $this->svc->log('ReferenceCheck publishNextWaitingBatch failed: ' . $e->getMessage());
            \think\Log::error('ReferenceCheck publishNextWaitingBatch: ' . $e->getMessage());
        }
    }
}