setName('reference_check:mq-consume') ->setDescription('Consume RabbitMQ reference check article queue'); } protected function execute(Input $input, Output $output) { if (!class_exists('\\PhpAmqpLib\\Connection\\AMQPStreamConnection')) { $output->writeln('php-amqplib not installed. Run: php composer.phar require php-amqplib/php-amqplib:^2.12'); return 1; } $rc = RabbitMqConfig::referenceCheck(); $exchange = isset($rc['exchange']) ? $rc['exchange'] : 'reference_check'; $queue = isset($rc['queue']) ? $rc['queue'] : 'ref_check.article'; $routeKey = isset($rc['route_key']) ? $rc['route_key'] : 'article.start'; $conn = new \PhpAmqpLib\Connection\AMQPStreamConnection( RabbitMqConfig::get('host', '127.0.0.1'), intval(RabbitMqConfig::get('port', 5672)), RabbitMqConfig::get('user', 'guest'), RabbitMqConfig::get('password', 'guest'), RabbitMqConfig::get('vhost', '/') ); $ch = $conn->channel(); $ch->exchange_declare($exchange, 'direct', false, true, false); $dlq = isset($rc['dlq']) ? $rc['dlq'] : 'ref_check.article.dlq'; $ch->queue_declare($dlq, false, true, false, false); $ch->queue_declare($queue, false, true, false, false, false, new \PhpAmqpLib\Wire\AMQPTable([ 'x-dead-letter-exchange' => '', 'x-dead-letter-routing-key' => $dlq, ])); $ch->queue_bind($queue, $exchange, $routeKey); $ch->basic_qos(null, 1, null); $output->writeln('Waiting on queue: ' . $queue); $worker = new ReferenceCheckArticleWorker(); $callback = function ($msg) use ($worker, $output) { $payload = json_decode($msg->body, true); if (!is_array($payload)) { $msg->ack(); return; } try { $worker->handleMessage($payload); $msg->ack(); } catch (\Exception $e) { \think\Log::error('reference_check:mq-consume ' . $e->getMessage()); $output->writeln('' . $e->getMessage() . ''); $msg->nack(false, false); } }; $ch->basic_consume($queue, '', false, false, false, false, $callback); while ($ch->is_consuming()) { $ch->wait(); } $ch->close(); $conn->close(); return 0; } }