78 lines
2.5 KiB
PHP
78 lines
2.5 KiB
PHP
<?php
|
|
|
|
namespace app\command;
|
|
|
|
use think\console\Command;
|
|
use think\console\Input;
|
|
use think\console\Output;
|
|
use app\common\mq\RabbitMqConfig;
|
|
use app\common\mq\ReferenceCheckArticleWorker;
|
|
|
|
/**
 * Console command `reference_check:mq-consume`.
 *
 * Declares the reference-check exchange, work queue and dead-letter queue on
 * RabbitMQ, then blocks consuming the work queue and hands every decoded
 * message to ReferenceCheckArticleWorker::handleMessage().
 *
 * Message outcome:
 *  - handled without throwing      -> ack
 *  - worker throws (any Throwable) -> nack without requeue, so the broker
 *                                     dead-letters it to the configured DLQ
 *  - body is not a JSON array      -> logged, then acked (discarded)
 */
class ReferenceCheckMqConsume extends Command
{
    /**
     * Register the command name and description with the console.
     */
    protected function configure()
    {
        $this->setName('reference_check:mq-consume')
            ->setDescription('Consume RabbitMQ reference check article queue');
    }

    /**
     * Set up the AMQP topology and consume until the channel stops consuming.
     *
     * @param Input  $input  Console input (not read by this command).
     * @param Output $output Console output used for status and error lines.
     *
     * @return int 1 when php-amqplib is not installed, 0 after the consume loop ends.
     */
    protected function execute(Input $input, Output $output)
    {
        if (!class_exists('\\PhpAmqpLib\\Connection\\AMQPStreamConnection')) {
            $output->writeln('<error>php-amqplib not installed. Run: php composer.phar require php-amqplib/php-amqplib:^2.12</error>');
            return 1;
        }

        // Topology names come from config, with built-in fallbacks.
        $rc = RabbitMqConfig::referenceCheck();
        $exchange = isset($rc['exchange']) ? $rc['exchange'] : 'reference_check';
        $queue = isset($rc['queue']) ? $rc['queue'] : 'ref_check.article';
        $routeKey = isset($rc['route_key']) ? $rc['route_key'] : 'article.start';
        $dlq = isset($rc['dlq']) ? $rc['dlq'] : 'ref_check.article.dlq';

        $conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            RabbitMqConfig::get('host', '127.0.0.1'),
            intval(RabbitMqConfig::get('port', 5672)),
            RabbitMqConfig::get('user', 'guest'),
            RabbitMqConfig::get('password', 'guest'),
            RabbitMqConfig::get('vhost', '/')
        );

        $ch = null;
        try {
            $ch = $conn->channel();

            // Positional flags: passive=false, durable=true, auto_delete=false.
            $ch->exchange_declare($exchange, 'direct', false, true, false);

            // Positional flags: passive=false, durable=true, exclusive=false, auto_delete=false.
            $ch->queue_declare($dlq, false, true, false, false);

            // Rejected messages are re-published through the default exchange ('')
            // with the DLQ name as routing key, i.e. straight into the DLQ.
            $ch->queue_declare($queue, false, true, false, false, false, new \PhpAmqpLib\Wire\AMQPTable([
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => $dlq,
            ]));
            $ch->queue_bind($queue, $exchange, $routeKey);

            // Prefetch one message at a time: no new delivery until the current one is acked/nacked.
            $ch->basic_qos(null, 1, null);

            $output->writeln('Waiting on queue: ' . $queue);

            $worker = new ReferenceCheckArticleWorker();
            $self = $this;
            $callback = function ($msg) use ($worker, $output, $self) {
                $payload = json_decode($msg->body, true);
                if (!is_array($payload)) {
                    // Still discarded (acked) as before, but no longer silently.
                    \think\Log::error(
                        'reference_check:mq-consume discarded message with non-array payload: '
                        . json_last_error_msg()
                    );
                    $msg->ack();
                    return;
                }

                try {
                    $worker->handleMessage($payload);
                    $msg->ack();
                } catch (\Exception $e) {
                    $self->rejectFailedMessage($msg, $e, $output);
                } catch (\Throwable $e) {
                    // PHP 7+: \Error (TypeError, ...) is not an \Exception. Without this
                    // branch such a failure kills the consumer with the message un-acked,
                    // and the broker redelivers the same poison message on restart.
                    // On PHP 5 this branch simply never matches.
                    $self->rejectFailedMessage($msg, $e, $output);
                }
            };

            // consumer_tag='', no_local=false, no_ack=false (manual acks), exclusive=false, nowait=false.
            $ch->basic_consume($queue, '', false, false, false, false, $callback);
            while ($ch->is_consuming()) {
                $ch->wait();
            }
        } finally {
            // Release broker resources on every exit path, including exceptions
            // thrown from wait() or from the topology declarations.
            if ($ch !== null) {
                $this->closeQuietly($ch);
            }
            $this->closeQuietly($conn);
        }

        return 0;
    }

    /**
     * Log a worker failure, echo it to the console and dead-letter the message.
     *
     * Public only so the consumer closure can call it on PHP versions where
     * closures are not class-scoped; not intended for outside use.
     *
     * @internal
     *
     * @param object                $msg    Delivered AMQP message (provides nack()).
     * @param \Exception|\Throwable $e      Failure raised while handling or acking the message.
     * @param Output                $output Console output.
     */
    public function rejectFailedMessage($msg, $e, Output $output)
    {
        \think\Log::error('reference_check:mq-consume ' . $e->getMessage());
        $output->writeln('<error>' . $e->getMessage() . '</error>');
        // multiple=false, requeue=false: the broker dead-letters instead of redelivering.
        $msg->nack(false, false);
    }

    /**
     * Close a channel or connection, logging instead of propagating failures.
     *
     * Cleanup is best-effort: when the socket is already broken, close() itself
     * may throw, and that must not mask the exception that is already unwinding.
     *
     * @param object $closable Object exposing close().
     */
    private function closeQuietly($closable)
    {
        try {
            $closable->close();
        } catch (\Exception $e) {
            \think\Log::error('reference_check:mq-consume close failed: ' . $e->getMessage());
        }
    }
}
|