内容简介:小时候就特别喜欢龙系精灵,特别是乘龙,后来才知道只是冰水。。。尴尬。 在宠物小精灵中,乘龙一直是训练家的载人伙伴,和我们下面的MQ好像有几分相似呢~~MQ,全称消息队列,现在市面上有很多种消息队列,像大家耳熟能详的RabbitMQ,RocketMQ,Kafka等等,接下来为大家详细的介绍消息队列。俗话说的好,多用多错,不能为了技术而技术,要结合实际的业务场景使用合适的技术。
小时候就特别喜欢龙系精灵,特别是乘龙,后来才知道只是冰水。。。尴尬。 在宠物小精灵中,乘龙一直是训练家的载人伙伴,和我们下面的MQ好像有几分相似呢~~
前言
MQ,全称消息队列,现在市面上有很多种消息队列,像大家耳熟能详的RabbitMQ,RocketMQ,Kafka等等,接下来为大家详细的介绍消息队列。
使用场景
俗话说的好,多用多错,不能为了技术而技术,要结合实际的业务场景使用合适的技术。
例如你用了 Redis 缓存,这时候你也许得考虑主从架构。因为主从架构,你可能得考虑主从切换。同时也许你还得考虑集群模式。这就大大的提高了开发与维护的成本。因此选择一种合适的技术是十分重要的。
异步解耦
- 如果是单体应用,在用户下单时,下订单,减库存,物流记录这三个操作是同步阻塞的。如果将这三步的操作存放到各自的消息队列中,然后监听这三个消息队列,那么可以大大的减少时间。
- 但同时,由于是分布式应用,你可能得考虑分布式事务的必要性了。此外,为了保证MQ的高可用,你可能得去调研市场上集群模式支持的最好的中间件了。
流量削峰
- 在一些秒杀场景,为了防止极高的并发,将数据库冲垮,除了可以在负载均衡端进行限流的设置,同时也可以将用户的请求数据存放到消息队列中,然后通过消息队列中控制处理速度。
- 同时这个处理的进程可以监听zk的某个节点,在秒杀结束后,修改这个节点的值,进程监听到这个节点值的变化,将不再处理请求(可以做成拦截器)。这个有时也被用来作为大数据中流处理的缓冲
市面上的消息队列
RabbitMQ
介绍
RabbitMQ 试用于逻辑相对复杂,同时对于队列的性能需求相对不高的场景。
模型
-
RabbitMQ
发送消息时会先发送到Exchange
(交换机),指定routing key
。 -
Exchange
和队列绑定时指定binding key
- 只有当
routing key
和binding key
的匹配规则成立时,才会将消息发往指定的队列。 - 两个消费者监听同一个队列,一条消息不会被两个队列同时消费。这是和kafka有区别的。
kafka
kafka topic
RabbitMQ中 数据存储的最小单位是 queue
,而在kafka中 的最小单位是 partition
, partition
包含在 topic
中,见下图。
直连模型
一个消费组gourp1
监听一个
topic
, c1和c2会分别消费一个分区。 如果是一个消费组的话,必须指定默认消费者组名称
订阅模型
多个消费组(每个组中只有一个消费者), 监听一个topic,每个消费者会消费topic中的所有分区。
kafka 偏移量(offset)
每条消息在分区中都会有一个偏移量。在消息进入分区前,会给新的消息分配一个唯一的偏移量,保证了每个分区中消息的顺序性。
在 RabbitMQ
中, 多个消费者监听一个队列,由于消息是异步发送的,由于网络等原因,可能 Consumer2
先接受到 Message2
,这样会导致消息的处理顺序不会按照消息存入队列的顺序。当然,我们可以通过一个消费者监听一个队列来保证消息的有序性。
而kafka呢,多个消费者在一个消费者组中,监听一个 topic
。 topic
中的不同的分区会被不同的消费者消费(一个消费者消费一个分区)。当然,我们只能保证每个分区的消息时被顺序消费的,不同的分区则不能保证了。
kafka实战
一个分区,一个消费者组,一个消费者
/** 一个消费者 一个消费者组 一个分区 **/ public void send1() { kafkaTemplate.send("test1",0,"test1","data1"); System.out.println("生产者发送消息"); } 复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1") public void test1(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("消费者接受消息" + record); } } 复制代码
生产者发送消息 消费者接受消息ConsumerRecord(topic = test1, partition = 0, offset = 0, CreateTime = 1560260495702, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1) 复制代码
2个分区,一个消费者组,一个消费者,消费所有分区
## 增加分区数 kafka-topics --zookeeper localhost:2181 --alter --topic test1 --partitions 2 ## 结果 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! 复制代码
## 生产者 kafkaTemplate.send("test1",0 #分区数,"test1","data1"); kafkaTemplate.send("test1",1,"test11","data11"); System.out.println("生产者发送消息"); 复制代码
## 消费者 @KafkaListener(topics = {"test1"}, groupId = "test1") public void test1(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("消费者1接受消息" + record); } } 复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 4, CreateTime = 1560261029521, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1) 消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 1, CreateTime = 1560261029521, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11) 复制代码
一个分区,2个消费者组,2个消费者 ,每个消费者都消费一个分区
/** 2个消费者 2个消费者组 1个分区 **/ public void send3() { kafkaTemplate.send("test1",0,"test1","data1"); System.out.println("生产者发送消息"); } 复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1") public void test1(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("消费者1接受消息" + record); } } @KafkaListener(topics = {"test1"}, groupId = "test2") public void test2(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("消费者2接受消息" + record); } } 复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1) 消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1) 复制代码
2个分区,2个消费者,1个消费者组(负载均衡)
/** 2个消费者 1个消费者组 2个分区 **/ public void send4() { kafkaTemplate.send("test1",0,"test1","data1"); kafkaTemplate.send("test1",1,"test11","data11"); System.out.println("生产者发送消息"); } 复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1") public void test1(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("消费者1接受消息" + record); } } @KafkaListener(topics = {"test1"}, groupId = "test1") public void test2(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("消费者2接受消息" + record); } } 复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11) 消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1) 复制代码
2个分区,3个消费者,1个消费者组(有个消费者未消费分区)
/** 2个消费者 1个消费者组 2个分区 **/ public void send4() { kafkaTemplate.send("test1",0,"test1","data1"); kafkaTemplate.send("test1",1,"test11","data11"); System.out.println("生产者发送消息"); } 复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1") public void test1(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("消费者1接受消息" + record); } } @KafkaListener(topics = {"test1"}, groupId = "test1") public void test2(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("消费者2接受消息" + record); } } @KafkaListener(topics = {"test1"}, groupId = "test1") public void test3(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("消费者3接受消息" + record); } } 复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11) 消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1) 复制代码
以上所述就是小编给大家介绍的《“简单”的消息队列与kafka》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 消息队列(三)常见消息队列介绍
- 消息队列探秘 – RabbitMQ 消息队列介绍
- springboot整合各种消息队列(二):rabbitmq消息队列
- springboot整合各种消息队列(一):redis消息队列
- 消息队列系列二(IOT中消息队列的应用)
- 消息队列(七)RocketMQ消息发送
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。