Java RabbitMQ的TTL和DLX全面精解

本节继续介绍RabbitMQ的高级特性:TTL(Time-To-Live消息有效期)和DLX(Dead-Letter-Exchange死信交换机、死信队列)

RabbitMQ的TTL

1、TTL概述

RabbitMQ的TTL全称为Time-To-Live,表示的是消息的有效期。消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。设置TTL有两种方式:

  1. 第一种是声明队列的时候,在队列的属性中设置,这样该队列中的消息都会有相同的有效期;
  2. 第二种是发送消息时给消息设置属性,可以为每条消息都设置不同的TTL。

如果两种方式都设置了,则以设置的较小的为准。两者的区别:如果声明队列时设置了有效期,则消息过期了就会被删掉;如果是发消息时设置的有效期,消息过期了也不会被立马删掉,因为这时消息是否过期是在要投递给消费者时判断的。至于为啥要这样处理很容易想清楚:第一种方式队列的消息有效期都一样,先入队的在队列头部,头部也是最早要过期的消息,RabbitMQ起一个定时任务从队列的头部开始扫描是否有过期消息即可;第二种方式每条消息的过期时间不同,所以只有遍历整个队列才可以筛选出来过期的消息,这样效率太低了,而且消息量大了之后根本不可行的,可以等到消息要投递给消费者时再判断删除,虽然删除的不够及时但是不影响功能,其实就是用空间换时间。

如果不设置TTL,则表示此消息永久有效(默认消息是不会失效的)。如果将TTL设为0,则表示如果消息不能被立马消费则会被立即丢掉,这个特性可以部分替代RabbitMQ3.0以前支持的immediate参数,之所以所部分代替,是应为immediate参数在投递失败会有basic.return方法将消息体返回(这个功能可以利用死信队列来实现)。

2、设置消息有效期

2.1、通过队列设置有效期

还记得我们之前声明队列的方法吗,queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments),该方法的最后一个参数可以设置队列的属性,属性名为x-message-ttl,单位为毫秒。如果不清楚队列属性有哪些,可以查看web控制台的添加队列的地方。

具体代码如下:

//设置队列上所有的消息的有效期,单位为毫秒
Map<String, Object> argss = new HashMap<String , Object>();
arguments.put("x-message-ttl " , 5000);//5秒钟
channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;

查看控制台的队列列表如下:D表示持久化,TTL表示设置了消息的有效期。

过了几秒钟后发现消息已经不存在了。

也可以用RabbitMQ的命令行模式来设置:

rabbitmqctl set_policy TTL ".*" "{"message-ttl":60000}" --apply-to queues

还可以通过HTTP接口调用:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT -d"{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}" 
http://ip:15672/api/queues/{vhost}/{queuename}

2.2、通过发送消息时设置有效期

发送消息时basicPublish方法可以设置属性参数,里面通过expiration属性设置消息有效期,单位为毫秒,代码如下所示

Builder bd = new AMQP.BasicProperties().builder();
bd.deliveryMode(2);//持久化
bd.expiration("100000");//设置消息有效期100秒钟
BasicProperties pros = bd.build();
String message = "测试ttl消息";
channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());

另外也可以通过HTTPAPI 接口设置:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST -d
"{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my body","payload_encoding":"string"}"  
http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

完整的通过队列设置消息有效期、发布消息时通过属性设置有效期的代码如下:可以运行后,观察下控制台,可以发现同时设置时,消息的有效期是以较小的为准的。项目GitHub地址 https://github.com/RookieMember/RabbitMQ-Learning.git。

package cn.wkp.rabbitmq.newest.ttl;
 
import java.util.HashMap;
import java.util.Map;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
/**
 * 
 * @ClassName: Send
 * @Description: 消息有效期 
 * @author wkg
 * @date: 2019年4月1日 下午11:28:22
 */
public class Send {
 
	private final static String EXCHANGE_NAME = "ttl_exchange";
	private final static String QUEUE_NAME = "ttl_queue";
 
