案例描述

订单下单后一定时间内没有支付的话,是需要被取消掉的,可以在下单时向rabbitmq发送一条定时消息,到指定时间后取出消息,检查订单有无支付,若无支付则取消掉该订单,从而就实现了业务要求。

运行环境

springboot框架:2.7.10

jdk版本:1.8

包管理工具:maven

引入依赖 | Rabbitmq

<!--   RabbitMQ 消息队列-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

基础配置类 | Rabbitmq

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description 消息队列配置
 */
@Configuration
public class RabbitConfig {

    /**
     * 定义订单队列、交换机和路由键
     */
    public static final String ORDER_QUEUE = "order.queue";
    public static final String ORDER_EXCHANGE = "order.exchange";
    public static final String ORDER_ROUTING_KEY = "order.routing.key";

    /**
     * 定义死信队列、交换机和路由键
     */
    public static final String DEAD_LETTER_QUEUE = "order.dead.letter.queue";
    public static final String DEAD_LETTER_EXCHANGE = "order.dead.letter.exchange";
    public static final String DEAD_LETTER_ROUTING_KEY = "order.dead.letter.routing.key";

    /**
     * 定义订单队列
     * @return
     */
    @Bean
    public Queue orderQueue() {
        // 使用QueueBuilder定义持久化队列,并设置死信交换机和路由键
        return QueueBuilder.durable(ORDER_QUEUE)
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY)
                .build();
    }

    /**
     * 定义订单交换机
     * @return
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(ORDER_EXCHANGE);
    }

    /**
     * 定义订单交换机和订单队列之间的绑定关系
     * @return
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
    }

    /**
     * 定义死信队列
     * @return
     */
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    /**
     * 定义死信交换机
     * @return
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 定义死信队列和死信交换机之间的绑定关系
     * @return
     */
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);
    }
}

消息实体类

@Data
public class MqEntity implements Serializable {

    /**
    * 消息体
    */
    private List<String> orderIds;

    /**
     * 当前消息的类型,用于表示多种消息
     * 0 订单
     */
    private Integer type;

    public MqEntity(List<String> orderIds, Integer type){
        this.orderIds = orderIds;
        this.type = type;

    }

}

消息发送服务类

@Component
@Slf4j
public class CancelOrderSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

		// 订单超时时间,可在配置文件中指定,也可发送消息时单独指定
    @Value("${business.orderTimeout}")
    private Long orderTimeout;

    /**
     * 发送消息到订单队列,并设置消息的生存时间
     * @param entity 消息
     * @param delayTimes 超时时间
     */
    public void sendMessage(MqEntity entity, Long delayTimes) {
        if (null == delayTimes){
            delayTimes = orderTimeout;
        }
        Long finalDelayTimes = delayTimes;
        if (0 == entity.getType()){
            rabbitTemplate.convertAndSend(RabbitConfig.ORDER_EXCHANGE, RabbitConfig.ORDER_ROUTING_KEY, entity, new MessagePostProcessor() {
                @Override
                public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) {
                    message.getMessageProperties().setExpiration(String.valueOf(finalDelayTimes));
                    return message;
                }
            });
        }
        log.info("消息队列生产任务,订单编号:{},超时时间为:{} 秒", entity, delayTimes / 1000);
    }
}