利用RabbitMQ 的死信队列来做定时任务

归云
归云
发布于 2020-01-07 / 2983 阅读
0
0

利用RabbitMQ 的死信队列来做定时任务

常用的应用场景

死信队列常常用作延时关闭订单(如订单的超时后的取消订单等),虽然小项目中可以用定时轮询的方法进行检查,但是数据量一旦比较大时,定时轮询将给数据库带来不小的压力,而且定时间隔无法进行动态调整,特别是一个系统中,同时存在好几个定时器的时候,就显得非常的麻烦,同时给数据库造成巨大的访问压力。这时候就可以使RabbitMQ的死信队列。

概念解释

DLX

  • Dead Letter Exchange 的缩写
  • DLX也叫死信邮箱(网上的译法),死信交换机(字面翻译)。归根结底就是一个交换机,当队列中出现死信时,通过这个交换机将死信重新发送到死信队列中(指定好rabbitmq会自动发送)。

什么是死信

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

死信交换机

  • 在定义业务队列的时候,要考虑指定一个死信交换机,死信交换机可以和任何一个普通的队列进行绑定,然后在业务队列出现死信的时候就会将数据发送到死信队列。

什么是死信队列

  • 死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信而已。

消息变成死信后,会被重新投递(publish)到另一个交换机上(Exchange),这个交换机就被称作DLX及死信交换机,然后交换机根据绑定规则转发到对应的队列上,监听该队列就可以被重新消费。

生产者-->发送消息-->交换机-->队列-->变成死信队列-->DLX交换机-->队列-->监听-->消费者

示例代码

添加依赖

implementation 'org.springframework.boot:spring-boot-starter-amqp'

添加配置

spring:
  rabbitmq:
    host: 10.0.0.19
    port: 5672
    username: guiyun
    password: 111222

添加配置类

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Guiyun
 * @date 2020/1/7 下午 1:51
 */
@Configuration
public class RabbitConfig{

    /**
     * 死信队列 交换机标识符
     */
    private static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
    /**
     * 死信队列 交换机绑定键标识符
     */
    private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";


    /**
     * 创建死信交换机
     * @return
     */
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
    }


    /**
     * 创建一个死信队列.
     * @return
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>(2);
        //声明 死信交换机
        args.put(DEAD_LETTER_QUEUE_KEY, "DL_EXCHANGE");
        //声明 死信路由键
        args.put(DEAD_LETTER_ROUTING_KEY, "KEY_R");
        return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
    }

    /**
     * 定义死信队列转发队列.
     * @return the queue
     */
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        return QueueBuilder.durable("REDIRECT_QUEUE").build();
    }

    /**
     * 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);

    }

    /**
     * 死信路由通过 KEY_R 绑定键绑定到死信队列上.
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
    }

}

生产者向业务队列发送消息

@Autowired
RabbitTemplate rabbitTemplate;

public void spend(JSONObject data) {
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    MessagePostProcessor messagePostProcessor = message -> {
        MessageProperties messageProperties = message.getMessageProperties();
        // 设置编码
        messageProperties.setContentEncoding("utf-8");
        // 设置过期时间5秒
        messageProperties.setExpiration(5000);
        return message;
    };
    rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", data, messagePostProcessor, correlationData);
}
    

死信队列消费者

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author Guiyun
 * @date 2020/1/7 上午 11:09
 */
@Component
@RabbitListener(queues = "DL_QUEUE")
public class DeadLetterConsumer {

    private static final Logger log = LoggerFactory.getLogger(DeadLetterConsumer.class);


    @RabbitListener(queues = {"REDIRECT_QUEUE"})
    public void redirect(JSONObject json) throws IOException {
        log.info(json.toJSONString());
    }
}


评论