From 100f3cf35cebb19649fbb229451355381224ff72 Mon Sep 17 00:00:00 2001 From: wyn <1074145239@qq.com> Date: Tue, 2 Jun 2026 10:00:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=87=E7=8C=AE=E6=A0=A1=E5=AF=B9=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E8=BD=ACrabbitMQ?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../command/ReferenceCheckMqConsume.php | 77 +++++++ application/common/mq/RabbitMqConfig.php | 24 +++ .../common/mq/ReferenceCheckArticleWorker.php | 200 ++++++++++++++++++ .../common/mq/ReferenceCheckMqPublisher.php | 73 +++++++ application/extra/rabbitmq.php | 16 ++ 5 files changed, 390 insertions(+) create mode 100644 application/command/ReferenceCheckMqConsume.php create mode 100644 application/common/mq/RabbitMqConfig.php create mode 100644 application/common/mq/ReferenceCheckArticleWorker.php create mode 100644 application/common/mq/ReferenceCheckMqPublisher.php create mode 100644 application/extra/rabbitmq.php diff --git a/application/command/ReferenceCheckMqConsume.php b/application/command/ReferenceCheckMqConsume.php new file mode 100644 index 00000000..4f2f22f7 --- /dev/null +++ b/application/command/ReferenceCheckMqConsume.php @@ -0,0 +1,77 @@ +setName('reference_check:mq-consume') + ->setDescription('Consume RabbitMQ reference check article queue'); + } + + protected function execute(Input $input, Output $output) + { + if (!class_exists('\\PhpAmqpLib\\Connection\\AMQPStreamConnection')) { + $output->writeln('php-amqplib not installed. Run: php composer.phar require php-amqplib/php-amqplib:^2.12'); + return 1; + } + + $rc = RabbitMqConfig::referenceCheck(); + $exchange = isset($rc['exchange']) ? $rc['exchange'] : 'reference_check'; + $queue = isset($rc['queue']) ? $rc['queue'] : 'ref_check.article'; + $routeKey = isset($rc['route_key']) ? $rc['route_key'] : 'article.start'; + + $conn = new \PhpAmqpLib\Connection\AMQPStreamConnection( + RabbitMqConfig::get('host', '127.0.0.1'), + intval(RabbitMqConfig::get('port', 5672)), + RabbitMqConfig::get('user', 'guest'), + RabbitMqConfig::get('password', 'guest'), + RabbitMqConfig::get('vhost', '/') + ); + $ch = $conn->channel(); + $ch->exchange_declare($exchange, 'direct', false, true, false); + $dlq = isset($rc['dlq']) ? $rc['dlq'] : 'ref_check.article.dlq'; + $ch->queue_declare($dlq, false, true, false, false); + $ch->queue_declare($queue, false, true, false, false, false, new \PhpAmqpLib\Wire\AMQPTable([ + 'x-dead-letter-exchange' => '', + 'x-dead-letter-routing-key' => $dlq, + ])); + $ch->queue_bind($queue, $exchange, $routeKey); + $ch->basic_qos(null, 1, null); + + $output->writeln('Waiting on queue: ' . $queue); + + $worker = new ReferenceCheckArticleWorker(); + $callback = function ($msg) use ($worker, $output) { + $payload = json_decode($msg->body, true); + if (!is_array($payload)) { + $msg->ack(); + return; + } + try { + $worker->handleMessage($payload); + $msg->ack(); + } catch (\Exception $e) { + \think\Log::error('reference_check:mq-consume ' . $e->getMessage()); + $output->writeln('' . $e->getMessage() . ''); + $msg->nack(false, false); + } + }; + + $ch->basic_consume($queue, '', false, false, false, false, $callback); + while ($ch->is_consuming()) { + $ch->wait(); + } + + $ch->close(); + $conn->close(); + return 0; + } +} diff --git a/application/common/mq/RabbitMqConfig.php b/application/common/mq/RabbitMqConfig.php new file mode 100644 index 00000000..df30aa5e --- /dev/null +++ b/application/common/mq/RabbitMqConfig.php @@ -0,0 +1,24 @@ +svc = new ReferenceCheckService(); + } + + public function handleMessage(array $payload) + { + $pArticleId = intval(isset($payload['p_article_id']) ? $payload['p_article_id'] : 0); + $batchId = intval(isset($payload['batch_id']) ? $payload['batch_id'] : 0); + if ($pArticleId <= 0 || $batchId <= 0) { + $this->svc->log('ReferenceCheckArticleWorker invalid payload'); + return; + } + + if (!$this->canStartArticleWork($batchId)) { + $this->svc->log('ReferenceCheckArticleWorker defer batch_id=' . $batchId . ' other article running'); + (new ReferenceCheckMqPublisher())->publishArticleStart($pArticleId, $batchId, isset($payload['trigger']) ? $payload['trigger'] : 'enqueue'); + sleep(3); + return; + } + + if (!$this->claimBatch($batchId)) { + $batch = $this->getBatch($batchId); + if (empty($batch) || intval($batch['batch_status']) === self::BATCH_DONE) { + return; + } + } + + $this->svc->log('ReferenceCheckArticleWorker start p_article_id=' . $pArticleId . ' batch_id=' . $batchId); + + $done = 0; + $failed = 0; + while (true) { + $row = $this->fetchNextPendingRow($pArticleId); + if (empty($row)) { + break; + } + $checkId = $this->svc->resolveCheckRowId($row); + if ($checkId <= 0) { + continue; + } + $result = $this->processOneRow($checkId, $row); + if ($result === 'ok') { + $done++; + } elseif ($result === 'failed') { + $failed++; + } + } + + $this->finalizeBatch($batchId, $done, $failed); + $this->svc->log('ReferenceCheckArticleWorker done p_article_id=' . $pArticleId . ' batch_id=' . $batchId . ' done=' . $done . ' failed=' . $failed); + + $this->publishNextWaitingBatch(); + } + + private function canStartArticleWork($batchId) + { + $running = Db::name('article_reference_check_batch') + ->where('batch_status', self::BATCH_RUNNING) + ->where('id', '<>', intval($batchId)) + ->count(); + return intval($running) === 0; + } + + private function claimBatch($batchId) + { + $now = date('Y-m-d H:i:s'); + $affected = Db::name('article_reference_check_batch') + ->where('id', intval($batchId)) + ->whereIn('batch_status', [self::BATCH_WAITING, self::BATCH_RUNNING]) + ->update([ + 'batch_status' => self::BATCH_RUNNING, + 'updated_at' => $now, + ]); + return intval($affected) > 0; + } + + private function getBatch($batchId) + { + return Db::name('article_reference_check_batch')->where('id', intval($batchId))->find(); + } + + private function fetchNextPendingRow($pArticleId) + { + return Db::name('article_reference_check_result') + ->where('p_article_id', intval($pArticleId)) + ->where('queue_status', ReferenceCheckService::QUEUE_PENDING) + ->where('status', ReferenceCheckService::RECORD_PENDING) + ->order('reference_no asc,am_id asc,text_start asc,id asc') + ->find(); + } + + /** + * @return string ok|failed|skip + */ + private function processOneRow($checkId, array $row) + { + $claimed = Db::name('article_reference_check_result') + ->where('id', intval($checkId)) + ->where('queue_status', ReferenceCheckService::QUEUE_PENDING) + ->update(['queue_status' => ReferenceCheckService::QUEUE_RUNNING]); + if (intval($claimed) <= 0) { + return 'skip'; + } + + $retryCount = intval(isset($row['retry_count']) ? $row['retry_count'] : 0); + try { + $this->svc->runReferenceCheckOnce($checkId); + $amId = intval(isset($row['am_id']) ? $row['am_id'] : 0); + if ($amId > 0) { + $this->svc->syncAmRefCheckStatus($amId); + } + $this->svc->markQueueRuntime($checkId, ReferenceCheckService::QUEUE_COMPLETED, $retryCount); + return 'ok'; + } catch (\Exception $e) { + $this->svc->log('ReferenceCheckArticleWorker check_id=' . $checkId . ' err=' . $e->getMessage()); + if ($retryCount < ReferenceCheckService::QUEUE_MAX_RETRY) { + $this->svc->markQueueRuntime($checkId, ReferenceCheckService::QUEUE_PENDING, $retryCount + 1); + return $this->processOneRow($checkId, array_merge($row, ['retry_count' => $retryCount + 1])); + } + try { + $this->svc->updateCheckResult($checkId, [ + 'status' => ReferenceCheckService::RECORD_FAILED, + 'error_msg' => $e->getMessage(), + ]); + $this->svc->markQueueRuntime($checkId, ReferenceCheckService::QUEUE_FAILED, $retryCount); + } catch (\Exception $e2) { + \think\Log::error('ReferenceCheckArticleWorker markFailed: ' . $e2->getMessage()); + } + $amId = intval(isset($row['am_id']) ? $row['am_id'] : 0); + if ($amId > 0) { + $this->svc->syncAmRefCheckStatus($amId); + } + return 'failed'; + } + } + + private function finalizeBatch($batchId, $done, $failed) + { + $batch = $this->getBatch($batchId); + if (empty($batch)) { + return; + } + $total = intval($batch['total_count']); + $status = self::BATCH_DONE; + if ($failed > 0) { + $status = self::BATCH_PARTIAL_FAILED; + } + Db::name('article_reference_check_batch')->where('id', intval($batchId))->update([ + 'batch_status' => $status, + 'done_count' => intval($done), + 'failed_count' => intval($failed), + 'updated_at' => date('Y-m-d H:i:s'), + ]); + if ($total > 0 && ($done + $failed) < $total) { + $this->svc->log('ReferenceCheckArticleWorker batch_id=' . $batchId . ' incomplete total=' . $total); + } + } + + private function publishNextWaitingBatch() + { + $next = Db::name('article_reference_check_batch') + ->where('batch_status', self::BATCH_WAITING) + ->order('id asc') + ->find(); + if (empty($next)) { + return; + } + try { + (new ReferenceCheckMqPublisher())->publishArticleStart( + intval($next['p_article_id']), + intval($next['id']), + isset($next['trigger']) ? $next['trigger'] : 'enqueue' + ); + } catch (\Exception $e) { + $this->svc->log('publishNextWaitingBatch failed: ' . $e->getMessage()); + \think\Log::error('publishNextWaitingBatch: ' . $e->getMessage()); + } + } +} diff --git a/application/common/mq/ReferenceCheckMqPublisher.php b/application/common/mq/ReferenceCheckMqPublisher.php new file mode 100644 index 00000000..f3831eec --- /dev/null +++ b/application/common/mq/ReferenceCheckMqPublisher.php @@ -0,0 +1,73 @@ + $pArticleId, + 'batch_id' => $batchId, + 'trigger' => (string)$trigger, + 'ts' => time(), + ], JSON_UNESCAPED_UNICODE); + + $rc = RabbitMqConfig::referenceCheck(); + $exchange = isset($rc['exchange']) ? $rc['exchange'] : 'reference_check'; + $routeKey = isset($rc['route_key']) ? $rc['route_key'] : 'article.start'; + + $conn = $this->connect(); + try { + $ch = $conn->channel(); + $this->declareTopology($ch, $rc); + $msg = new \PhpAmqpLib\Message\AMQPMessage($body, [ + 'content_type' => 'application/json', + 'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT, + ]); + $ch->basic_publish($msg, $exchange, $routeKey); + $ch->close(); + } finally { + $conn->close(); + } + + return true; + } + + private function connect() + { + return new \PhpAmqpLib\Connection\AMQPStreamConnection( + RabbitMqConfig::get('host', '127.0.0.1'), + intval(RabbitMqConfig::get('port', 5672)), + RabbitMqConfig::get('user', 'guest'), + RabbitMqConfig::get('password', 'guest'), + RabbitMqConfig::get('vhost', '/') + ); + } + + private function declareTopology($ch, array $rc) + { + $exchange = isset($rc['exchange']) ? $rc['exchange'] : 'reference_check'; + $queue = isset($rc['queue']) ? $rc['queue'] : 'ref_check.article'; + $dlq = isset($rc['dlq']) ? $rc['dlq'] : 'ref_check.article.dlq'; + $routeKey = isset($rc['route_key']) ? $rc['route_key'] : 'article.start'; + + $ch->exchange_declare($exchange, 'direct', false, true, false); + $ch->queue_declare($dlq, false, true, false, false); + $ch->queue_declare($queue, false, true, false, false, false, new \PhpAmqpLib\Wire\AMQPTable([ + 'x-dead-letter-exchange' => '', + 'x-dead-letter-routing-key' => $dlq, + ])); + $ch->queue_bind($queue, $exchange, $routeKey); + } +} diff --git a/application/extra/rabbitmq.php b/application/extra/rabbitmq.php new file mode 100644 index 00000000..a98eb526 --- /dev/null +++ b/application/extra/rabbitmq.php @@ -0,0 +1,16 @@ + '127.0.0.1', + 'port' => 5672, + 'user' => 'admin', + 'password' => '751019', + 'vhost' => '/', + + 'reference_check' => [ + 'exchange' => 'reference_check', + 'queue' => 'ref_check.article', + 'dlq' => 'ref_check.article.dlq', + 'route_key' => 'article.start', + ], +];