begin new project
This commit is contained in:
133
src/main/java/com/peanut/modules/mq/Consumer/CommonConsumer.java
Normal file
133
src/main/java/com/peanut/modules/mq/Consumer/CommonConsumer.java
Normal file
@@ -0,0 +1,133 @@
|
||||
package com.peanut.modules.mq.Consumer;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.peanut.config.DelayQueueConfig;
|
||||
import com.peanut.modules.common.dao.*;
|
||||
import com.peanut.modules.common.entity.*;
|
||||
import com.peanut.modules.common.service.*;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
//通用延迟队列
|
||||
@Component
|
||||
@Slf4j
|
||||
public class CommonConsumer {
|
||||
|
||||
@Autowired
|
||||
private ClassEntityDao classEntityDao;
|
||||
@Autowired
|
||||
private ClassExamUserDao classExamUserDao;
|
||||
@Autowired
|
||||
private ClassExamService classexamService;
|
||||
@Autowired
|
||||
private MyUserDao myUserDao;
|
||||
@Autowired
|
||||
private ClassCourseDao classCourseDao;
|
||||
@Autowired
|
||||
private UserCourseBuyDao userCourseBuyDao;
|
||||
@Autowired
|
||||
private ClassEntityService classEntityService;
|
||||
@Autowired
|
||||
private CouponHistoryService couponHistoryService;
|
||||
@Autowired
|
||||
private CourseCatalogueDao courseCatalogueDao;
|
||||
@Autowired
|
||||
private UserVipService userVipService;
|
||||
@Autowired
|
||||
private AiVipLogService aiVipLogService;
|
||||
|
||||
@RabbitListener(queues = DelayQueueConfig.COMMON_QUEUE)
|
||||
public void commonConsumer(String typeAndParam) {
|
||||
try {
|
||||
//参数为 业务模块 + , + 参数
|
||||
String[] typeAndParams = typeAndParam.split(",");
|
||||
//考试周天数,根据设置的天数将班级状态从进行中设置成考试中
|
||||
if ("examDays".equals(typeAndParams[0])){
|
||||
ClassEntity classEntity = classEntityDao.selectById(typeAndParams[1]);
|
||||
if (classEntity!=null&&!"3".equals(classEntity.getState())){
|
||||
classEntity.setState("3");
|
||||
classEntityDao.updateById(classEntity);
|
||||
}
|
||||
}
|
||||
//在考试结束时检查是否提交,未完成者自动提交
|
||||
if ("examSubmit".equals(typeAndParams[0])){
|
||||
ClassExamUser classExamUser = classExamUserDao.selectById(typeAndParams[1]);
|
||||
if (classExamUser!=null&&classExamUser.getScoreSuccess()==0){
|
||||
Map<String,Object> map = new HashMap<>();
|
||||
map.put("id",classExamUser.getId());
|
||||
classexamService.submitExamPaper(map);
|
||||
}
|
||||
}
|
||||
//加班后24小时未买课踢出去
|
||||
if ("joinClass".equals(typeAndParams[0])){
|
||||
ClassEntity classEntity = classEntityDao.selectById(typeAndParams[1]);
|
||||
if (classEntity!=null&&"0".equals(classEntity.getState())){
|
||||
MyUserEntity user = myUserDao.selectById(typeAndParams[2]);
|
||||
if (user!=null){
|
||||
List<ClassCourse> courses = classCourseDao.selectList(new LambdaQueryWrapper<ClassCourse>()
|
||||
.eq(ClassCourse::getModelId,classEntity.getModelId()));
|
||||
for (ClassCourse classCourse:courses){
|
||||
UserVip userVip = userVipService.ownCourseCatalogueByVip(user,classCourse.getCourseId());
|
||||
if (userVip == null) {
|
||||
//不是vip查询每门课是否购买
|
||||
List<CourseCatalogueEntity> catalogues = courseCatalogueDao.selectList(new LambdaQueryWrapper<CourseCatalogueEntity>()
|
||||
.eq(CourseCatalogueEntity::getCourseId,classCourse.getCourseId()));
|
||||
for (CourseCatalogueEntity catalog:catalogues){
|
||||
int ucbCount = userCourseBuyDao.selectCount(new LambdaQueryWrapper<UserCourseBuyEntity>()
|
||||
.eq(UserCourseBuyEntity::getUserId,user.getId())
|
||||
.eq(UserCourseBuyEntity::getCatalogueId,catalog.getId())).intValue();
|
||||
if (ucbCount == 0){
|
||||
Map<String,Object> map = new HashMap<>();
|
||||
map.put("classId",classEntity.getId());
|
||||
map.put("userId",user.getId());
|
||||
classEntityService.quitClass(map);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//加班后72小时未实名
|
||||
if ("checkName".equals(typeAndParams[0])){
|
||||
ClassEntity classEntity = classEntityDao.selectById(typeAndParams[1]);
|
||||
if (classEntity!=null&&"0".equals(classEntity.getState())){
|
||||
MyUserEntity user = myUserDao.selectById(typeAndParams[2]);
|
||||
if (user!=null){
|
||||
if (StringUtils.isEmpty(user.getName())){
|
||||
Map<String,Object> map = new HashMap<>();
|
||||
map.put("classId",classEntity.getId());
|
||||
map.put("userId",user.getId());
|
||||
classEntityService.quitClass(map);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//优惠卷过期
|
||||
if ("couponExpire".equals(typeAndParams[0])){
|
||||
CouponHistory couponHistory = couponHistoryService.getById(typeAndParams[1]);
|
||||
if (couponHistory != null&&couponHistory.getStatus()==0) {
|
||||
couponHistory.setStatus(2);
|
||||
couponHistoryService.updateById(couponHistory);
|
||||
}
|
||||
}
|
||||
//aiVip过期
|
||||
if ("aiVipExpire".equals(typeAndParams[0])){
|
||||
AiVipLog aiVipLog = aiVipLogService.getById(typeAndParams[1]);
|
||||
if (aiVipLog != null&&aiVipLog.getState()==0) {
|
||||
aiVipLog.setState(1);
|
||||
aiVipLogService.updateById(aiVipLog);
|
||||
}
|
||||
}
|
||||
}catch (Exception e) {
|
||||
log.error("mq通用队列消费异常",e.getMessage()+"-"+typeAndParam);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.peanut.modules.mq.Consumer;
|
||||
|
||||
import com.peanut.config.DelayQueueConfig;
|
||||
import com.peanut.modules.common.dao.UserCourseBuyDao;
|
||||
import com.peanut.modules.common.entity.UserCourseBuyEntity;
|
||||
import org.apache.commons.lang.time.DateUtils;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
||||
//购买的课程到期操作
|
||||
@Component
|
||||
public class CourseConsumer {
|
||||
|
||||
@Autowired
|
||||
private UserCourseBuyDao userCourseBuyDao;
|
||||
|
||||
@RabbitListener(queues = DelayQueueConfig.COURSE_QUEUE)
|
||||
public void courseConsumer(String userCourseBuyId) {
|
||||
UserCourseBuyEntity userCourseBuyEntity = userCourseBuyDao.selectById(userCourseBuyId);
|
||||
if (userCourseBuyEntity!=null){
|
||||
if (userCourseBuyEntity.getStartTime()==null) {
|
||||
//未开始学习,超过一年自动开始
|
||||
userCourseBuyEntity.setStartTime(DateUtils.addYears(userCourseBuyEntity.getCreateTime(),1));
|
||||
userCourseBuyEntity.setEndTime(DateUtils.addDays(userCourseBuyEntity.getStartTime(),userCourseBuyEntity.getDays()));
|
||||
userCourseBuyDao.updateById(userCourseBuyEntity);
|
||||
}else {
|
||||
//结束时间,过期删除
|
||||
userCourseBuyDao.deleteById(userCourseBuyEntity.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.peanut.modules.mq.Consumer;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* 延时队列消息消费者
|
||||
* @author AnYuan
|
||||
*/
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class DelayMsgConsumer {
|
||||
|
||||
@Data
|
||||
public static class Msg {
|
||||
private String ttl;
|
||||
private String msg;
|
||||
private LocalDateTime time;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.peanut.modules.mq.Consumer;
|
||||
|
||||
import com.peanut.common.utils.HttpClientUtils;
|
||||
import com.peanut.config.DelayQueueConfig;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class FMSMsgConsumer {
|
||||
|
||||
@RabbitListener(queues = DelayQueueConfig.FMS_QUEUE)
|
||||
public void delivery(Message message){
|
||||
|
||||
System.out.println("收到消息");
|
||||
|
||||
// HttpClientUtils.sendPost("http://192.168.161.1:8080/order/online","");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
package com.peanut.modules.mq.Consumer;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.peanut.config.Constants;
|
||||
import com.peanut.config.DelayQueueConfig;
|
||||
import com.peanut.modules.common.dao.BuyOrderProductDao;
|
||||
import com.peanut.modules.common.dao.ShopProductDao;
|
||||
import com.peanut.modules.common.entity.BuyOrder;
|
||||
import com.peanut.modules.common.entity.BuyOrderProduct;
|
||||
import com.peanut.modules.common.entity.ShopProduct;
|
||||
import com.peanut.modules.book.service.BuyOrderService;
|
||||
import com.peanut.modules.common.service.CouponService;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Description: 消息超时取消死信队列消费者
|
||||
* @Author: Cauchy
|
||||
* @CreateTime: 2023/10/10
|
||||
*/
|
||||
@Component
|
||||
public class OrderCancelConsumer {
|
||||
|
||||
@Autowired
|
||||
BuyOrderService buyOrderService;
|
||||
@Autowired
|
||||
BuyOrderProductDao buyOrderProductDao;
|
||||
@Autowired
|
||||
ShopProductDao shopProductDao;
|
||||
|
||||
@RabbitListener(queues = DelayQueueConfig.ORDER_CANCEL_DEAD_LETTER_QUEUE)
|
||||
public void orderConsumer(String orderId) {
|
||||
if (StringUtils.isNotEmpty(orderId)){
|
||||
BuyOrder buyOrder = buyOrderService.getById(orderId);
|
||||
if(buyOrder == null){
|
||||
return;
|
||||
}
|
||||
if(Constants.ORDER_STATUS_TO_BE_PAID.equals(buyOrder.getOrderStatus())){
|
||||
buyOrder.setOrderStatus(Constants.ORDER_STATUS_OUT_OF_TIME);
|
||||
//回滚优惠卷
|
||||
if (buyOrder.getCouponId()!=null&&buyOrder.getCouponId()!=0){
|
||||
buyOrder.setCouponId(null);
|
||||
}
|
||||
//回滚库存
|
||||
LambdaQueryWrapper<BuyOrderProduct> wrapper = new LambdaQueryWrapper<>();
|
||||
wrapper.eq(BuyOrderProduct::getOrderId,buyOrder.getOrderId());
|
||||
List<BuyOrderProduct> buyOrderProducts = buyOrderProductDao.selectList(wrapper);
|
||||
for (BuyOrderProduct b : buyOrderProducts){
|
||||
ShopProduct shopProduct = shopProductDao.selectById(b.getProductId());
|
||||
if (shopProduct!=null){
|
||||
shopProduct.setProductStock(shopProduct.getProductStock()+b.getQuantity());
|
||||
shopProductDao.updateById(shopProduct);
|
||||
}
|
||||
}
|
||||
buyOrderService.updateById(buyOrder);
|
||||
}
|
||||
if(Constants.ORDER_STATUS_OUT_OF_TIME.equals(buyOrder.getOrderStatus())){
|
||||
buyOrderService.removeById(buyOrder);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package com.peanut.modules.mq.Consumer;
|
||||
|
||||
import com.peanut.config.DelayQueueConfig;
|
||||
import com.peanut.modules.common.dao.MyUserDao;
|
||||
import com.peanut.modules.common.dao.UserVipDao;
|
||||
import com.peanut.modules.common.entity.MyUserEntity;
|
||||
import com.peanut.modules.common.entity.UserVip;
|
||||
import org.apache.commons.lang.time.DateUtils;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
//超v过期死信操作
|
||||
@Component
|
||||
public class UserVipConsumer {
|
||||
|
||||
@Autowired
|
||||
private UserVipDao userVipDao;
|
||||
@Autowired
|
||||
private MyUserDao userDao;
|
||||
|
||||
@RabbitListener(queues = DelayQueueConfig.USERVIP_QUEUE)
|
||||
public void userVipConsumer(String userVipId) {
|
||||
// UserVip userVip = userVipDao.selectById(userVipId);
|
||||
// if(userVip != null){
|
||||
// if(DateUtils.addSeconds(new Date(),3).getTime()>userVip.getEndTime().getTime()){
|
||||
// userVip.setState(1);
|
||||
// userVipDao.updateById(userVip);
|
||||
// MyUserEntity user = userDao.selectById(userVip.getUserId());
|
||||
// user.setVip("0");
|
||||
// userDao.updateById(user);
|
||||
//// System.out.println(user.getName()+"-"+user.getTel()+"-"+user.getEmail()+"超v过期,vip更新完成");
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.peanut.modules.mq.service;
|
||||
|
||||
/**
|
||||
* rabbiMq服务
|
||||
* @author AnYuan
|
||||
*/
|
||||
|
||||
public interface RabbitMqService {
|
||||
|
||||
/**
|
||||
* 统一发送mq
|
||||
*
|
||||
* @param exchange 交换机
|
||||
* @param routingKey 路由key
|
||||
* @param msg 消息
|
||||
* @param ttl 过期时间
|
||||
*/
|
||||
void send(String exchange, String routingKey, String msg, Long ttl);
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package com.peanut.modules.mq.service.impl;
|
||||
|
||||
import com.peanut.modules.mq.service.RabbitMqService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* rabbitmq服务
|
||||
* @author AnYuan
|
||||
*/
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class RabbitMqServiceImpl implements RabbitMqService {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Override
|
||||
public void send(String exchange, String routingKey, String msg, Long ttl) {
|
||||
MessageProperties messageProperties = new MessageProperties();
|
||||
// 第二种方式设置消息过期时间
|
||||
messageProperties.setExpiration(ttl.toString());
|
||||
// 构建一个消息对象
|
||||
Message message = new Message(msg.getBytes(), messageProperties);
|
||||
// 发送RabbitMq消息
|
||||
rabbitTemplate.convertAndSend(exchange, routingKey, message);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user