文献校对功能转rabbitMQ
This commit is contained in:
77
application/command/ReferenceCheckMqConsume.php
Normal file
77
application/command/ReferenceCheckMqConsume.php
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace app\command;
|
||||||
|
|
||||||
|
use think\console\Command;
|
||||||
|
use think\console\Input;
|
||||||
|
use think\console\Output;
|
||||||
|
use app\common\mq\RabbitMqConfig;
|
||||||
|
use app\common\mq\ReferenceCheckArticleWorker;
|
||||||
|
|
||||||
|
class ReferenceCheckMqConsume extends Command
|
||||||
|
{
|
||||||
|
protected function configure()
|
||||||
|
{
|
||||||
|
$this->setName('reference_check:mq-consume')
|
||||||
|
->setDescription('Consume RabbitMQ reference check article queue');
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function execute(Input $input, Output $output)
|
||||||
|
{
|
||||||
|
if (!class_exists('\\PhpAmqpLib\\Connection\\AMQPStreamConnection')) {
|
||||||
|
$output->writeln('<error>php-amqplib not installed. Run: php composer.phar require php-amqplib/php-amqplib:^2.12</error>');
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
$rc = RabbitMqConfig::referenceCheck();
|
||||||
|
$exchange = isset($rc['exchange']) ? $rc['exchange'] : 'reference_check';
|
||||||
|
$queue = isset($rc['queue']) ? $rc['queue'] : 'ref_check.article';
|
||||||
|
$routeKey = isset($rc['route_key']) ? $rc['route_key'] : 'article.start';
|
||||||
|
|
||||||
|
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(
|
||||||
|
RabbitMqConfig::get('host', '127.0.0.1'),
|
||||||
|
intval(RabbitMqConfig::get('port', 5672)),
|
||||||
|
RabbitMqConfig::get('user', 'guest'),
|
||||||
|
RabbitMqConfig::get('password', 'guest'),
|
||||||
|
RabbitMqConfig::get('vhost', '/')
|
||||||
|
);
|
||||||
|
$ch = $conn->channel();
|
||||||
|
$ch->exchange_declare($exchange, 'direct', false, true, false);
|
||||||
|
$dlq = isset($rc['dlq']) ? $rc['dlq'] : 'ref_check.article.dlq';
|
||||||
|
$ch->queue_declare($dlq, false, true, false, false);
|
||||||
|
$ch->queue_declare($queue, false, true, false, false, false, new \PhpAmqpLib\Wire\AMQPTable([
|
||||||
|
'x-dead-letter-exchange' => '',
|
||||||
|
'x-dead-letter-routing-key' => $dlq,
|
||||||
|
]));
|
||||||
|
$ch->queue_bind($queue, $exchange, $routeKey);
|
||||||
|
$ch->basic_qos(null, 1, null);
|
||||||
|
|
||||||
|
$output->writeln('Waiting on queue: ' . $queue);
|
||||||
|
|
||||||
|
$worker = new ReferenceCheckArticleWorker();
|
||||||
|
$callback = function ($msg) use ($worker, $output) {
|
||||||
|
$payload = json_decode($msg->body, true);
|
||||||
|
if (!is_array($payload)) {
|
||||||
|
$msg->ack();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
$worker->handleMessage($payload);
|
||||||
|
$msg->ack();
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
\think\Log::error('reference_check:mq-consume ' . $e->getMessage());
|
||||||
|
$output->writeln('<error>' . $e->getMessage() . '</error>');
|
||||||
|
$msg->nack(false, false);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
$ch->basic_consume($queue, '', false, false, false, false, $callback);
|
||||||
|
while ($ch->is_consuming()) {
|
||||||
|
$ch->wait();
|
||||||
|
}
|
||||||
|
|
||||||
|
$ch->close();
|
||||||
|
$conn->close();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
24
application/common/mq/RabbitMqConfig.php
Normal file
24
application/common/mq/RabbitMqConfig.php
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace app\common\mq;
|
||||||
|
|
||||||
|
class RabbitMqConfig
|
||||||
|
{
|
||||||
|
public static function get($key = null, $default = null)
|
||||||
|
{
|
||||||
|
$cfg = config('rabbitmq');
|
||||||
|
if (!is_array($cfg)) {
|
||||||
|
$cfg = [];
|
||||||
|
}
|
||||||
|
if ($key === null) {
|
||||||
|
return $cfg;
|
||||||
|
}
|
||||||
|
return isset($cfg[$key]) ? $cfg[$key] : $default;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function referenceCheck()
|
||||||
|
{
|
||||||
|
$rc = self::get('reference_check', []);
|
||||||
|
return is_array($rc) ? $rc : [];
|
||||||
|
}
|
||||||
|
}
|
||||||
200
application/common/mq/ReferenceCheckArticleWorker.php
Normal file
200
application/common/mq/ReferenceCheckArticleWorker.php
Normal file
@@ -0,0 +1,200 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace app\common\mq;
|
||||||
|
|
||||||
|
use think\Db;
|
||||||
|
use app\common\ReferenceCheckService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RabbitMQ 消费:按文章串行,文章内 reference_no 升序逐条校对(含低分同步二轮)
|
||||||
|
*/
|
||||||
|
class ReferenceCheckArticleWorker
|
||||||
|
{
|
||||||
|
const BATCH_WAITING = 0;
|
||||||
|
const BATCH_RUNNING = 1;
|
||||||
|
const BATCH_DONE = 2;
|
||||||
|
const BATCH_PARTIAL_FAILED = 3;
|
||||||
|
|
||||||
|
/** @var ReferenceCheckService */
|
||||||
|
private $svc;
|
||||||
|
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$this->svc = new ReferenceCheckService();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function handleMessage(array $payload)
|
||||||
|
{
|
||||||
|
$pArticleId = intval(isset($payload['p_article_id']) ? $payload['p_article_id'] : 0);
|
||||||
|
$batchId = intval(isset($payload['batch_id']) ? $payload['batch_id'] : 0);
|
||||||
|
if ($pArticleId <= 0 || $batchId <= 0) {
|
||||||
|
$this->svc->log('ReferenceCheckArticleWorker invalid payload');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$this->canStartArticleWork($batchId)) {
|
||||||
|
$this->svc->log('ReferenceCheckArticleWorker defer batch_id=' . $batchId . ' other article running');
|
||||||
|
(new ReferenceCheckMqPublisher())->publishArticleStart($pArticleId, $batchId, isset($payload['trigger']) ? $payload['trigger'] : 'enqueue');
|
||||||
|
sleep(3);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$this->claimBatch($batchId)) {
|
||||||
|
$batch = $this->getBatch($batchId);
|
||||||
|
if (empty($batch) || intval($batch['batch_status']) === self::BATCH_DONE) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->svc->log('ReferenceCheckArticleWorker start p_article_id=' . $pArticleId . ' batch_id=' . $batchId);
|
||||||
|
|
||||||
|
$done = 0;
|
||||||
|
$failed = 0;
|
||||||
|
while (true) {
|
||||||
|
$row = $this->fetchNextPendingRow($pArticleId);
|
||||||
|
if (empty($row)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
$checkId = $this->svc->resolveCheckRowId($row);
|
||||||
|
if ($checkId <= 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
$result = $this->processOneRow($checkId, $row);
|
||||||
|
if ($result === 'ok') {
|
||||||
|
$done++;
|
||||||
|
} elseif ($result === 'failed') {
|
||||||
|
$failed++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->finalizeBatch($batchId, $done, $failed);
|
||||||
|
$this->svc->log('ReferenceCheckArticleWorker done p_article_id=' . $pArticleId . ' batch_id=' . $batchId . ' done=' . $done . ' failed=' . $failed);
|
||||||
|
|
||||||
|
$this->publishNextWaitingBatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function canStartArticleWork($batchId)
|
||||||
|
{
|
||||||
|
$running = Db::name('article_reference_check_batch')
|
||||||
|
->where('batch_status', self::BATCH_RUNNING)
|
||||||
|
->where('id', '<>', intval($batchId))
|
||||||
|
->count();
|
||||||
|
return intval($running) === 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function claimBatch($batchId)
|
||||||
|
{
|
||||||
|
$now = date('Y-m-d H:i:s');
|
||||||
|
$affected = Db::name('article_reference_check_batch')
|
||||||
|
->where('id', intval($batchId))
|
||||||
|
->whereIn('batch_status', [self::BATCH_WAITING, self::BATCH_RUNNING])
|
||||||
|
->update([
|
||||||
|
'batch_status' => self::BATCH_RUNNING,
|
||||||
|
'updated_at' => $now,
|
||||||
|
]);
|
||||||
|
return intval($affected) > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getBatch($batchId)
|
||||||
|
{
|
||||||
|
return Db::name('article_reference_check_batch')->where('id', intval($batchId))->find();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function fetchNextPendingRow($pArticleId)
|
||||||
|
{
|
||||||
|
return Db::name('article_reference_check_result')
|
||||||
|
->where('p_article_id', intval($pArticleId))
|
||||||
|
->where('queue_status', ReferenceCheckService::QUEUE_PENDING)
|
||||||
|
->where('status', ReferenceCheckService::RECORD_PENDING)
|
||||||
|
->order('reference_no asc,am_id asc,text_start asc,id asc')
|
||||||
|
->find();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return string ok|failed|skip
|
||||||
|
*/
|
||||||
|
private function processOneRow($checkId, array $row)
|
||||||
|
{
|
||||||
|
$claimed = Db::name('article_reference_check_result')
|
||||||
|
->where('id', intval($checkId))
|
||||||
|
->where('queue_status', ReferenceCheckService::QUEUE_PENDING)
|
||||||
|
->update(['queue_status' => ReferenceCheckService::QUEUE_RUNNING]);
|
||||||
|
if (intval($claimed) <= 0) {
|
||||||
|
return 'skip';
|
||||||
|
}
|
||||||
|
|
||||||
|
$retryCount = intval(isset($row['retry_count']) ? $row['retry_count'] : 0);
|
||||||
|
try {
|
||||||
|
$this->svc->runReferenceCheckOnce($checkId);
|
||||||
|
$amId = intval(isset($row['am_id']) ? $row['am_id'] : 0);
|
||||||
|
if ($amId > 0) {
|
||||||
|
$this->svc->syncAmRefCheckStatus($amId);
|
||||||
|
}
|
||||||
|
$this->svc->markQueueRuntime($checkId, ReferenceCheckService::QUEUE_COMPLETED, $retryCount);
|
||||||
|
return 'ok';
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
$this->svc->log('ReferenceCheckArticleWorker check_id=' . $checkId . ' err=' . $e->getMessage());
|
||||||
|
if ($retryCount < ReferenceCheckService::QUEUE_MAX_RETRY) {
|
||||||
|
$this->svc->markQueueRuntime($checkId, ReferenceCheckService::QUEUE_PENDING, $retryCount + 1);
|
||||||
|
return $this->processOneRow($checkId, array_merge($row, ['retry_count' => $retryCount + 1]));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
$this->svc->updateCheckResult($checkId, [
|
||||||
|
'status' => ReferenceCheckService::RECORD_FAILED,
|
||||||
|
'error_msg' => $e->getMessage(),
|
||||||
|
]);
|
||||||
|
$this->svc->markQueueRuntime($checkId, ReferenceCheckService::QUEUE_FAILED, $retryCount);
|
||||||
|
} catch (\Exception $e2) {
|
||||||
|
\think\Log::error('ReferenceCheckArticleWorker markFailed: ' . $e2->getMessage());
|
||||||
|
}
|
||||||
|
$amId = intval(isset($row['am_id']) ? $row['am_id'] : 0);
|
||||||
|
if ($amId > 0) {
|
||||||
|
$this->svc->syncAmRefCheckStatus($amId);
|
||||||
|
}
|
||||||
|
return 'failed';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private function finalizeBatch($batchId, $done, $failed)
|
||||||
|
{
|
||||||
|
$batch = $this->getBatch($batchId);
|
||||||
|
if (empty($batch)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
$total = intval($batch['total_count']);
|
||||||
|
$status = self::BATCH_DONE;
|
||||||
|
if ($failed > 0) {
|
||||||
|
$status = self::BATCH_PARTIAL_FAILED;
|
||||||
|
}
|
||||||
|
Db::name('article_reference_check_batch')->where('id', intval($batchId))->update([
|
||||||
|
'batch_status' => $status,
|
||||||
|
'done_count' => intval($done),
|
||||||
|
'failed_count' => intval($failed),
|
||||||
|
'updated_at' => date('Y-m-d H:i:s'),
|
||||||
|
]);
|
||||||
|
if ($total > 0 && ($done + $failed) < $total) {
|
||||||
|
$this->svc->log('ReferenceCheckArticleWorker batch_id=' . $batchId . ' incomplete total=' . $total);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private function publishNextWaitingBatch()
|
||||||
|
{
|
||||||
|
$next = Db::name('article_reference_check_batch')
|
||||||
|
->where('batch_status', self::BATCH_WAITING)
|
||||||
|
->order('id asc')
|
||||||
|
->find();
|
||||||
|
if (empty($next)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
(new ReferenceCheckMqPublisher())->publishArticleStart(
|
||||||
|
intval($next['p_article_id']),
|
||||||
|
intval($next['id']),
|
||||||
|
isset($next['trigger']) ? $next['trigger'] : 'enqueue'
|
||||||
|
);
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
$this->svc->log('publishNextWaitingBatch failed: ' . $e->getMessage());
|
||||||
|
\think\Log::error('publishNextWaitingBatch: ' . $e->getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
73
application/common/mq/ReferenceCheckMqPublisher.php
Normal file
73
application/common/mq/ReferenceCheckMqPublisher.php
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace app\common\mq;
|
||||||
|
|
||||||
|
class ReferenceCheckMqPublisher
|
||||||
|
{
|
||||||
|
public function publishArticleStart($pArticleId, $batchId, $trigger = 'enqueue')
|
||||||
|
{
|
||||||
|
if (!class_exists('\\PhpAmqpLib\\Connection\\AMQPStreamConnection')) {
|
||||||
|
throw new \RuntimeException('php-amqplib not installed. Run: php composer.phar require php-amqplib/php-amqplib:^2.12');
|
||||||
|
}
|
||||||
|
|
||||||
|
$pArticleId = intval($pArticleId);
|
||||||
|
$batchId = intval($batchId);
|
||||||
|
if ($pArticleId <= 0 || $batchId <= 0) {
|
||||||
|
throw new \InvalidArgumentException('invalid p_article_id or batch_id');
|
||||||
|
}
|
||||||
|
|
||||||
|
$body = json_encode([
|
||||||
|
'p_article_id' => $pArticleId,
|
||||||
|
'batch_id' => $batchId,
|
||||||
|
'trigger' => (string)$trigger,
|
||||||
|
'ts' => time(),
|
||||||
|
], JSON_UNESCAPED_UNICODE);
|
||||||
|
|
||||||
|
$rc = RabbitMqConfig::referenceCheck();
|
||||||
|
$exchange = isset($rc['exchange']) ? $rc['exchange'] : 'reference_check';
|
||||||
|
$routeKey = isset($rc['route_key']) ? $rc['route_key'] : 'article.start';
|
||||||
|
|
||||||
|
$conn = $this->connect();
|
||||||
|
try {
|
||||||
|
$ch = $conn->channel();
|
||||||
|
$this->declareTopology($ch, $rc);
|
||||||
|
$msg = new \PhpAmqpLib\Message\AMQPMessage($body, [
|
||||||
|
'content_type' => 'application/json',
|
||||||
|
'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
|
||||||
|
]);
|
||||||
|
$ch->basic_publish($msg, $exchange, $routeKey);
|
||||||
|
$ch->close();
|
||||||
|
} finally {
|
||||||
|
$conn->close();
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function connect()
|
||||||
|
{
|
||||||
|
return new \PhpAmqpLib\Connection\AMQPStreamConnection(
|
||||||
|
RabbitMqConfig::get('host', '127.0.0.1'),
|
||||||
|
intval(RabbitMqConfig::get('port', 5672)),
|
||||||
|
RabbitMqConfig::get('user', 'guest'),
|
||||||
|
RabbitMqConfig::get('password', 'guest'),
|
||||||
|
RabbitMqConfig::get('vhost', '/')
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function declareTopology($ch, array $rc)
|
||||||
|
{
|
||||||
|
$exchange = isset($rc['exchange']) ? $rc['exchange'] : 'reference_check';
|
||||||
|
$queue = isset($rc['queue']) ? $rc['queue'] : 'ref_check.article';
|
||||||
|
$dlq = isset($rc['dlq']) ? $rc['dlq'] : 'ref_check.article.dlq';
|
||||||
|
$routeKey = isset($rc['route_key']) ? $rc['route_key'] : 'article.start';
|
||||||
|
|
||||||
|
$ch->exchange_declare($exchange, 'direct', false, true, false);
|
||||||
|
$ch->queue_declare($dlq, false, true, false, false);
|
||||||
|
$ch->queue_declare($queue, false, true, false, false, false, new \PhpAmqpLib\Wire\AMQPTable([
|
||||||
|
'x-dead-letter-exchange' => '',
|
||||||
|
'x-dead-letter-routing-key' => $dlq,
|
||||||
|
]));
|
||||||
|
$ch->queue_bind($queue, $exchange, $routeKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
16
application/extra/rabbitmq.php
Normal file
16
application/extra/rabbitmq.php
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
return [
|
||||||
|
'host' => '127.0.0.1',
|
||||||
|
'port' => 5672,
|
||||||
|
'user' => 'admin',
|
||||||
|
'password' => '751019',
|
||||||
|
'vhost' => '/',
|
||||||
|
|
||||||
|
'reference_check' => [
|
||||||
|
'exchange' => 'reference_check',
|
||||||
|
'queue' => 'ref_check.article',
|
||||||
|
'dlq' => 'ref_check.article.dlq',
|
||||||
|
'route_key' => 'article.start',
|
||||||
|
],
|
||||||
|
];
|
||||||
Reference in New Issue
Block a user