“简单”的消息队列与kafka

栏目: 服务器 · Apache · 发布时间: 5年前

内容简介:小时候就特别喜欢龙系精灵,特别是乘龙,后来才知道只是冰水。。。尴尬。 在宠物小精灵中,乘龙一直是训练家的载人伙伴,和我们下面的MQ好像有几分相似呢~~MQ,全称消息队列,现在市面上有很多种消息队列,像大家耳熟能详的RabbitMQ,RocketMQ,Kafka等等,接下来为大家详细的介绍消息队列。俗话说的好,多用多错,不能为了技术而技术,要结合实际的业务场景使用合适的技术。
“简单”的消息队列与kafka

小时候就特别喜欢龙系精灵,特别是乘龙,后来才知道只是冰水。。。尴尬。 在宠物小精灵中,乘龙一直是训练家的载人伙伴,和我们下面的MQ好像有几分相似呢~~

前言

MQ,全称消息队列,现在市面上有很多种消息队列,像大家耳熟能详的RabbitMQ,RocketMQ,Kafka等等,接下来为大家详细的介绍消息队列。

使用场景

俗话说的好,多用多错,不能为了技术而技术,要结合实际的业务场景使用合适的技术。

例如你用了 Redis 缓存,这时候你也许得考虑主从架构。因为主从架构,你可能得考虑主从切换。同时也许你还得考虑集群模式。这就大大的提高了开发与维护的成本。因此选择一种合适的技术是十分重要的。

异步解耦

“简单”的消息队列与kafka
  1. 如果是单体应用,在用户下单时,下订单,减库存,物流记录这三个操作是同步阻塞的。如果将这三步的操作存放到各自的消息队列中,然后监听这三个消息队列,那么可以大大的减少时间。
  2. 但同时,由于是分布式应用,你可能得考虑分布式事务的必要性了。此外,为了保证MQ的高可用,你可能得去调研市场上集群模式支持的最好的中间件了。

流量削峰

  1. 在一些秒杀场景,为了防止极高的并发,将数据库冲垮,除了可以在负载均衡端进行限流的设置,同时也可以将用户的请求数据存放到消息队列中,然后通过消息队列中控制处理速度。
  2. 同时这个处理的进程可以监听zk的某个节点,在秒杀结束后,修改这个节点的值,进程监听到这个节点值的变化,将不再处理请求(可以做成拦截器)。这个有时也被用来作为大数据中流处理的缓冲
“简单”的消息队列与kafka

市面上的消息队列

RabbitMQ

介绍

RabbitMQ 试用于逻辑相对复杂,同时对于队列的性能需求相对不高的场景。

模型

“简单”的消息队列与kafka
  1. RabbitMQ 发送消息时会先发送到 Exchange (交换机),指定 routing key
  2. Exchange 和队列绑定时指定 binding key
  3. 只有当 routing keybinding key 的匹配规则成立时,才会将消息发往指定的队列。
  4. 两个消费者监听同一个队列,一条消息不会被两个队列同时消费。这是和kafka有区别的。

kafka

kafka topic

RabbitMQ中 数据存储的最小单位是 queue ,而在kafka中 的最小单位是 partition , partition 包含在 topic 中,见下图。

直连模型

“简单”的消息队列与kafka
一个消费组 gourp1 监听一个 topic

, c1和c2会分别消费一个分区。 如果是一个消费组的话,必须指定默认消费者组名称

订阅模型

“简单”的消息队列与kafka

多个消费组(每个组中只有一个消费者), 监听一个topic,每个消费者会消费topic中的所有分区。

kafka 偏移量(offset)

每条消息在分区中都会有一个偏移量。在消息进入分区前,会给新的消息分配一个唯一的偏移量,保证了每个分区中消息的顺序性。

RabbitMQ 中, 多个消费者监听一个队列,由于消息是异步发送的,由于网络等原因,可能 Consumer2 先接受到 Message2 ,这样会导致消息的处理顺序不会按照消息存入队列的顺序。当然,我们可以通过一个消费者监听一个队列来保证消息的有序性。

而kafka呢,多个消费者在一个消费者组中,监听一个 topictopic 中的不同的分区会被不同的消费者消费(一个消费者消费一个分区)。当然,我们只能保证每个分区的消息时被顺序消费的,不同的分区则不能保证了。

kafka实战

一个分区,一个消费者组,一个消费者

/** 一个消费者 一个消费者组 一个分区 **/
    public void send1() {
        kafkaTemplate.send("test1",0,"test1","data1");
        System.out.println("生产者发送消息");
    }
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("消费者接受消息" + record);
        }
    }
复制代码
生产者发送消息
消费者接受消息ConsumerRecord(topic = test1, partition = 0, offset = 0, CreateTime = 1560260495702, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码

2个分区,一个消费者组,一个消费者,消费所有分区

## 增加分区数
kafka-topics --zookeeper localhost:2181 --alter --topic test1 --partitions 2
## 结果
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
复制代码
## 生产者
kafkaTemplate.send("test1",0 #分区数,"test1","data1");
kafkaTemplate.send("test1",1,"test11","data11");
System.out.println("生产者发送消息");
复制代码
## 消费者
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("消费者1接受消息" + record);
        }
    }
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 4, CreateTime = 1560261029521, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 1, CreateTime = 1560261029521, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
复制代码

一个分区,2个消费者组,2个消费者 ,每个消费者都消费一个分区

/** 2个消费者 2个消费者组 1个分区 **/
    public void send3() {
        kafkaTemplate.send("test1",0,"test1","data1");
        System.out.println("生产者发送消息");
    }
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者1接受消息" + record);
        }
    }

    @KafkaListener(topics = {"test1"}, groupId = "test2")
    public void test2(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者2接受消息" + record);
        }
    }
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码

2个分区,2个消费者,1个消费者组(负载均衡)

/** 2个消费者 1个消费者组 2个分区 **/
    public void send4() {
        kafkaTemplate.send("test1",0,"test1","data1");
        kafkaTemplate.send("test1",1,"test11","data11");
        System.out.println("生产者发送消息");

    }
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者1接受消息" + record);
        }
    }

    @KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test2(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者2接受消息" + record);
        }
    }
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码

2个分区,3个消费者,1个消费者组(有个消费者未消费分区)

/** 2个消费者 1个消费者组 2个分区 **/
    public void send4() {
        kafkaTemplate.send("test1",0,"test1","data1");
        kafkaTemplate.send("test1",1,"test11","data11");
        System.out.println("生产者发送消息");

    }
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者1接受消息" + record);
        }
    }

    @KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test2(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者2接受消息" + record);
        }
    }
    
    @KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test3(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者3接受消息" + record);
        }
    }
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码

以上所述就是小编给大家介绍的《“简单”的消息队列与kafka》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

程序员的呐喊

程序员的呐喊

[美]Steve Yegge / 徐旭铭 / 人民邮电出版社 / 2014-5-1 / 45.00元

《程序员的呐喊》的作者是业界知名的程序员—来自google的steve yegge,他写过很多颇富争议的文章,其中有不少就收录在这本书中。本书是他的精彩文章的合集。 《程序员的呐喊》涉及编程语言文化、代码方法学、google公司文化等热点话题。 对工厂业界的各种现象、技术、趋势等,作者都在本书中表达了自己独特犀利的观点。比如java真的是一门优秀的面向对象语言吗?重构真的那么美好吗?强......一起来看看 《程序员的呐喊》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

随机密码生成器
随机密码生成器

多种字符组合密码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器