74 lines
2.4 KiB
PHP
74 lines
2.4 KiB
PHP
<?php
|
|
|
|
namespace app\common\mq;
|
|
|
|
/**
 * Publishes "article start" messages for reference checking to RabbitMQ.
 *
 * Every publish opens a new connection, (re)declares the exchange, work queue
 * and dead-letter queue, sends a single persistent JSON message and then
 * closes the connection again.
 */
class ReferenceCheckMqPublisher
{
    /**
     * Publish one "article start" message.
     *
     * The message body is a JSON object with the keys `p_article_id`,
     * `batch_id`, `trigger` and `ts` (the current Unix timestamp).
     *
     * @param mixed $pArticleId Article id; converted with intval() and must be > 0.
     * @param mixed $batchId    Batch id; converted with intval() and must be > 0.
     * @param mixed $trigger    Label describing what caused the publish; cast to string.
     *
     * @return bool Always true when the message was handed to the channel.
     *
     * @throws \RuntimeException         If php-amqplib is not installed, or the
     *                                   payload cannot be encoded as JSON.
     * @throws \InvalidArgumentException If either id is not a positive integer.
     */
    public function publishArticleStart($pArticleId, $batchId, $trigger = 'enqueue')
    {
        if (!class_exists('\\PhpAmqpLib\\Connection\\AMQPStreamConnection')) {
            throw new \RuntimeException('php-amqplib not installed. Run: php composer.phar require php-amqplib/php-amqplib:^2.12');
        }

        // Validate and encode before touching the network so that bad input
        // never costs a broker connection.
        $body = $this->buildPayload($pArticleId, $batchId, $trigger);

        $rc = RabbitMqConfig::referenceCheck();
        $names = $this->resolveNames($rc);

        $conn = $this->connect();
        try {
            $ch = $conn->channel();
            $this->declareTopology($ch, $rc);
            $msg = new \PhpAmqpLib\Message\AMQPMessage($body, [
                'content_type' => 'application/json',
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]);
            $ch->basic_publish($msg, $names['exchange'], $names['route_key']);
            $ch->close();
        } finally {
            // Runs on success and on failure, so the connection is not left
            // open when declaring the topology or publishing throws.
            $conn->close();
        }

        return true;
    }

    /**
     * Validate the ids and build the JSON message body.
     *
     * @param mixed $pArticleId Article id; converted with intval().
     * @param mixed $batchId    Batch id; converted with intval().
     * @param mixed $trigger    Trigger label; cast to string.
     *
     * @return string JSON-encoded message body.
     *
     * @throws \InvalidArgumentException If either id is not a positive integer.
     * @throws \RuntimeException         If json_encode() fails (for example when
     *                                   $trigger is not valid UTF-8).
     */
    private function buildPayload($pArticleId, $batchId, $trigger)
    {
        $pArticleId = intval($pArticleId);
        $batchId = intval($batchId);
        if ($pArticleId <= 0 || $batchId <= 0) {
            throw new \InvalidArgumentException('invalid p_article_id or batch_id');
        }

        $body = json_encode([
            'p_article_id' => $pArticleId,
            'batch_id' => $batchId,
            'trigger' => (string)$trigger,
            'ts' => time(),
        ], JSON_UNESCAPED_UNICODE);

        // json_encode() signals failure by returning false; publishing that
        // would put an empty, persistent message on the queue.
        if ($body === false) {
            throw new \RuntimeException('failed to encode reference-check message: ' . json_last_error_msg());
        }

        return $body;
    }

    /**
     * Open a connection using the host, port, user, password and vhost read
     * from RabbitMqConfig, with local-broker defaults for missing entries.
     *
     * @return \PhpAmqpLib\Connection\AMQPStreamConnection
     */
    private function connect()
    {
        return new \PhpAmqpLib\Connection\AMQPStreamConnection(
            RabbitMqConfig::get('host', '127.0.0.1'),
            intval(RabbitMqConfig::get('port', 5672)),
            RabbitMqConfig::get('user', 'guest'),
            RabbitMqConfig::get('password', 'guest'),
            RabbitMqConfig::get('vhost', '/')
        );
    }

    /**
     * Resolve the exchange, queue, dead-letter queue and routing-key names.
     *
     * A configured value is used when it is set and not null (isset()
     * semantics); otherwise the built-in default applies. Publishing and
     * topology declaration both go through here so they agree on the names.
     *
     * @param array $rc Reference-check configuration.
     *
     * @return array Map with the keys `exchange`, `queue`, `dlq` and `route_key`.
     */
    private function resolveNames(array $rc)
    {
        $defaults = [
            'exchange' => 'reference_check',
            'queue' => 'ref_check.article',
            'dlq' => 'ref_check.article.dlq',
            'route_key' => 'article.start',
        ];

        $names = [];
        foreach ($defaults as $key => $fallback) {
            $names[$key] = isset($rc[$key]) ? $rc[$key] : $fallback;
        }

        return $names;
    }

    /**
     * Declare the exchange, the dead-letter queue and the work queue, and bind
     * the work queue to the exchange under the routing key.
     *
     * @param object $ch Channel obtained from the connection's channel() call.
     * @param array  $rc Reference-check configuration.
     *
     * @return void
     */
    private function declareTopology($ch, array $rc)
    {
        $names = $this->resolveNames($rc);

        // Positional flags per php-amqplib's exchange_declare():
        // passive=false, durable=true, auto_delete=false.
        $ch->exchange_declare($names['exchange'], 'direct', false, true, false);

        // Positional flags per php-amqplib's queue_declare():
        // passive=false, durable=true, exclusive=false, auto_delete=false.
        $ch->queue_declare($names['dlq'], false, true, false, false);

        // Same flags plus nowait=false. The empty dead-letter exchange name
        // selects RabbitMQ's default exchange, so rejected or expired messages
        // are routed to the queue named by the dead-letter routing key.
        $ch->queue_declare($names['queue'], false, true, false, false, false, new \PhpAmqpLib\Wire\AMQPTable([
            'x-dead-letter-exchange' => '',
            'x-dead-letter-routing-key' => $names['dlq'],
        ]));

        $ch->queue_bind($names['queue'], $names['exchange'], $names['route_key']);
    }
}
|