package com.peanut.config; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * RabbitMq 延时队列实现 * * @author AnYuan */ @Slf4j @Configuration public class DelayQueueConfig { /** * 延迟队列 */ public static final String DELAY_EXCHANGE = "delay.queue.business.exchange"; public static final String DELAY_QUEUE = "delay.queue.business.queue"; public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.business.queue.routingKey"; /** * 订单队列 */ public static final String ORDER_EVENT_EXCHANGE = "order-event-exchange"; public static final String ORDER_DELAY_QUEUE = "order.delay.queue"; public static final String ORDER_RELEASE_QUEUE = "order.release.queue"; public static final String ORDER_QUEUE_ROUTING_KEY = "order.release.order"; /** * 死信队列 */ public static final String DEAD_LETTER_EXCHANGE = "delay.queue.deadLetter.exchange"; public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "delay.queue.deadLetter.delay_10s.routingKey"; public static final String DEAD_LETTER_QUEUE = "delay.queue.deadLetter.queue"; /** * 订单待支付+订单取消 */ public static final String ORDER_TO_BE_PAY_EXCHANGE = "order_to_be_pay_exchange"; public static final String ORDER_TO_BE_PAY_ROUTING_KEY = "order_to_be_pay_routingKey"; public static final String ORDER_TO_BE_PAY_QUEUE = "order_to_be_pay_queue"; public static final String ORDER_CANCEL_DEAD_LETTER_EXCHANGE = "order_cancel_dead_letter_exchange"; public static final String ORDER_CANCEL_DEAD_LETTER_ROUTING_KEY = "order_cancel_dead_letter_routingKey"; public static final String ORDER_CANCEL_DEAD_LETTER_QUEUE = "order_cancel_dead_letter_queue"; //超V过期延迟队列 public static final String USERVIP_QUEUE = "uservip_queue"; public static final String USERVIP_EXCHANGE = "uservip_exchange"; public static final String USERVIP_ROUTING_KEY = "uservip_routingKey"; //课程过期延迟队列 public static final String COURSE_QUEUE = "course_queue"; public static final String COURSE_EXCHANGE = "course_exchange"; public static final String COURSE_ROUTING_KEY = "course_routingKey"; //通用队列 public static final String COMMON_QUEUE = "common_queue"; public static final String COMMON_EXCHANGE = "common_exchange"; public static final String COMMON_ROUTING_KEY = "common_routingKey"; /** * 快递队列 */ public static final String FMS_QUEUE = "fms.queue"; public static final String FMS_EXCHANGE = "fms.exchange"; public static final String FMS_ROUTING_KEY = "fms.routing.key"; /** * 声明 死信队列 用于接收死信消息 * * @return deadLetterQueueA */ @Bean public Queue deadLetterQueueA() { return new Queue(DEAD_LETTER_QUEUE); } /** * 声明 死信交换机 * * @return deadLetterExchange */ @Bean public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE); } /** * 将 死信队列 绑定到死信交换机上 * * @return deadLetterBindingA */ @Bean public Binding deadLetterBindingA() { return BindingBuilder .bind(deadLetterQueueA()) .to(deadLetterExchange()) .with(DEAD_LETTER_QUEUE_ROUTING_KEY); } /** * 将 延时队列 绑定参数 * * @return Queue */ @Bean public Queue delayQueueA() { Map maps = Maps.newHashMapWithExpectedSize(3); // 队列绑定DLX参数(关键一步) maps.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // 队列绑定 死信RoutingKey参数 maps.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY); // 消息过期采用第一种设置队列的 ttl 时间,消息过期时间全部相同。 单位:毫秒,这里设置为8秒 // maps.put("x-message-ttl", 8000); return QueueBuilder.durable(DELAY_QUEUE).withArguments(maps).build(); } /** * 声明 延时交换机 * * @return delayExchange */ @Bean public DirectExchange directExchange() { return new DirectExchange(DELAY_EXCHANGE); } /** * 将 延时队列 绑定到延时交换机上面 * * @return delayBindingA */ @Bean public Binding delayBindingA() { return BindingBuilder .bind(delayQueueA()) .to(directExchange()) .with(DELAY_QUEUE_ROUTING_KEY); } @Bean public Queue orderDelayQueue() { HashMap map = new HashMap<>(); // 队列绑定DLX参数(关键一步) map.put("x-dead-letter-exchange", ORDER_EVENT_EXCHANGE); // 队列绑定 死信RoutingKey参数 map.put("x-dead-letter-routing-key", ORDER_QUEUE_ROUTING_KEY); map.put("x-message-ttl", 60000); Queue queue = new Queue(ORDER_DELAY_QUEUE, true, false, false, map); return queue; } @Bean public Queue orderReleaseOrderQueue() { Queue queue = new Queue(ORDER_RELEASE_QUEUE, true, false, false); return queue; } /** * 声明 订单交换机 * * @return deadLetterExchange */ @Bean public Exchange orderEventExchange() { return new TopicExchange(ORDER_EVENT_EXCHANGE, true, false); } @Bean public Binding orderCreateOrderBingding() { return new Binding(ORDER_DELAY_QUEUE, Binding.DestinationType.QUEUE, ORDER_EVENT_EXCHANGE, "order.create.order", null); } @Bean public Binding orderReleaseOrderBingding() { return new Binding(ORDER_RELEASE_QUEUE, Binding.DestinationType.QUEUE, ORDER_EVENT_EXCHANGE, ORDER_QUEUE_ROUTING_KEY, null); } //订单队列+过期死信 @Bean public Queue orderQueue() { Map arguments = new HashMap<>(2); // 绑定死信交换机 arguments.put("x-dead-letter-exchange", ORDER_CANCEL_DEAD_LETTER_EXCHANGE); // 绑定我们的路由key arguments.put("x-dead-letter-routing-key", ORDER_CANCEL_DEAD_LETTER_ROUTING_KEY); return new Queue(ORDER_TO_BE_PAY_QUEUE, true, false, false, arguments); } @Bean public DirectExchange orderExchange() { return new DirectExchange(ORDER_TO_BE_PAY_EXCHANGE); } @Bean public Binding orderBinding() { return BindingBuilder .bind(orderQueue()) .to(orderExchange()) .with(ORDER_TO_BE_PAY_ROUTING_KEY); } @Bean public Queue orderCancelDeadLetterQueue() { return new Queue(ORDER_CANCEL_DEAD_LETTER_QUEUE); } @Bean public DirectExchange orderCancelDeadLetterExchange() { return new DirectExchange(ORDER_CANCEL_DEAD_LETTER_EXCHANGE); } @Bean public Binding binding() { return BindingBuilder .bind(orderCancelDeadLetterQueue()) .to(orderCancelDeadLetterExchange()) .with(ORDER_CANCEL_DEAD_LETTER_ROUTING_KEY); } //超V过期延迟队列 @Bean public Queue uservipQueue() { return new Queue(USERVIP_QUEUE, true, false, false); } @Bean public CustomExchange uservipExchange() { Map args = new HashMap<>(); //自定义交换机的类型 args.put("x-delayed-type", "direct"); return new CustomExchange(USERVIP_EXCHANGE, "x-delayed-message", true, false,args); } @Bean public Binding uservipBinding() { return BindingBuilder .bind(uservipQueue()) .to(uservipExchange()) .with(USERVIP_ROUTING_KEY) .noargs(); } //课程过期延迟队列 @Bean public Queue courseQueue() { return new Queue(COURSE_QUEUE, true, false, false); } @Bean public CustomExchange courseExchange() { Map args = new HashMap<>(); //自定义交换机的类型 args.put("x-delayed-type", "direct"); return new CustomExchange(COURSE_EXCHANGE, "x-delayed-message", true, false,args); } @Bean public Binding courseBinding() { return BindingBuilder .bind(courseQueue()) .to(courseExchange()) .with(COURSE_ROUTING_KEY) .noargs(); } //通用延迟队列 @Bean public Queue commonQueue() { return new Queue(COMMON_QUEUE, true, false, false); } @Bean public CustomExchange commonExchange() { Map args = new HashMap<>(); //自定义交换机的类型 args.put("x-delayed-type", "direct"); return new CustomExchange(COMMON_EXCHANGE, "x-delayed-message", true, false,args); } @Bean public Binding commonBinding() { return BindingBuilder .bind(commonQueue()) .to(commonExchange()) .with(COMMON_ROUTING_KEY) .noargs(); } @Bean public Queue FMSQueue() { return new Queue(FMS_QUEUE); } @Bean TopicExchange FMSExchange() { return new TopicExchange(FMS_EXCHANGE); } Binding bindingExchangeMessage() { return BindingBuilder .bind(FMSQueue()) .to(FMSExchange()) .with(FMS_ROUTING_KEY); } }