死信队列
1. 消息被消费者拒绝 (basic.nack / basic.reject), 并且 requeue = false
2.消息队列的长度达到了饱和, 已满了
3.消息的ttl 时间到了
第一种情况: 消息被消费者拒绝了 (nack, 并且 requeue = false)
/**
 * Demo consumer for the "message rejected" dead-letter case.
 *
 * <p>Registers two consumers on one channel: one on the target queue that
 * negatively acknowledges every delivery without requeueing, and one on the
 * dead-letter queue that prints each message it receives and acknowledges it.
 */
public class Consumer {
    /**
     * Starts both consumers and returns; deliveries are handled in the callbacks.
     *
     * @param args unused
     * @throws IOException if creating the channel or registering a consumer fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionFactory.getConnection();
        Channel channel = connection.createChannel();
        String dlxQueueName = "dlx.queue.nack";
        String queueName = "test_queue.nack";
        // Prefetch limit: the consumer takes at most one message at a time.
        channel.basicQos(1);

        /* Listen on the dead-letter queue (manual ack). */
        channel.basicConsume(dlxQueueName, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String json = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                Message message = JSONUtil.toBean(json, Message.class);
                // Seconds elapsed between the timestamp stored in the message and now.
                long between = DateUtil.between(message.getTimestamp(), new Date(), DateUnit.SECOND);
                String format = String.format("[%s] Received message:%s ; spend seconds: %d s ", dlxQueueName, message.toString(), between);
                System.out.println(format);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

        /* Listen on the target queue and reject everything. */
        channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("拒收消息");
                // multiple = false, requeue = false: the single rejected message is not put back on the queue.
                channel.basicNack(envelope.getDeliveryTag(), false, false);
            }
        });
    }
}
第二种情况: 消息队列已满了 (队列长度超过 x-max-length)
/**
 * Demo consumer for the "queue length exceeded" dead-letter case.
 *
 * <p>Consumes both the target queue and the dead-letter queue with the same
 * handler logic: print the message with its age in seconds, then acknowledge it.
 */
public class Consumer {
    /**
     * Starts one printing consumer per queue and returns; deliveries are handled in the callbacks.
     *
     * @param args unused
     * @throws IOException if creating the channel or registering a consumer fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionFactory.getConnection();
        Channel channel = connection.createChannel();
        String dlxQueueName = "dlx.queue.length";
        String queueName = "test_queue.length";
        // Prefetch limit: the consumer takes at most one message at a time.
        channel.basicQos(1);
        /* Listen on the dead-letter queue (manual ack). */
        channel.basicConsume(dlxQueueName, false, printingConsumer(channel, dlxQueueName));
        /* Listen on the target queue (manual ack). */
        channel.basicConsume(queueName, false, printingConsumer(channel, queueName));
    }

    /**
     * Builds a consumer that prints every delivery, tagged with the queue it came from, and acks it.
     *
     * @param channel     channel used to acknowledge deliveries
     * @param sourceQueue queue name shown in the printed line
     * @return the consumer to pass to {@code basicConsume}
     */
    private static DefaultConsumer printingConsumer(final Channel channel, final String sourceQueue) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String json = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                Message message = JSONUtil.toBean(json, Message.class);
                // Seconds elapsed between the timestamp stored in the message and now.
                long between = DateUtil.between(message.getTimestamp(), new Date(), DateUnit.SECOND);
                System.out.println(String.format("[%s] Received message:%s ; spend seconds: %d s ", sourceQueue, message.toString(), between));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
    }
}
第三种情况就是 ttl 时间到了
/**
 * Demo consumer for the "message TTL expired" dead-letter case.
 *
 * <p>The target-queue handler sleeps five seconds per message so that it
 * cannot keep up; the dead-letter handler prints whatever arrives on the
 * dead-letter queue.
 */
