77 lines
2.6 KiB
Java
77 lines
2.6 KiB
Java
package com.peanut.modules.mq.Consumer;
|
||
|
||
import com.alibaba.fastjson.JSONObject;
|
||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||
import com.peanut.config.DelayQueueConfig;
|
||
import com.peanut.modules.book.entity.MyUserEntity;
|
||
import com.peanut.modules.book.service.MyUserService;
|
||
import lombok.Data;
|
||
import lombok.extern.slf4j.Slf4j;
|
||
import org.springframework.amqp.core.Message;
|
||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||
import org.springframework.beans.factory.annotation.Autowired;
|
||
import org.springframework.stereotype.Component;
|
||
|
||
import java.time.Duration;
|
||
import java.time.LocalDateTime;
|
||
import java.time.format.DateTimeFormatter;
|
||
import java.util.Date;
|
||
import java.util.Map;
|
||
|
||
/**
|
||
* 延时队列消息消费者
|
||
* @author AnYuan
|
||
*/
|
||
|
||
@Component
|
||
@Slf4j
|
||
public class DelayMsgConsumer {
|
||
|
||
@Autowired
|
||
private MyUserService userService;
|
||
|
||
// 会员开通 时间到期 进入死信队列
|
||
|
||
@RabbitListener(bindings = @QueueBinding(
|
||
value = @Queue(DelayQueueConfig.DEAD_LETTER_QUEUE),
|
||
exchange = @Exchange(DelayQueueConfig.DEAD_LETTER_EXCHANGE)))
|
||
public void queueAConsumer(Message message) {
|
||
|
||
Msg msg = JSONObject.parseObject(new String(message.getBody()), Msg.class);
|
||
LocalDateTime now = LocalDateTime.now();
|
||
Duration duration = Duration.between(msg.getTime(), now);
|
||
String id = msg.getMsg();
|
||
System.out.println("--------------------------->"+id);
|
||
MyUserEntity user = userService.getById(id);
|
||
Date vipValidtime = user.getVipValidtime();
|
||
Date date = new Date();
|
||
|
||
long times = date.getTime() - vipValidtime.getTime();
|
||
if (times >= 0) {
|
||
user.setVip("0");
|
||
}
|
||
userService.updateById(user);
|
||
log.info("DelayMsgConsumer死信队列消费---->Msg:{}, 发送时间:{}, 当前时间:{}, 相差时间:{}秒,消息设置的ttl:{}",
|
||
JSONObject.toJSONString(msg),
|
||
localDateTimeToString(msg.getTime()),
|
||
localDateTimeToString(now),
|
||
duration.getSeconds(),
|
||
msg.getTtl());
|
||
}
|
||
|
||
@Data
|
||
public static class Msg {
|
||
private String ttl;
|
||
private String msg;
|
||
private LocalDateTime time;
|
||
}
|
||
|
||
private String localDateTimeToString(LocalDateTime localDateTime){
|
||
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||
return dateTimeFormatter.format(localDateTime);
|
||
}
|
||
}
|