内容简介:现在很多知名的互联网公司都有用到RabbitMQ,其性能,可扩展性让很多大公司青睐于使用它,不过想要完全使用好RabbitMQ需要掌握其核心的一些概念,这里就说说掌握RabbitMQ所需的必要知识生产者: 创建消息,然后发送到代理服务器(RabbitMQ)的程序消费者:连接到代理服务器,并订阅到队列上接收消息
现在很多知名的互联网公司都有用到RabbitMQ,其性能,可扩展性让很多大公司青睐于使用它,不过想要完全使用好RabbitMQ需要掌握其核心的一些概念,这里就说说掌握RabbitMQ所需的必要知识
生产者与消费者
生产者: 创建消息,然后发送到代理服务器(RabbitMQ)的程序
消费者:连接到代理服务器,并订阅到队列上接收消息
消息流程
AMQP协议规定,AMQP消息必须有三部分,交换机,队列和绑定。生产者把消息发送到交换机,交换机与队列的绑定关系决定了消息如何路由到特定的队列,最终被消费者接收。
Note:消息是不能直接到达队列(Queue)的
交换机
消息实际上投递到的是交换机,具体路由到那个队列由交换机根据路由键(routing key)完成。
- 当你发消息到代理服务器时,即便路由键是空的,RabbitMQ也会将其和使用的路由键进行匹配。如果路由的消息不匹配任何绑定模式,消息将会进入黑洞。
交换机在队列与消息中间起到了中间层的作用,有了交换机我们可以实现更灵活的功能,RabbitMQ中有三种常用的交换机类型:
- direct: 如果路由键匹配,消息就投递到对应的队列
- fanout:投递消息给所有绑定在当前交换机上面的队列
- topic:允许实现有趣的消息通信场景,使得5不同源头的消息能够达到同一个队列。topic队列名称有两个特殊的关键字。
* #
可以理解,direct为1v1, fanout为1v所有,topic比较灵活,可以1v任意。
虚拟主机
每一个虚拟主机(vhost)相当于mini版的RabbitMQ服务器,拥有自己的队列,交换机和绑定,权限... 这使得一个RabbitMQ服务众多的应用程序,而不会互相冲突。
rabbitMQ默认的虚拟主机为: "/" ,一般我们在创建Rabbit的用户时会再给用户分配一个虚拟主机。
操作虚拟主机,除了命令行之外还有一个web管理页面
#创建虚拟主机 rabbitmqctl add vhost [vhost_name] #删除虚拟主机 rabbitmqctl delete vhost [vhost_name] #列出虚拟主机 rabbitmqctl list_vhosts 复制代码
消息投递策略
默认情况下RabbitMQ的队列和交换机在RabbitMQ服务器重启之后会消失,原因在于队列和交换机的durable属性,该属性默认情况下为false.
能从AMQP服务器崩溃中恢复的消息称为持久化消息,如果想要从崩溃中恢复那么消息必须
- 投递模式设置2,来标记消息为持久化
- 发送到持久化的交换机
- 到到持久化的队列
缺点:消息写入磁盘性能差很多。除非特别关键的消息会使用
关键API
以上都是概念性的内容,实际我们还是要通过编程来实现我们的目的,RabbitMQ的客户端api提供了很多功能,通过看代码,来了解它的强大之处。
基本步骤之前的RabbitMQ快速入门已经提过了,Channel类是关键的部分:包含了很多我们想要的功能
消息确认
生成端可以添加监听事件:
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
复制代码
消费端可以确认消息状态:
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
复制代码
channel.basicAck与basicNack最后一个参数指定消息是否重回队列。
监听不可达消息
我们的消息生产者通过指定交换机和路由键来把消息送到队列中,但有时候指定的路由键不存在,或者交换机不存在,那么消息就会return,我们可以通过添加return listener来实现:
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, BasicProperties properties, byte[] body) throws IOException {
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
复制代码
在basicPublish中的Mandatory要设置为true才会生效,否则broker会删除该消息
消费端限流
假设MQ服务器上面囤积了成千上万条的消息的时候,这个时候突然连接消费端,那么巨量的消息全部推过来,但是客户端无法一次性处理这么多的数据。
在高并发的时候,瞬间产生的流量很大,消息很大,而MQ有个重要的作用就是限流,限流则是消费端做的。
RabbitMQ提供了一种Qos(服务质量保证)功能,即在非自动确认消息的前提下,在一定数量的消息未被消费前,不进行消费新的消息。
// prefetchSize消息的限制大小,一般设置为0,在生产端限制 // prefetchCount 我们一次最多消费多少条消息,一般设置为1 // global,一般设置为false,在消费端进行限制 channel.basicQos(int prefetchSize, int prefetchCount, boolean global) // 使用 channel.basicQos(0, 1, false); channel.basicConsume(queueName, false, new MyConsumer(channel)); 复制代码
Note:autoAck设置为false, 一定要手工签收消息
死信队列(DLX)
当消息在队列中变成死信,没有消费者进行消费的时候,消息可能会被重新发布到另外一个队列中,这个队列就是死信队列。
以下情况会导致消息进入死信队列:
-
basic.reject/basic.nack 并且 requeue为false(不重回队列)的时候,消息就是死信
-
消息TTL过期
-
队列达到最大的长度
死信队列也是正常的Exchange,和一般的Exchange没什么区别,不过要做一点操作。
设置死信队列包括:
- 设置Exchange(dlx.exchange名称随意),设置Queue(dlx.queue),设置RoutingKey(#)
- 创建正常的交换机,队列,绑定,只不过加上一个参数 arguments.put("x-dead-letter-exchange","dlx.exchange")
// 这就是一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要进行死信队列的声明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
复制代码
最后
这里主要讲了一些使用RabbitMQ中经常涉及到的概念,懂了概念,在进行应用的时候才不至于糊涂。然后列举了MQ的 Java 客户端重要的几个API。
参考
- 《RabbitMQ实战》
- RabbitMQ消息中间件继续精讲
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Java Servlet & JSP Cookbook
Bruce W. Perry / O'Reilly Media / 2003-12-1 / USD 49.99
With literally hundreds of examples and thousands of lines of code, the Java Servlet and JSP Cookbook yields tips and techniques that any Java web developer who uses JavaServer Pages or servlets will ......一起来看看 《Java Servlet & JSP Cookbook》 这本书的介绍吧!