Fixing .gitignore
This commit is contained in:
@@ -1,76 +1,76 @@
|
||||
package com.peanut.modules.mq.Consumer;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.peanut.config.DelayQueueConfig;
|
||||
import com.peanut.modules.book.entity.MyUserEntity;
|
||||
import com.peanut.modules.book.service.MyUserService;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 延时队列消息消费者
|
||||
* @author AnYuan
|
||||
*/
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class DelayMsgConsumer {
|
||||
|
||||
@Autowired
|
||||
private MyUserService userService;
|
||||
|
||||
// 会员开通 时间到期 进入死信队列
|
||||
|
||||
@RabbitListener(bindings = @QueueBinding(
|
||||
value = @Queue(DelayQueueConfig.DEAD_LETTER_QUEUE),
|
||||
exchange = @Exchange(DelayQueueConfig.DEAD_LETTER_EXCHANGE)))
|
||||
public void queueAConsumer(Message message) {
|
||||
|
||||
Msg msg = JSONObject.parseObject(new String(message.getBody()), Msg.class);
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
Duration duration = Duration.between(msg.getTime(), now);
|
||||
String id = msg.getMsg();
|
||||
System.out.println("--------------------------->"+id);
|
||||
MyUserEntity user = userService.getById(id);
|
||||
Date vipValidtime = user.getVipValidtime();
|
||||
Date date = new Date();
|
||||
|
||||
long times = date.getTime() - vipValidtime.getTime();
|
||||
if (times >= 0) {
|
||||
user.setVip("0");
|
||||
}
|
||||
userService.updateById(user);
|
||||
log.info("DelayMsgConsumer死信队列消费---->Msg:{}, 发送时间:{}, 当前时间:{}, 相差时间:{}秒,消息设置的ttl:{}",
|
||||
JSONObject.toJSONString(msg),
|
||||
localDateTimeToString(msg.getTime()),
|
||||
localDateTimeToString(now),
|
||||
duration.getSeconds(),
|
||||
msg.getTtl());
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Msg {
|
||||
private String ttl;
|
||||
private String msg;
|
||||
private LocalDateTime time;
|
||||
}
|
||||
|
||||
private String localDateTimeToString(LocalDateTime localDateTime){
|
||||
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
return dateTimeFormatter.format(localDateTime);
|
||||
}
|
||||
}
|
||||
package com.peanut.modules.mq.Consumer;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.peanut.config.DelayQueueConfig;
|
||||
import com.peanut.modules.book.entity.MyUserEntity;
|
||||
import com.peanut.modules.book.service.MyUserService;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 延时队列消息消费者
|
||||
* @author AnYuan
|
||||
*/
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class DelayMsgConsumer {
|
||||
|
||||
@Autowired
|
||||
private MyUserService userService;
|
||||
|
||||
// 会员开通 时间到期 进入死信队列
|
||||
|
||||
@RabbitListener(bindings = @QueueBinding(
|
||||
value = @Queue(DelayQueueConfig.DEAD_LETTER_QUEUE),
|
||||
exchange = @Exchange(DelayQueueConfig.DEAD_LETTER_EXCHANGE)))
|
||||
public void queueAConsumer(Message message) {
|
||||
|
||||
Msg msg = JSONObject.parseObject(new String(message.getBody()), Msg.class);
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
Duration duration = Duration.between(msg.getTime(), now);
|
||||
String id = msg.getMsg();
|
||||
System.out.println("--------------------------->"+id);
|
||||
MyUserEntity user = userService.getById(id);
|
||||
Date vipValidtime = user.getVipValidtime();
|
||||
Date date = new Date();
|
||||
|
||||
long times = date.getTime() - vipValidtime.getTime();
|
||||
if (times >= 0) {
|
||||
user.setVip("0");
|
||||
}
|
||||
userService.updateById(user);
|
||||
log.info("DelayMsgConsumer死信队列消费---->Msg:{}, 发送时间:{}, 当前时间:{}, 相差时间:{}秒,消息设置的ttl:{}",
|
||||
JSONObject.toJSONString(msg),
|
||||
localDateTimeToString(msg.getTime()),
|
||||
localDateTimeToString(now),
|
||||
duration.getSeconds(),
|
||||
msg.getTtl());
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Msg {
|
||||
private String ttl;
|
||||
private String msg;
|
||||
private LocalDateTime time;
|
||||
}
|
||||
|
||||
private String localDateTimeToString(LocalDateTime localDateTime){
|
||||
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
return dateTimeFormatter.format(localDateTime);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,22 +1,22 @@
|
||||
package com.peanut.modules.mq.Consumer;
|
||||
|
||||
import com.peanut.common.utils.HttpClientUtils;
|
||||
import com.peanut.config.DelayQueueConfig;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class FMSMsgConsumer {
|
||||
|
||||
@RabbitListener(queues = DelayQueueConfig.FMS_QUEUE)
|
||||
public void delivery(Message message){
|
||||
|
||||
System.out.println("收到消息");
|
||||
|
||||
// HttpClientUtils.sendPost("http://192.168.161.1:8080/order/online","");
|
||||
}
|
||||
}
|
||||
package com.peanut.modules.mq.Consumer;
|
||||
|
||||
import com.peanut.common.utils.HttpClientUtils;
|
||||
import com.peanut.config.DelayQueueConfig;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class FMSMsgConsumer {
|
||||
|
||||
@RabbitListener(queues = DelayQueueConfig.FMS_QUEUE)
|
||||
public void delivery(Message message){
|
||||
|
||||
System.out.println("收到消息");
|
||||
|
||||
// HttpClientUtils.sendPost("http://192.168.161.1:8080/order/online","");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
package com.peanut.modules.mq.service;
|
||||
|
||||
/**
|
||||
* rabbiMq服务
|
||||
* @author AnYuan
|
||||
*/
|
||||
|
||||
public interface RabbitMqService {
|
||||
|
||||
/**
|
||||
* 统一发送mq
|
||||
*
|
||||
* @param exchange 交换机
|
||||
* @param routingKey 路由key
|
||||
* @param msg 消息
|
||||
* @param ttl 过期时间
|
||||
*/
|
||||
void send(String exchange, String routingKey, String msg, Long ttl);
|
||||
}
|
||||
package com.peanut.modules.mq.service;
|
||||
|
||||
/**
|
||||
* rabbiMq服务
|
||||
* @author AnYuan
|
||||
*/
|
||||
|
||||
public interface RabbitMqService {
|
||||
|
||||
/**
|
||||
* 统一发送mq
|
||||
*
|
||||
* @param exchange 交换机
|
||||
* @param routingKey 路由key
|
||||
* @param msg 消息
|
||||
* @param ttl 过期时间
|
||||
*/
|
||||
void send(String exchange, String routingKey, String msg, Long ttl);
|
||||
}
|
||||
|
||||
@@ -1,33 +1,33 @@
|
||||
package com.peanut.modules.mq.service.impl;
|
||||
|
||||
import com.peanut.modules.mq.service.RabbitMqService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* rabbitmq服务
|
||||
* @author AnYuan
|
||||
*/
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class RabbitMqServiceImpl implements RabbitMqService {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Override
|
||||
public void send(String exchange, String routingKey, String msg, Long ttl) {
|
||||
MessageProperties messageProperties = new MessageProperties();
|
||||
// 第二种方式设置消息过期时间
|
||||
messageProperties.setExpiration(ttl.toString());
|
||||
// 构建一个消息对象
|
||||
Message message = new Message(msg.getBytes(), messageProperties);
|
||||
// 发送RabbitMq消息
|
||||
rabbitTemplate.convertAndSend(exchange, routingKey, message);
|
||||
}
|
||||
package com.peanut.modules.mq.service.impl;
|
||||
|
||||
import com.peanut.modules.mq.service.RabbitMqService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* rabbitmq服务
|
||||
* @author AnYuan
|
||||
*/
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class RabbitMqServiceImpl implements RabbitMqService {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Override
|
||||
public void send(String exchange, String routingKey, String msg, Long ttl) {
|
||||
MessageProperties messageProperties = new MessageProperties();
|
||||
// 第二种方式设置消息过期时间
|
||||
messageProperties.setExpiration(ttl.toString());
|
||||
// 构建一个消息对象
|
||||
Message message = new Message(msg.getBytes(), messageProperties);
|
||||
// 发送RabbitMq消息
|
||||
rabbitTemplate.convertAndSend(exchange, routingKey, message);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user