first commit
This commit is contained in:
185
src/main/java/com/peanut/config/DelayQueueConfig.java
Normal file
185
src/main/java/com/peanut/config/DelayQueueConfig.java
Normal file
@@ -0,0 +1,185 @@
|
||||
package com.peanut.config;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* RabbitMq 延时队列实现
|
||||
* @author AnYuan
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Configuration
|
||||
public class DelayQueueConfig {
|
||||
|
||||
/**
|
||||
* 延迟队列
|
||||
*/
|
||||
public static final String DELAY_EXCHANGE = "delay.queue.business.exchange";
|
||||
public static final String DELAY_QUEUE = "delay.queue.business.queue";
|
||||
public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.business.queue.routingKey";
|
||||
|
||||
|
||||
/**
|
||||
* 订单队列
|
||||
*/
|
||||
public static final String ORDER_EVENT_EXCHANGE = "order-event-exchange";
|
||||
public static final String ORDER_DELAY_QUEUE = "order.delay.queue";
|
||||
public static final String ORDER_RELEASE_QUEUE = "order.release.queue";
|
||||
public static final String ORDER_QUEUE_ROUTING_KEY = "order.release.order";
|
||||
|
||||
/**
|
||||
* 死信队列
|
||||
*/
|
||||
public static final String DEAD_LETTER_EXCHANGE = "delay.queue.deadLetter.exchange";
|
||||
public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "delay.queue.deadLetter.delay_10s.routingKey";
|
||||
public static final String DEAD_LETTER_QUEUE = "delay.queue.deadLetter.queue";
|
||||
|
||||
|
||||
/**
|
||||
* 快递队列
|
||||
*/
|
||||
public static final String FMS_QUEUE= "fms.queue";
|
||||
public static final String FMS_EXCHANGE = "fms.exchange";
|
||||
public static final String FMS_ROUTING_KEY = "fms.routing.key";
|
||||
|
||||
|
||||
/**
|
||||
* 声明 死信交换机
|
||||
* @return deadLetterExchange
|
||||
*/
|
||||
@Bean
|
||||
public DirectExchange deadLetterExchange() {
|
||||
return new DirectExchange(DEAD_LETTER_EXCHANGE);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 声明 订单交换机
|
||||
* @return deadLetterExchange
|
||||
*/
|
||||
@Bean
|
||||
public Exchange orderEventExchange() {
|
||||
return new TopicExchange(ORDER_EVENT_EXCHANGE,true,false);
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue orderDelayQueue() {
|
||||
|
||||
HashMap<String, Object> map = new HashMap<>();
|
||||
|
||||
// 队列绑定DLX参数(关键一步)
|
||||
map.put("x-dead-letter-exchange", ORDER_EVENT_EXCHANGE);
|
||||
// 队列绑定 死信RoutingKey参数
|
||||
map.put("x-dead-letter-routing-key", ORDER_QUEUE_ROUTING_KEY);
|
||||
|
||||
map.put("x-message-ttl", 60000);
|
||||
|
||||
Queue queue = new Queue(ORDER_DELAY_QUEUE, true, false, false,map);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue orderReleaseOrderQueue() {
|
||||
|
||||
Queue queue = new Queue(ORDER_RELEASE_QUEUE, true, false, false);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 声明 死信队列 用于接收死信消息
|
||||
* @return deadLetterQueueA
|
||||
*/
|
||||
@Bean
|
||||
public Queue deadLetterQueueA() {
|
||||
return new Queue(DEAD_LETTER_QUEUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 死信队列 绑定到死信交换机上
|
||||
* @return deadLetterBindingA
|
||||
*/
|
||||
@Bean
|
||||
public Binding deadLetterBindingA() {
|
||||
return BindingBuilder
|
||||
.bind(deadLetterQueueA())
|
||||
.to(deadLetterExchange())
|
||||
.with(DEAD_LETTER_QUEUE_ROUTING_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
* 声明 延时交换机
|
||||
* @return delayExchange
|
||||
*/
|
||||
@Bean
|
||||
public DirectExchange directExchange() {
|
||||
return new DirectExchange(DELAY_EXCHANGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 延时队列 绑定参数
|
||||
* @return Queue
|
||||
*/
|
||||
@Bean
|
||||
public Queue delayQueueA() {
|
||||
Map<String, Object> maps = Maps.newHashMapWithExpectedSize(3);
|
||||
// 队列绑定DLX参数(关键一步)
|
||||
maps.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
|
||||
// 队列绑定 死信RoutingKey参数
|
||||
maps.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);
|
||||
// 消息过期采用第一种设置队列的 ttl 时间,消息过期时间全部相同。 单位:毫秒,这里设置为8秒
|
||||
// maps.put("x-message-ttl", 8000);
|
||||
return QueueBuilder.durable(DELAY_QUEUE).withArguments(maps).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 延时队列 绑定到延时交换机上面
|
||||
* @return delayBindingA
|
||||
*/
|
||||
@Bean
|
||||
public Binding delayBindingA() {
|
||||
return BindingBuilder
|
||||
.bind(delayQueueA())
|
||||
.to(directExchange())
|
||||
.with(DELAY_QUEUE_ROUTING_KEY);
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Binding orderCreateOrderBingding(){
|
||||
return new Binding(ORDER_DELAY_QUEUE, Binding.DestinationType.QUEUE,ORDER_EVENT_EXCHANGE,"order.create.order",null);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding orderReleaseOrderBingding(){
|
||||
return new Binding(ORDER_RELEASE_QUEUE, Binding.DestinationType.QUEUE,ORDER_EVENT_EXCHANGE,ORDER_QUEUE_ROUTING_KEY,null);
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue FMSQueue(){
|
||||
return new Queue(FMS_QUEUE);
|
||||
}
|
||||
@Bean
|
||||
TopicExchange FMSExchange() {
|
||||
return new TopicExchange(FMS_EXCHANGE);
|
||||
}
|
||||
|
||||
Binding bindingExchangeMessage(){
|
||||
return BindingBuilder.bind(FMSQueue()).to(FMSExchange()).with(FMS_ROUTING_KEY);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user