	public static void main(String[] argv) throws Exception {
		// 获取到连接以及mq通道
		Connection connection = ConnectionUtil.getConnection();
		// 从连接中创建通道
		Channel channel = connection.createChannel();
 
		// 声明交换机
		channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
		
		//*****1:通过队列设置有效期 2:通过消息属性设置有效期,如果都设置了以较小的为准*****
		//声明队列
		Map<String, Object> arguments=new HashMap<String,Object>();
		//设置队列上所有的消息的有效期,单位为毫秒
		arguments.put("x-message-ttl", 5000);//5秒钟
		channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
		//绑定
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
		
		Builder bd = new AMQP.BasicProperties().builder();
		bd.deliveryMode(2);//持久化
		bd.expiration("100000");//设置消息有效期100秒钟
		BasicProperties pros = bd.build();
		String message = "测试ttl消息";
		channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());
		System.out.println("Sent message:" + message);
//		 关闭通道和连接
		channel.close();
		connection.close();
	}
}

3、设置队列有效期(不常用,仅作了解)

上面在web管控台添加队列的时候,我们看到有一个x-expires参数,可以让队列在指定时间内 "未被使用" 的话会自动过期删除,未使用的意思是 queue 上没有任何 consumer,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。该方式可用于,例如,RPC-style 的回复 queue, 其中许多queue 会被创建出来,但是却从未被使用。

服务器会确保在过期时间到达后 queue 被删除,但是不保证删除的动作有多么的及时。在服务器重启后,持久化的queue 的超时时间将重新计算。 x-expires 参数值以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,且不能设置为 0 。所以,如果该参数设置为 1000 ,则表示该 queue 如果在 1s之内未被使用则会被删除。

Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-expires", 18000);  //队列有效期18秒
channel.queueDeclare("myqueue", false, false, false, args);  

RabbitMQ的DLX

1、DLX是什么

DLX是Dead-Letter-Exchange的简写,意思是死信交换机。

它的作用其实是用来接收死信消息(dead message)的。那什么是死信消息呢?一般消息变成死信消息有如下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
  • 消息过期
  • 队列达到最大长度

当消息在一个队列中变成了死信消息后,可以被发送到另一个交换机,这个交换机就是DLX,绑定DLX的队列成为死信队列。当这个队列中存在死信时, RabbitMQ 就会立即自动地将这个消息重新发布到设置的DLX 上去,进而被路由到绑定该DLX的死信队列上。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的TTL 设置为0 配合使用可以弥补imrnediate 参数的功能。

2、DLX有什么用

因为消息如果未被正常消费并设置了requeue为false时会进入死信队列,我们可以监控消费死信队列中消息,来观察和分析系统的问题。DLX还有一个非常重要的作用,就是结合TTL实现延迟队列(延迟队列的使用范围还是挺广的:比如下单超过多长时间自动关闭;比如我们接入过第三方支付系统的同学一定知道,我们的订单中会传一个notify_url用于接收支付结果知,如果我们给第三方支付响应的不是成功的消息,其会隔一段时间继续调用通知我们的notify_url,超过几次后不再进行通知,一般通知频率都是 0秒-5秒-30秒-5分钟-30分钟-1小时-6小时-12小时;比如我们的家用电器定时关机。。。。。。这些场景都是可以用延迟队列实现的)。

3、DLX使用方式

下面在web管控台添加队列的时候,我们看到有两个DLX相关的参数:x-dead-letter-exchange和x-dead-letter-routing-key。x-dead-letter-exchange是设置队列的DLX的;x-dead-letter-routing-key是设置死信消息进入DLX时的routing key的,这个是可以不设置的,如果不设置,则默认使用原队列的routing key。

客户端可以通过channel.queueDeclare方法声明队列时设置x-dead-letter-exchange参数,具体代码如下所示

channel.exchangeDeclare("dlx_exchange" , "direct"); //创建DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange" , "dlx_exchange ");//设置DLX
args.put("x-dead-letter-routing-key" , "dlx-routing-key");//设置DLX的路由键(可以不设置)
//为队列myqueue 添加DLX
channel.queueDeclare("myqueue" , false , false , false , args);