public class Consumer {
    /**
     * Starts both consumers and returns; deliveries are handled in the callbacks.
     *
     * @param args unused
     * @throws IOException if creating the channel or registering a consumer fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionFactory.getConnection();
        Channel channel = connection.createChannel();
        String dlxQueueName = "dlx.queue";
        String queueName = "test_queue";
        // Prefetch limit: the consumer takes at most one message at a time.
        channel.basicQos(1);

        /* Listen on the dead-letter queue (manual ack). */
        channel.basicConsume(dlxQueueName, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String json = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                Message message = JSONUtil.toBean(json, Message.class);
                long between = DateUtil.between(message.getTimestamp(), new Date(), DateUnit.SECOND);
                String format = String.format("[%s] Received message:%s ; spend seconds: %d s ", dlxQueueName, message.toString(), between);
                System.out.println(format);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

        /* Listen on the target queue (manual ack). */
        channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String json = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                Message message = JSONUtil.toBean(json, Message.class);
                // Taken before the sleep, so the printed age excludes the simulated processing delay.
                Date date = new Date();
                // One message per 5 s: presumably about six are consumed here within a 30 s TTL and
                // the rest end up on the dead-letter queue (TTL is configured by the producer; confirm).
                try {
                    Thread.sleep(5_000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the calling thread can still observe the interruption.
                    Thread.currentThread().interrupt();
                    System.err.println("Interrupted while simulating slow processing: " + e);
                }
                long between = DateUtil.between(message.getTimestamp(), date, DateUnit.SECOND);
                System.out.println(String.format("[%s] Received message:%s ; spend seconds: %d s ", queueName, message.toString(), between));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
/**
 * Demo producer for the "message rejected" dead-letter case.
 *
 * <p>Declares a topic exchange and queue whose dead-letter exchange is
 * {@code dlx.exchange.nack}, declares the dead-letter exchange and queue,
 * then publishes ten JSON-encoded messages to the target exchange.
 */
public class Producer {
    // Topic binding wildcards: '#' matches zero or more words, '*' matches exactly one word.

    /**
     * Declares the topology, publishes the messages and closes the connection.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        String dlxQueueName = "dlx.queue.nack";
        String dlxExchangeName = "dlx.exchange.nack";
        String dlxRoutingKey = "topic.#";
        String exchangeName = "test_exchange.nack";
        String queueName = "test_queue.nack";
        String routingKey = "topic.item3";

        Connection connection = ConnectionFactory.getConnection();
        try {
            Channel channel = connection.createChannel();

            Map<String, Object> arguments = new HashMap<>(16);
            arguments.put("x-dead-letter-exchange", dlxExchangeName);
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, arguments);
            channel.queueBind(queueName, exchangeName, routingKey);

            // Dead-letter exchange and queue.
            channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
            channel.queueDeclare(dlxQueueName, true, false, false, null);
            channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);

            for (int i = 0; i < 10; i++) {
                Message message = new Message();
                message.setMsg("this is a message test ttl");
                message.setTimestamp(new Date());
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                byte[] payload = JSONUtil.toJsonStr(message).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, payload);
            }
            System.out.println("==>已经发送消息");
            channel.close();
        } finally {
            // Always release the connection, even when declaring or publishing throws.
            // NOTE(review): assumes this is a RabbitMQ client Connection, whose close() also
            // closes any channel still open on it - confirm against the ConnectionFactory helper.
            connection.close();
        }
    }
}
/**
 * Demo producer for the "queue length exceeded" dead-letter case.
 *
 * <p>Declares a topic exchange and a queue capped at five messages whose
 * dead-letter exchange is {@code dlx.exchange.length}, declares the
 * dead-letter exchange and queue, then publishes ten JSON-encoded messages.
 */
public class Producer {
    // Topic binding wildcards: '#' matches zero or more words, '*' matches exactly one word.

    /**
     * Declares the topology, publishes the messages and closes the connection.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        String dlxQueueName = "dlx.queue.length";
        String dlxExchangeName = "dlx.exchange.length";
        String dlxRoutingKey = "topic.#";
        String exchangeName = "test_exchange.length";
        String queueName = "test_queue.length";
        String routingKey = "topic.item2";

        Connection connection = ConnectionFactory.getConnection();
        try {
            Channel channel = connection.createChannel();

            Map<String, Object> arguments = new HashMap<>(16);
            arguments.put("x-dead-letter-exchange", dlxExchangeName);
            // Maximum queue length: at most 5 messages are kept in the queue.
            // NOTE(review): with the broker's default overflow behaviour the overflowing
            // messages are presumably the ones dead-lettered - confirm for your broker setup.
            arguments.put("x-max-length", 5);
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, arguments);
            channel.queueBind(queueName, exchangeName, routingKey);

            // Dead-letter exchange and queue.
            channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
            channel.queueDeclare(dlxQueueName, true, false, false, null);
            channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);

            for (int i = 0; i < 10; i++) {
                Message message = new Message();
                message.setMsg("this is a message test ttl");
                message.setTimestamp(new Date());
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                byte[] payload = JSONUtil.toJsonStr(message).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, payload);
            }
            System.out.println("==>已经发送消息");
            channel.close();
        } finally {
            // Always release the connection, even when declaring or publishing throws.
            // NOTE(review): assumes this is a RabbitMQ client Connection, whose close() also
            // closes any channel still open on it - confirm against the ConnectionFactory helper.
            connection.close();
        }
    }
}
/**
 * Demo producer for the "message TTL expired" dead-letter case.
 *
 * <p>Declares a topic exchange and a queue with a 30 second message TTL whose
 * dead-letter exchange is {@code dlx.exchange}, declares the dead-letter
 * exchange and queue, then publishes ten JSON-encoded messages.
 */
public class Producer {
    // Topic binding wildcards: '#' matches zero or more words, '*' matches exactly one word.

    /**
     * Declares the topology, publishes the messages and closes the connection.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        String dlxQueueName = "dlx.queue";
        String dlxExchangeName = "dlx.exchange";
        String dlxRoutingKey = "topic.#";
        String exchangeName = "test_exchange";
        String queueName = "test_queue";
        String routingKey = "topic.item";

        Connection connection = ConnectionFactory.getConnection();
        try {
            Channel channel = connection.createChannel();

            Map<String, Object> arguments = new HashMap<>(16);
            arguments.put("x-dead-letter-exchange", dlxExchangeName);
            // Message time-to-live on this queue: 30 seconds (value is in milliseconds).
            arguments.put("x-message-ttl", 30_000);
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, arguments);
            channel.queueBind(queueName, exchangeName, routingKey);

            // Dead-letter exchange and queue.
            channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
            channel.queueDeclare(dlxQueueName, true, false, false, null);
            channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);

            for (int i = 0; i < 10; i++) {
                Message message = new Message();
                message.setMsg("this is a message test ttl");
                message.setTimestamp(new Date());
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                byte[] payload = JSONUtil.toJsonStr(message).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, payload);
            }
            System.out.println("==>已经发送消息");
            channel.close();
        } finally {
            // Always release the connection, even when declaring or publishing throws.
            // NOTE(review): assumes this is a RabbitMQ client Connection, whose close() also
            // closes any channel still open on it - confirm against the ConnectionFactory helper.
            connection.close();
        }
    }
}
/**
 * Payload exchanged by the demo producers and consumers: a text body plus
 * the time the message was created.
 */
@Data
public class Message {
    /** Free-form message text. */
    private String msg;
    /** Creation time set by the sender. */
    private Date timestamp;

    /**
     * Renders the message as {@code Message{msg='...', timestamp=...}}, with the
     * timestamp formatted using Hutool's {@code NORM_DATETIME_PATTERN}.
     */
    @Override
    public String toString() {
        String formattedTimestamp = DateUtil.format(timestamp, DatePattern.NORM_DATETIME_PATTERN);
        return "Message{"
                + "msg='" + msg + '\''
                + ", timestamp=" + formattedTimestamp
                + '}';
    }
}
总结一下整理成demo
参考了: https://www.cnblogs.com/yangk1996/p/12674015.html