上面说的可能比较抽象,下面我们通过一个具体的例子,来演示一下DLX的具体使用:

package cn.wkp.rabbitmq.newest.dlx;
 
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
public class SendDLX {
 
	public static void main(String[] args) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//声明一个交换机,做死信交换机用
		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		//声明一个队列,做死信队列用
		channel.queueDeclare("dlx_queue", true, false, false, null);
		//队列绑定到交换机上
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");
		
		channel.exchangeDeclare("normal_exchange", "fanout", true, false, null);
		Map<String, Object> arguments=new HashMap<String, Object>();
		arguments.put("x-message-ttl" , 5000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLX
		arguments.put("x-dead-letter-exchange" , "dlx_exchange");//设置DLX
		arguments.put("x-dead-letter-routing-key" , "dlx.test");//设置DLX的路由键(可以不设置)
		//为队列normal_queue 添加DLX
		channel.queueDeclare("normal_queue", true, false, false, arguments);
		channel.queueBind("normal_queue", "normal_exchange", "");
		
		channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("测试死信消息").getBytes());
		System.out.println("发送消息时间:"+ConnectionUtil.formatDate(new Date()));
		
		channel.close();
		connection.close();
	}
}

上面是发送者的代码,运行后观察控制台可以看到如下所示:

死信队列dlx_queue的绑定如下,其已与死信交换机dlx_exchange(topic类型)进行了绑定,routing key为"dlx.*"

队列normal_queue的绑定如下,其已与交换机normal_exchange(fanout类型)进行了绑定

queues视图如下:DLX和DLK表示设置给normal_queue设置了死信交换机和死信消息的routing key,我们看到消息已经被路由到了死信队列上面。整个流程为:

  • 消息发送到交换机normal_exchange,然后路由到队列normal_queue上
  • 因为队列normal_queue没有消费者,消息过期后成为死信消息
  • 死信消息携带设置的x-dead-letter-routing-key=dlx.test进入到死信交换机dlx_exechage
  • dlx_exechage与dlx_queue绑定的routing key为"dlx.*",死信消息的路由键dlx.test符合该规则被路由到dlx.queue上面。

然后我们给死信队列添加消费者如下:我们测试一下死信消息进入DLX的时间,先将之前的那个死信消息删除

package cn.wkp.rabbitmq.newest.dlx;
 
import java.io.IOException;
import java.util.Date;
 
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
public class RecvDLX {
 
	public static void main(String[] argv) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		final Channel channel = connection.createChannel();
 
		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		channel.queueDeclare("dlx_queue", true, false, false, null);
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");
 
		// 指该消费者在接收到队列里的消息但没有返回确认结果之前,它不会将新的消息分发给它。
		channel.basicQos(1);
 
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("消费者收到消息:" + new String(body)+",当前时间:"+ConnectionUtil.formatDate(new Date()));
				// 消费者手动发送ack应答
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		System.out.println("消费死信队列中的消息======================");
		// 监听队列
		channel.basicConsume("dlx_queue", false, consumer);
	}
}

运行结果如下(先运行的死信队列消费者,然后运行生产者):我们看到消息过期后10毫秒就被死信队列的消费者消费到了,显然,消息成为死信后是立即被发送到了DLX中。

消费死信队列中的消息======================
消费者收到消息:测试死信消息,当前时间:2019-04-13 16:30:05:740

发送消息时间:2019-04-13 16:30:00:730

关于RabbitMQ的TTL和DLX就先介绍到这里,下一节会继续介绍RabbitMQ的高级特性:RabbitMQ的延迟队列。

参考 朱忠华《RabbitMQ实战指南》

到此这篇关于Java RabbitMQ的TTL和DLX全面精解的文章就介绍到这了,更多相关Java RabbitMQ TTL DLX内容请搜索云海天教程以前的文章或继续浏览下面的相关文章希望大家以后多多支持云海天教程!