Kafka入门之旅

栏目: 服务器 · Apache · 发布时间: 5年前

内容简介:虽然这是一本关于Kafka Streams的书,但是要研究Kafka Streams不可能不探讨Kafka,毕竟,Kafka Streams是一个运行在Kafka之上的库。Kafka Streams设计得非常好,因此即使具有很少或者零Kafka经验的人都可以启动和运行Kafka Streams。但是,你所取得的进步和对Kafka调优的能力将是有限的。掌握Kafka的基础知识对有效使用Kafka Streams来说是必要的。Kafka是一个很大的话题,很难通过一章进行完整论述。本章将会覆盖足以使读者很好地理解

本文主要内容

    • 考察Kafka架构。
    • 生产者发送消息。
    • 消费者读取消息。
    • Kafka安装与运行。

虽然这是一本关于Kafka Streams的书,但是要研究Kafka Streams不可能不探讨Kafka,毕竟,Kafka Streams是一个运行在Kafka之上的库。

Kafka Streams设计得非常好,因此即使具有很少或者零Kafka经验的人都可以启动和运行Kafka Streams。但是,你所取得的进步和对Kafka调优的能力将是有限的。掌握Kafka的基础知识对有效使用Kafka Streams来说是必要的。

注意

本章面向的读者是对Kafka Streams有兴趣,但对Kafka本身具有很少或零经验的开发者。如果读者对Kafka具备很好的应用知识,那么就可以跳过本章,直接阅读第3章。

Kafka是一个很大的话题,很难通过一章进行完整论述。本章将会覆盖足以使读者很好地理解Kafka的工作原理和一些核心配置项设置的必备知识。要想更深入了解Kafka的知识,请看Dylan Scott写的 Kafka in Action (Manning,2018)

1数据问题

如今,各组织都在研究数据。互联网公司、金融企业以及大型零售商现在比以往任何时候都更善于利用这些数据。通过利用数据,既能更好地服务于客户,又能找到更有效的经营方式(我们要对这种情况持积极态度,并且在看待客户数据时要从好的意图出发)。

让我们考虑一下在ZMart数据管理解决方案中的各种需求。

    • 需要一种将数据快速发送到中央存储的方法。
    • 由于服务器经常发生故障,这就需要复制数据的能力,有了这种能力,不可避免的故障就不会导致停机和数据丢失。
    • 需要能够扩展到任意数量消费者的数据,而不必跟踪不同的应用程序。需要让组织中的任何人都能使用这些数据,而不必跟踪哪些人已经查看了数据,哪些人还没有查看。

2使用Kafka处理数据

在第1章中,已介绍过大型零售公司ZMart。那时,ZMart需要一个流式处理平台来利用公司的销售数据,以便更好地提供客户服务并提升销售总额。但在那时的6个月前,ZMart期待了解它的数据情况,ZMart最初有一个定制的非常有效的解决方案,但是很快就发现该解决方案变得难以驾驭了,接下来将看到其原因。

2.1ZMart原始的数据平台

最初,ZMart是一家小公司,零售销售数据从各分离的应用程序流入系统。这种方法起初效果还是不错的,但随着时间的推移,显然需要一种新的方法。一个部门的销售数据不再只是该部门所感兴趣的,公司的其他部门也可能感兴趣,并且不同的部门对数据的重要性和数据结构都有不同的需求。图2-1展示了ZMart原始的数据平台。

Kafka入门之旅

图2-1ZMart原始数据架构简单,足够使每个信息源流入和流出信息

随着时间的推移,ZMart通过收购其他公司以及扩大其现有商店的产品而持续增长。随着应用程序的添加,应用程序之间的连接变得更加复杂,由最初的少量的应用程序之间的通信演变成了一堆名副其实的意大利面条。如图2-2所示,即使只有3个应用程序,连接的数量也很烦琐且令人困惑。可以看到,随着时间的推移,添加新的应用程序将使这种数据架构变得难以管理。

Kafka入门之旅

图2-2随着时间的推移,越来越多的应用程序被添加进来,连接所有这些信息源变得非常复杂

2.2一个Kafka销售交易数据中心

一个解决ZMart问题的方案是创建一个接收进程来控制所有的交易数据,即建立一个交易数据中心。这个交易数据中心应该是无状态的,它以一种方式接受交易数据并存储,这种方式是任何消费应用程序可以根据自己的需要从数据中心提取信息。对哪些数据的追踪取决于消费应用程序,交易数据中心只知道需要将交易数据保存多久,以及在什么时候切分或删除这些数据。

也许你还没有猜到,我们有Kafka完美的用例。Kafka是一个具有容错能力、健壮的发布/订阅系统。一个Kafka节点被称为一个代理,多个Kafka服务器组成一个集群。Kafka将生产者写入的消息存储在Kafka的主题之中,消费者订阅Kafka主题,与Kafka进行通信以查看订阅的主题是否有可用的消息。图 2-3 展示了如何将Kafka想象为销售交易数据 中心。

现在大家已经对Kafka的概况有了大致的了解,在下面的几节中将进行仔细研究。

Kafka入门之旅

图2-3使用Kafka作为销售交易中心显著简化了ZMart数据架构,现在每台服务器不需要知道其他的信息来源,它们只需要知道如何从Kafka读取数据和将数据写入Kafka

3Kafka架构

在接下来的几个小节中,我们将介绍Kafka体系架构的关键部分以及Kafka的工作原理。如果想尽早地体验运行Kafka,可以直接跳到2.6节,安装和运行Kafka。等Kafka安装之后,再回到这里来继续学习Kafka。

3.1Kafka是一个消息代理

在前一节中,我曾说过Kafka是一个发布/订阅系统,但更精确地说法是Kafka充当了消息代理。代理是一个中介,将进行互利交换或交易但不一定相互了解的两部分汇聚在一起。图2-4展示了ZMart数据架构的演化。生产者和消费者被添加到图中以展示各单独部分如何与Kafka进行通信,它们之间不会直接进行通信。

Kafka将消息存储在主题中,并从主题检索消息。消息的生产者和消费者之间不会直接连接。此外,Kafka并不会保持有关生产者和消费者的任何状态,它仅作为一个消息交换中心。

Kafka主题底层的技术是日志,它是Kafka追加输入记录的文件。为了帮助管理进入主题的消息负载,Kafka使用分区。在第1章我们讨论了分区,大家可以回忆一下,分区的一个应用是将位于不同服务器上的数据汇集到同一台服务器上,稍后我们将详细讨论分区。

Kafka入门之旅

图2-4Kafka是一个消息代理,生产者将消息发送到Kafka,这些消息被存储,并通过主题订阅的方式提供给消费者

3.2Kafka是一个日志

Kafka底层的机制就是日志。大多数软件工程师都对日志很熟悉,日志用于记录应用程序正在做什么。如果在应用程序中出现性能问题或者错误,首先检查的是应用程序的日志,但这是另一种类型的日志。在Kafka(或者其他分布式系统)的上下文中,日志是“一种只能追加的,完全按照时间顺序排列的记录序列”。

图2-5展示了日志的样子,当记录到达时,应用程序将它们追加到日志的末尾。记录有一个隐含的时间顺序,尽管有可能不是与每条记录相关联的时间戳,因为最早的记录在左边,后达到的记录在右端。

日志是具有强大含义的简单数据抽象,如果记录按时间有序,解决冲突或确定将哪个更新应用到不同的机器就变得明确了:最新记录获胜。

Kafka中的主题是按主题名称分隔的日志,几乎可以将主题视为有标签的日志。如果日志在一个集群中有多个副本,那么当一台服务器宕机后,就能够很容易使服务器恢复正常:只需重放日志文件。从故障中恢复的能力正是分布式提交日志具有的。

Kafka入门之旅

图2-5日志是追加传入记录的文件——每条新到达的记录都被立即放在接收到的最后一条记录之后,这个过程按时间顺序对记录进行排序

我们只触及了关于分布式应用程序和数据一致性的深入话题的表面,但到目前为止所讲解的知识应该能让读者对Kafka涉及的内容有了一个基本的了解。

3.3Kafka日志工作原理

当安装Kafka时,其中一个配置项是 log.dir ,该配置项用来指定Kafka存储日志数据的路径。每个主题都映射到指定日志路径下的一个子目录。子目录数与主题对应的分区数相同,目录名格式为“主题名_分区编号”(将在下一节介绍分区)。每个目录里面存放的都是用于追加传入消息的日志文件,一旦日志文件达到某个规模(磁盘上的记录总数或者记录的大小),或者消息的时间戳间的时间间隔达到了所配置的时间间隔时,日志文件就会被切分,传入的消息将会被追加到一个新的日志文件中(如图2-6所示)。

Kafka入门之旅

图2-6logs目录是消息存储的根目录,/logs目录下的每个目录代表一个主题的分区,目录中的文件名以主题的名称打头,然后是下划线,后面接一个分区的编号

可以看到日志和主题是高度关联的概念,可以说一个主题是一个日志,或者说一个主题代表一个日志。通过主题名可以很好地处理经由生产者发送到Kafka的消息将被存储到哪个日志当中。既然已经讨论了日志的概念,那么我们再来讨论Kafka另一个基本概念——分区。

3.4Kafka和分区

分区是Kafka设计的一个重要部分,它对性能来说必不可少。分区保证了同一个键的数据将会按序被发送给同一个消费者。图2-7展示了分区的工作原理。

Kafka入门之旅

图2-7Kafka使用分区来实现高吞吐量,并将一个主题的消息在集群的不同服务器中传播

对主题作分区的本质是将发送到主题的数据切分到多个平行流之中,这是Kafka能够实现巨大吞吐量的关键。我们解释过每个主题就是一个分布式日志,每个分区类似于一个它自己的日志,并遵循相同的规则。Kafka将每个传入的消息追加到日志末尾,并且所有的消息都严格按时间顺序排列,每条消息都有一个分配给它的偏移量。Kafka不保证跨分区的消息有序,但是能够保证每个分区内的消息是有序的。

除了增加吞吐量,分区还有另一个目的,它允许主题的消息分散在多台机器上,这样给定主题的容量就不会局限于一台服务器上的可用磁盘空间。

现在让我们看看分区扮演的另一个关键角色:确保具有相同键的消息最终在一起。

3.5分区按键对数据进行分组

Kafka处理键/值对格式的数据,如果键为空,那么生产者将采用轮询(round-robin)方式选择分区写入记录。图2-8展示了用非空键如何分配分区的操作。

如果键不为空,Kafka会使用以下公式(如下伪代码所示)确定将键/值对发送到哪个分区:

HashCode.(key) %number of partitions

通过使用确定性方法来选择分区,使得具有相同键的记录将会按序总是被发送到同一个分区。默认的分区器使用此方法,如果需要使用不同的策略选择分区,则可以提供自定义的分区器。

Kafka入门之旅

图2-8 “foo”被发送到分区0,“bar”被发送到分区1。通过键的

字节散列与分区总数取模来获得数据被分配的分区

3.6编写自定义分区器

为什么要编写自定义分区器呢?在几个可能的原因中,下面将举一个简单的例子——组合键的使用。

假设将购买数据写入Kafka,该数据的键包括两个值,即客户ID和交易日期,需要根据客户ID对值进行分组,因此对客户ID和交易日期进行散列是行不通的。在这种情况下,就需要编写一个自定义分区器,该分区器知道组合键的哪一部分决定使用哪个分区。例如,/src/main/java/ bbejeck/model/PurchaseKey.java中的组合键,如代码清单2-1所示。

代码清单2-1组合键PurchaseKey类

public class PurchaseKey {

    private String customerId;
    private Date transactionDate;

    public PurchaseKey(String customerId, Date transactionDate) {
        this.customerId = customerId;
        this.transactionDate = transactionDate;
    }

    public String getCustomerId() {
        return customerId;
    }

    public Date getTransactionDate() {
        return transactionDate;
    }
}

当提及分区时,需要保证特定用户的所有交易信息都会被发送到同一个分区中。但是整体作为键就无法保证,因为购买行为会在多个日期发生,包括交易日期的记录对一个用户而言就会导致不同的键值,就会将交易数据随机分布到不同的分区中。若需要确保具有相同客户ID的交易信息都发送到同一个分区,唯一的方法就是在确定分区时使用客户ID作为键。

代码清单2-2所示的自定义分区器的例子就满足需求。 PurchaseKeyPartitioner 类(源代码见src/ main/java/bbejeck/chapter_2/partitioner/PurchaseKeyPartitioner.java)从键中提取客户ID来确定使用哪个分区。

代码清单2-2自定义分区器PurchaseKeyPartitioner类

public class PurchaseKeyPartitioner extends DefaultPartitioner {

    @Override
    public int partition(String topic, Object key,
                        byte[] keyBytes, Object value,
                         byte[] valueBytes, Cluster cluster) {
        Object newKey = null;
        if (key != null) {  ⇽--- 如果键不为空,那么提取客户ID
           PurchaseKey purchaseKey = (PurchaseKey) key;
            newKey = purchaseKey.getCustomerId();
            keyBytes = ((String) newKey).getBytes();  ⇽--- 将键的字节赋值给新的值

        }
        return super.partition(topic, newKey, keyBytes, value,   ⇽--- 返回具有已被更新键的分区,并将其委托给超类
        valueBytes, cluster);
   }
}

该自定义分区器继承自 DefaultPartitioner 类,当然也可以直接实现 Partitioner 接口,但是在这个例子中,在 DefaultPartitioner 类中有一个已存在的逻辑。

请注意,在创建自定义分区器时,不仅局限于使用键,单独使用值或与键组合使用都是有效的。

注意

Kafka API提供了一个可以用来实现自定义分区器的 Partitioner 接口,本书不打算讲解从头开始写一个分区器,但是实现原则与代码清单2-2相同。

已经看到如何构造一个自定义分区器,接下来,将分区器与Kafka结合起来。

3.7指定一个自定义分区器

既然已编写了一个自定义分区器,那就需要告诉Kafka使用自定义的分区器代替默认的分区器。虽然还没有讨论生产者,但在设置Kafka生产者配置时可以指定一个不同的分区器,配置如下:

partitioner.class=bbejeck_2.partitioner.PurchaseKeyPartitioner

通过为每个生产者实例设置分区器的方式,就可以随意地为任何生产者指定任何分区器类。在讨论Kafka生产者时再对生产者的配置做详细介绍。

警告

在决定使用的键以及选择键/值对的部分作为分区依据时,一定要谨慎行事。要确保所选择的键在所有数据中具有合理的分布,否则,由于大多数数据都分布在少数几个分区上,最终导致数据倾斜。

3.8确定恰当的分区数

在创建主题时决定要使用的分区数既是一门艺术也是一门科学。其中一个重要的考虑因素是流入该主题的数据量。更多的数据意味着更多的分区以获得更高的吞吐量,但与生活中的任何事物一样,也要有取舍。

增加分区数的同时也增加了TCP连接数和打开的文件句柄数。此外,消费者处理传入记录所花费的时间也会影响吞吐量。如果消费者线程有重量级处理操作,那么增加分区数可能有帮助,但是较慢的处理操作最终将会影响性能。

3.9分布式日志

我们已经讨论了日志和对主题进行分区的概念,现在,花点时间结合这两个概念来阐述分布式日志。

到目前为止,我们讨论日志和对主题进行分区都是基于一台Kafka服务器或者代理,但典型的Kafka生产集群环境包括多台服务器。故意将讨论集中单个节点上,是因为考虑一个节点更容易理解概念。但在实践中,总是使用包括多台服务器的Kafka集群。

当对主题进行分区时,Kafka不会将这些分区分布在一台服务上,而是将分区分散到集群中的多台服务器上。由于Kafka是在日志中追加记录,因此Kafka通过分区将这些记录分发到多台服务器上。图2-9展示了这个过程。

让我们通过使用图2-9作为一个向导来完成一个快速实例。对于这个实例,我们假设有一个主题,并且键为空,因此生产者将通过轮询的方式分配分区。

生产者将第1条消息发送到位于Kafka代理1上的分区0中,第2条消息被发送到位于Kafka代理1上的分区1中,第3条消息被发送到位于Kafka代理2上的分区2中。当生产者发送第6条消息时,消息将会被发送到Kafka代理3上的分区5中,从下一条消息开始,又将重复该步骤,消息将被发送到位于Kafka代理1上的分区0中。以这种方式继续分配消息,将消息分配到Kafka集群的所有节点中。

Kafka入门之旅

图2-9生产者将消息写入主题的分区中,如果消息没有关联键,那么生产者就会通过轮询方式选择一个分区,否则通过键的散列值与分区总数取模来决定分区

虽然远程存储数据听起来会有风险,因为服务器有可能会宕机,但Kafka提供了数据冗余。当数据被写入Kafka的一个代理时,数据会被复制到集群中一台或多台机器上(在后面小节会介绍副本)。

3.10ZooKeeper:领导者、追随者和副本

到目前为止,我们已经讨论了主题在Kafka中的作用,以及主题如何及为什么要进行分区。可以看到,分区并不都位于同一台服务器上,而是分布在整个集群的各个代理上。现在是时候来看看当服务器故障时Kafka如何提供数据可用性。

Kafka代理有 领导者 (leader)和 追随者 (follower)的概念。在Kafka中,对每一个主题分区(topic partition),会选择其中一个代理作为其他代理(追随者)的领导者。领导者的一个主要职责是分配主题分区的副本给追随者代理服务器。就像Kafka在集群中为一个主题分配分区一样,Kafka也会在集群的多台服务器中复制分区数据。在深入探讨领导者、追随者和副本是如何工作之前,先来介绍Kafka为实现这一点所使用的技术。

3.11Apache ZooKeeper

如果你是个Kafka菜鸟,你可能会问自己:“为什么在Kafka的书中会谈论Apache ZooKeeper?”Apache ZooKeeper是Kafka架构不可或缺的部分,正是由于ZooKeeper才使得Kafka有领导者代理,并使领导者代理做诸如跟踪主题副本的事情,ZooKeeper官网对其介绍如下:

ZooKeeper是一个集中式服务,用于维护配置信息、命名、提供分布式同步和组服务。这些类型的所有服务都是通过分布式应用程序以某种形式使用。

既然Kafka是一个分布式应用程序,那么它一开始就应该知道ZooKeeper在其架构中的作用。在这里的讨论中,我们只考虑两个或多个Kafka服务器的安装问题。

在Kafka集群中,其中一个代理会被选为控制器。在2.3.4节我们介绍了分区以及如何在集群的不同服务器之间分配分区。主题分区有一个领导者分区和一到多个追随者分区(复制的级别决定复制的程度),当生成消息时,Kafka将记录发送到领导者分区对应的代理上。

3.12选择一个控制器

Kafka使用ZooKeeper来选择代理控制器,对于其中涉及的一致性算法的探讨已超出本书所讲内容的范围,因此我们不做深入探讨,只声明ZooKeeper从集群中选择一个代理作为控制器。

如果代理控制器发生故障或者由于任何原因而不可用时,ZooKeeper从与领导者保持同步的一系列代理(已同步的副本[ISR])中选出一个新的控制器,构成该系列的代理是动态的,ZooKeeper只会从这个代理系列中选择一个领导者。

3.13副本

Kafka在代理之间复制记录,以确保当集群中的节点发生故障时数据可用。可以为每个主题(正如前面介绍的消息发布或消费实例中的主题)单独设置复制级别也可以为集群中的所有主题设置复制级别。图2-10演示了代理之间的复制流。

Kafka复制过程非常简单,一个主题分区对应的各代理从该主题分区的领导者分区消费消息,并将消息追加到自己的日志当中。正如2.3.12节所论述的,与领导者代理保持同步的追随者代理被认为是ISR,这些ISR代理在当前领导者发生故障或者不可用时有资格被选举为领导者。

Kafka入门之旅

图2-10代理1和代理3是一个主题分区的领导者,同时也是另外一个分区的追随者,而代理2只是追随者,追随者代理从领导者代理复制数据

3.14控制器的职责

代理控制器的职责是为一个主题的所有分区建立领导者分区和追随者分区的关系,如果一个Kafka节点宕机或者没有响应(与ZooKeeper之间的心跳),那么所有已分配的分区(包括领导者和追随者)都将由代理控制器重新进行分配。图2-11演示了一个正在运行的代理控制器。

图2-11展示了一个简单的故障情景。第1步,代理控制器检测到代理3不可用。第2步,代理控制器将代理3上分区的领导权重新分配给代理2。

ZooKeeper也参与了Kafka以下几个方面的操作。

    • 集群成员 ——代理加入集群和维护集群中成员关系。如果一个代理不可用,则ZooKeeper将该代理从集群成员中移除。
    • 主题配置 ——跟踪集群中的主题,记录哪个代理是主题的领导者,各主题有多少个分区以及主题的哪些特定的配置被覆盖。
    • 访问控制 ——识别谁可以从特定主题中读取或写入消息。

Kafka入门之旅

图2-11代理控制器负责将其他代理分配为某些主题/分区的领导者代理和另一些主题/分区的追随者代理, 当代理不可用时,代理控制器将已分配给不可用代理的重新分配给集群中的其他代理

现在可知Kafka为什么依赖于Apache ZooKeeper了,正是ZooKeeper使得Kafka有了一个带着追随者的领导者代理,领导者代理的关键角色是为追随者分配主题分区,以便进行复制,以及在代理成员出现故障时重新分配主题分区。

3.15日志管理

对追加日志已进行了介绍,但还没有谈到随着日志持续增长如何对其进行管理。一个集群中旋转磁盘的空间是一个有限的资源,因此对Kafka而言,随着时间的推移,删除消息是很重要的事。在谈到删除Kafka中的旧数据时,有两种方法,即传统的日志删除和日志压缩。

3.16日志删除

日志删除策略是一个两阶段的方法:首先,将日志分成多个日志段,然后将最旧的日志段删除。为了管理Kafka不断增加的日志,Kafka将日志切分成多个日志段。日志切分的时间基于消息中内置的时间戳。当一条新消息到达时,如果它的时间戳大于日志中第一个消息的时间戳加上 log.roll.ms 配置项配置的值时,Kafka就会切分日志。此时,日志被切分,一个新的日志段会被创建并作为一个活跃的日志段,而以前的活跃日志段仍然为消费者提供消息检索。

日志切分是在设置Kafka代理时进行设置的。日志切分有两个可选的配置项。

    • log.roll.ms ——这个是主配置项,但没有默认值。
    • log.roll.hours ——这是辅助配置项,仅当 log.roll.ms 没有被设置时使用,该配置项默认值是168小时。

随着时间的推移,日志段的数据也将不断增加,为了为传入的数据腾出空间,需要将较旧的日志段删除。为了删除日志段,可以指定日志段保留的时长。图2-12说明了日志切分的过程。

Kafka入门之旅

图2-12左边是当前日志段,右上角是一个已被删除的日志段,在其下面是最近切分的仍然在使用的日志段

与日志切分一样,日志段的删除也基于消息的时间戳,而不仅是时钟时间或文件最后被修改的时间,日志段的删除基于日志中最大的时间戳。用来设置日志段删除策略的3个配置项按优先级依次列出如下,这里按优先级排列意味着排在前面的配置项会覆盖后面的配置项。

log.retention.ms
log.retention.minutes
log.retention.hours

提出这些设置的前提是基于大容量主题的假设,这里大容量是指在一个给定的时间段内保证能够达到文件最大值。另一个配置项 log.retention.bytes ,可以指定较长的切分时间阈值,以控制I/O操作。最后,为了防止日志切分阈值设置得相对较大而出现日志量显著增加的情况,请使用配置项 log.segment.bytes 来控制单个日志段的大小。

对于键为空的记录以及独立的记录,删除日志的效果很好。但是,如果消息有键并需要预期的更新操作,那么还有一种方法更适合。

3.17日志压缩

假设日志中已存储的消息都有键,并且还在不停地接收更新的消息,这意味着具有相同键的新记录将会更新先前的值。例如,股票代码可以作为消息的键,每股的价格作为定期更新的值。想象一下,使用这些信息来展示股票的价值,并出现程序崩溃或者重启,这就需要能够让每个键恢复到最新数据。

如果使用删除策略,那么从最后一次更新到应用程序崩溃或重启之间的日志段就可能被去除,启动时就得不到所有的记录。一种较好的方式是保留给定键的最近已知值,用与更新数据库表键一样的方式对待下一条记录。

按键更新记录是实现压缩主题(日志)的表现形式。与基于时间和日志大小直接删除整个日志段的粗粒度方式不同,压缩是一种更加细粒度的方式,该方式是删除日志中每个键的旧数据。从一个很高的层面上来说,一个日志清理器(一个线程池)运行在后台,如果后面的日志中出现了相同的键,则日志清理器就会重新复制日志段文件并将该键对应的旧记录去除。图2-13阐明了日志压缩是如何为每个键保留最新消息的。

这种方式保证了给定键的最后一条记录在日志中。可以为每个主题指定日志保留策略,因此完全有可能某些主题使用基于时间的保留,而其他主题使用压缩。

默认情况下,日志清理功能是开启的。如果要对主题使用压缩,那么需要在创建主题时设置属性 log.cleanup.policy=compact

在Kafka Streams中使用应用状态存储时就要用到压缩,不过并不需要我们自己来创建相应的日志或主题——框架会处理。然而,理解压缩的原理是很重要的,日志压缩是一个宽泛的话题,我们仅谈论至此。如果想了解压缩方面的更多信息,参见Kafka官方文档。

注意

当使用 cleanup.policy 为压缩时,你可能好奇如何从日志中去除一条记录。对于一个压缩的主题,删除操作会为给定键设置一个 null 值,作为一个墓碑标记。任何值为 null 的键都确保先前与其键相同的记录被去除,之后墓碑标记自身也会被去除。

Kafka入门之旅

图2-13左边是压缩前的日志,可以看到具有不同值的重复键,这些值是用来更新给定键的。右边是压缩后的日志,保留了每个键的最新值,但日志变小了

本节的关键内容是:如果事件或消息是独立、单独的,那么就使用日志删除,如果要对事件或消息进行更新,那就使用日志压缩。

我们已经花了很多时间介绍Kafka内部是如何处理数据的,现在,让我们转移到Kafka外部,探讨如何通过生产者向Kafka发送消息,以及消费者如何从Kafka读取消息。

4生产者发送消息

回到ZMart对集中销售交易数据中心的需求,看看如何将购买交易数据发送到Kafka。在Kafka中,生产者是用于发送消息的客户端。图2-14重述ZMart的数据结构,突出显示生产者,以强调它们在数据流中适合的位置。

尽管ZMart有很多的销售交易,但现在我们只考虑购买一个单一物品:一本10.99美元的书。当消费者完成销售交易时,交易信息将被转换为一个键/值对并通过生产者发送到Kafka。

键是客户ID,即 123447777 ,值是一个JSON格式的值,即 "{\"item\":\"book\",\ "price\":10.99}" (这里已把双引号转义了,这样JSON可以被表示为 Java 中的字符串)。有了这种格式的数据,就可以使用生产者将数据发送到Kafka集群。代码清单2-3所示的示例代码可以在源代码/src/main/java/bbejeck.chapter_2/producer/SimpleProducer.java类中找到。

Kafka入门之旅

图2-14生产者用于向Kafka发送消息,它们并不知道哪个消费者会读取消息,也不知道消费者在什么时候会读取消息

代码清单2-3 SimpleProducer 示例

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.
➥   StringSerializer");
properties.put("value.serializer",
➥   "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
properties.put("retries", "3");
properties.put("compression.type", "snappy");
properties.put("partitioner.class",
➥  PurchaseKeyPartitioner.class.getName());  ⇽--- 生产者属
性配置

PurchaseKey key = new PurchaseKey("12334568", new Date());

try(Producer<PurchaseKey, String> producer = 
➥   new KafkaProducer<>(properties)) {  ⇽--- 创建一个KafkaProducer
   ProducerRecord<PurchaseKey, String> record = 
➥   new ProducerRecord<>("transactions", key, "{\"item\":\"book\",
     \"price\":10.99}");  ⇽--- 实例化ProducerRecord

   Callback callback = (metadata, exception) -> {
             if (exception != null) {
                System.out.println("Encountered exception "
➥   + exception);   ⇽--- 构造一个回调
            }
       };

    Future<RecordMetadata> sendFuture =
➥   producer.send(record, callback);   ⇽--- 发送记录,并将返回的Future赋值给一个变量
}

Kafka生产者是线程安全的。所有消息被异步发送到Kafka——一旦生产者将记录放到内部缓冲区,就立即返回 Producer.send 。缓冲区批量发送记录,具体取决于配置,如果在生产者缓冲区满时尝试发送消息,则可能会有阻塞。

这里描述的 Producer.send 方法接受一个 Callback 实例,一旦领导者代理确认收到记录,生产者就会触发 Callback.onComplete 方法, Callback.onComplete 方法中仅有一个参数为非空。在本例中,只关心在发生错误时打印输出异常堆栈信息,因此检验 异常 对象是否为空。一旦服务器确认收到记录,返回的 Future 就会产生一个 RecordMetadata 对象。

定义

在代码清单2-3中, Producer.send 方法返回一个 Future 对象,一个 Future 对象代表一个异步操作的结果。更重要的是, Future 可以选择惰性地检索异步结果,而不是等它们完成。更多信息请参考Java文档“Interface Future ”(接口Future )。

4.1生产者属性

当创建 KafkaProducer 实例时,传递了一个包含生产者配置信息的 java.util. Properties 参数。 KafkaProducer 的配置并不复杂,但在设置时需要考虑一些关键属性,例如,可以在配置中指定自定义的分区器。这里要介绍的属性太多了,因此我们只看一下代码清单2-3中使用的属性。

    • 服务器引导 —— bootstrap.servers 是一个用逗号分隔的 host:port 值列表。最终,生产者将使用集群中的所有代理。此外,此列表用于初始连接到Kafka集群。
    • 序列化器 —— key.serializervalue.serializer 通知Kafka如何将键和值转化为字节数组。在内部,Kafka使用键和值的字节数组,因此在将消息通过网络发送之前需要向Kafka提供正确的序列化器,以将对象转换为字节数组。
    • 确认应答 —— acks 指定生产者认为在一条记录发送完成之前需要等待的从代理返回的最小确认数。 acks 的有效值为 all01 。当值为 all 时,生产者需要等待一个代理接收到所有追随者代理都已提交记录的确认。当值为 1时 ,代理将记录写入其日志,但不用等待所有的追随者代理来确认提交了记录。当值为 0 时,意味着生产者不用等待任何确认——这基本上是“即发即弃”。
    • 重试 ——如果发送一批消息失败, retries 指定失败后尝试重发的次数。如果记录的顺序很重要,那么应该考虑设置 max.in.flight.requests.connection1 ,以防止失败的记录在重试发送之前第二批记录成功发送的情景。
    • 压缩类型 ——如果使用数据压缩的话, compression.type 配置项用来指定要采用的压缩算法。如果设置了 compression.typecompression.type 会通知生产者在发送数据前对本批次的数据进行压缩。注意,是对整个批次进行压缩,而不是单条记录。
    • 分区器类 —— partitioner.class 指定实现 Partitioner 接口的类的名称。 partitioner.class 与我们在2.3.7节中介绍的自定义分区器有关。

更多生产者相关的配置信息请参见Kafka官方文档。

4.2指定分区和时间戳

当创建一个 ProducerRecord 对象时,可以选择指定分区、时间戳或者两者都指定。在代码清单2-3中实例化 ProducerRecord 时,使用了4个重载构造方法中的一个。其他构造方法允许设置分区和时间戳,或者只设置分区,代码如下:

ProducerRecord(String topic, Integer partition, String key, String value)
ProducerRecord(String topic, Integer partition, 
               Long timestamp, String key, 
               String value)

4.3指定分区

在2.3.5节中,我们讨论了Kafka分区的重要性。我们也讨论了 DefaultPartitioner 的工作原理以及如何提供一个自定义分区器。为什么要显式设置分区?可能有多种业务上的原因,下面是其中一个例子。

假设传入的记录都有键,但是记录被分发到哪个分区并不重要,因为消费者有逻辑来处理该键包含的任何数据。此外,键的分布可能不均匀,但你希望确保所有的分区接收到的数据量大致相同,代码清单2-4给出的是一个粗略的实现方案。

代码清单2-4手动设置分区

AtomicInteger partitionIndex = new AtomicInteger(0);    ⇽--- 创建一个AtomicInteger实例变量
int currentPartition = Math.abs(partitionIndex.getAndIncrement())%
➥    numberPartitions;   ⇽--- 获取当前分区并将其作为参数
ProducerRecord<String, String> record = 
➥    new ProducerRecord<>("topic", currentPartition, "key", "value");

上面的代码调用 Math.abs ,因此对于 Math.abs 求得的整型值,如果该值超出 Integer. MAX_VALUE ,也不必关注。

定义

AtomicInteger 属于java.util.concurrent.atomic包,该包包含支持对单个变量进行无锁、线程安全的操作的类。若需要更多信息,请参考Java官方文档关于java.util.concurrent.atomic包的介绍。

4.4Kafka中的时间戳

Kafka从0.10版本开始在记录中增加了时间戳,在创建 ProducerRecord 对象时调用以下重载的构造函数设置了时间戳。

ProducerRecord(String topic, Integer partition,
➥ Long timestamp,K key,V value)

如果没有设置时间戳,那么生产者在将记录发送到Kafka代理之前将会使用系统当前的时钟时间。时间戳也受代理级别的配置项 log.message.timestamp.type 的影响,该配置项可以被设置为 CreateTime (默认类型)和 LogAppendTime 中的一种。与许多其他代理级别的配置一样,代理级别的配置将作为所有主题的默认值,但是在创建主题时可以为每个主题指定不同的值。如果时间戳类型设置为 LogAppendTime ,并且在创建主题时没有覆盖代理级别对时间戳类型的配置,那么当将记录追加到日志时,代理将使用当前的时间覆盖时间戳,否则,使用来自 ProducerRecord 的时间戳。

两种时间戳类型该如何选择呢? LogAppendTime 被认为是“处理时间”,而 CreateTime 被认为是“事件时间”,选择哪一种类型取决于具体的业务需求。这就要确定你是否需要知道Kafka什么时候处理记录,或者真实的事件发生在什么时候。在后面的章节,将会看到时间戳对于控制Kafka Streams中的数据流所起的重要作用。

5消费者读取消息

我们已经知道了生产者的工作原理,现在是时候来看看Kafka的消费者。假设你正在构建一个原型应用程序用于展示ZMart最近的销售统计数据。对于这个示例,将消费先前生产者示例中发送的消息。因为这个原型处于早期阶段,所以此时要做的就是消费消息并将消息打印到控制台。

注意

因为本书所探讨的Kafka Streams的版本要求Kafka的版本为0.10.2或者更高版本,所以我们仅讨论新的消费者,它是在Kafka 0.9版本中发布的。

KafkaConsumer 是用来从Kafka消费消息的客户端。 KafkaConsumer 类很容易使用,但是有一些操作事项需要重视。图2-15展示了ZMart的体系架构,突出了消费者在数据流中所起的作用。

Kafka入门之旅

图2-15 这些是从Kafka读取消息的消费者,正如生产者不知道消费者一样,消费者从Kafka读取消息时也不知道是谁生产的消息

5.1管理偏移量

KafkaProducer 基本上是无状态的,然而 KafkaConsumer 需要周期性地提交从Kafka消费的消息的偏移量来管理一些状态。偏移量唯一标识消息,并表示消息在日志中的起始位置。消费者需要周期性地提交它们已接收到的消息的偏移量。

对一个消费者来说,提交一个偏移量有两个含义。

    • 提交意味着消费者已经完全处理了消息。
    • 提交也表示在发生故障或者重启时该消费者消费的起始位置。

如果创建了一个新消费者实例或者发生了某些故障,并且最后提交的偏移量不可用,那么消费者从何处开始消费取决于具体的配置。

auto.offset.reset="earliest"
auto.offset.reset="latest"
auto.offset.reset="none"

从图2-16可以看到选择不同的 auto.offset.reset 设置的影响。如果设置为 earliest ,那么收到消息的起始偏移量是 0 ;如果设置为 latest ,那么取得消息的起始偏移量为11。

Kafka入门之旅

图2-16将 auto.offset.reset 设置为 earliestlatest的 图形对比表示。设置为 earliest ,消费者将会得到所有未被删除的消息;设置为 latest 意味着消费者需要等待下一条可用消息到达

接下来,我们需要讨论偏移量提交的选项,你可以自动提交也可以手动提交。

5.2自动提交偏移量

默认情况下,消费者使用的是自动提交偏移量,通过 enable.auto.commit 属性进行设置。还有一个与 enable.auto.commit 配合使用的配置项 auto.commit.interval.ms ,用来指定消费者提交偏移量的频率(默认值是5秒)。调整这个频率值要谨慎,如果设置太小,将会增加网络流量;如果设置太大,可能会导致在发生故障或重启时消费者收到大量重复数据。

5.3手动提交偏移量

手动提交偏移量有两种方式——同步和异步。同步提交方式的代码如下:

consumer.commitSync()
consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>)

无参的 commitSync() 方法在上一次检索(轮询)成功返回所有的偏移量之前会一直阻塞,此方法适用于所有订阅的主题和分区。另一个方法需要一个 Map<TopicPartitonOffsetAndMetadata> 类型的参数,它只会提交 Map 中指定的偏移量、分区和主题。

异步提交也有与同步提交类似的方法, consumer.commitAsync() 方法是完全异步的,提交后立即返回。其中一个重载方法是无参的,两个 consumer.commitAsync 方法都可选择地提供一个 OffsetCommitCallback 回调对象,它在提交成功或者失败时被调用。通过提供回调实例可以实现异步处理或者异常处理。使用手工提交的好处是可以直接控制记录何时被视为已处理。

5.4创建消费者

创建一个消费者与创建一个生产者类似,提供一个以 java.util.Properties 形式的Java对象的配置,然后返回一个 KafkaConsumer 实例。该实例订阅由主题名称列表提供或者由正则表达式指定的主题。通常,会在一个循环中以指定毫秒级的间隔周期性地运行消费者轮询。

轮询的结果是一个 ConsumerRecords<KV> 对象, ConsumerRecords 实现了 Iterable 接口,每次调用 next() 方法返回一个包括消息的元数据以及实际的键和值的 ConsumerRecord 对象。

在处理完上一次轮询调用返回的所有 ConsumerRecord 对象之后,又会返回到循环的顶部,再次轮询指定的同期。实际上,期望消费者以这种轮询方式无限期地运行,除非发生错误或者应用程序需要关闭和重启(这就是提交的偏移量要发挥作用的地方——在重启时,消费者从停止的地方继续消费)。

5.5消费者和分区

通常需要多个消费者实例——主题的每个分区都有一个消费者实例。可以让一个消费者从多个分区中读取数据,但是通常的做法是使用一个线程数与分区数相等的线程池,每个线程运行一个消费者,每个消费者被分配到一个分区。

这种每个分区一个消费者的模式最大限度地提高了吞吐量,但如果将消费者分散在多个应用程序或者服务器上时,那么所有实例的线程总数不要超过主题的分区总数。任何超过分区总数的线程都将是空闲的。如果一个消费者发生故障,领导者代理将会把分配给该故障消费者的分区重新分配给另一个活跃的消费者。

注意

这个例子展示了一个消费者订阅一个主题的情况,但是这种情况仅是为了阐述的目的。大家可以让一个消费者订阅任意数量的主题。

领导者代理将主题的分区分配给具有相同 group.id 的所有可用的消费者, group.id 是一个配置项,用来标示消费者属于哪一个消费者组——这样一来,消费者就不需要位于同一台机器上。事实上,最好让消费者分散在几台机器上。这样,当一台服务器发生故障时,领导者代理可以将主题分区重新分配给一台正常运行的机器上的消费者。

5.6再平衡

在2.5.5节中描述的向消费者添加和移除主题分区(topic-partition)分配的过程被称为再平衡。分配给消费者的主题分区不是静态的,而是动态变化的。当添加一些具有相同消费者组ID的消费者时,将会从活跃的消费者中获取一些当前的主题分区,并将它们分配给新的消费者。这个重新分配的过程持续进行,直到将每个分区都分配给一个正在读取数据的消费者。

在达到这个平衡点之后,任何额外的消费者都将处于空闲状态。当消费者不管由于什么原因离开消费者组时,分配给它们的主题分区被重新分配给其他消费者。

5.7更细粒度的消费者分配

在2.5.5节中,我们描述了使用线程池及多个消费者(在同一个消费者组)订阅同一个主题。尽管Kafka会平衡所有消费者的主题分区负载,但是主题和分区的分配并不是确定性的,你并不知道每个消费者将收到哪个主题分区对。

KafkaConsumer 有一个允许订阅特定主题和分区的方法,代码如下:

TopicPartition fooTopicPartition_0 = new TopicPartition("foo", 0);
TopicPartition barTopicPartition_0 = new TopicPartition("bar", 0);

consumer.assign(Arrays.asList(fooTopicPartition_0, barTopicPartition_0));

在手动进行主题分区分配时,需要权衡以下两点。

consumer.assign

5.8消费者示例

代码清单2-5给出的是ZMart原型消费者的代码,该消费者消费交易数据并打印到控制台。完整代码可以在源代码src/main/java/bbejeck.chapter_2/consumer/ThreadedConsumerExample.java类中找到。

代码清单2-5 ThreadedConsumerExample 示例

public void startConsuming() {
        executorService = Executors.newFixedThreadPool(numberPartitions);
        Properties properties = getConsumerProps();
        for (int i = 0; i < numberPartitions; i++) {
           Runnable consumerThread = getConsumerThread(properties);   ⇽--- 创建一个消费者线程
           executorService.submit(consumerThread);
        }
    }

    private Runnable getConsumerThread(Properties properties) {
        return () -> {
            Consumer<String, String> consumer = null;
            try {
                consumer = new KafkaConsumer<>(properties);
               consumer.subscribe(Collections.singletonList(  ⇽--- 订阅主题
➥   "test-topic"));
                while (!doneConsuming) {
                   ConsumerRecords<String, String> records =   ⇽--- 5秒钟轮询一次
➥   consumer.poll(5000);
                    for (ConsumerRecord<String, String> record : records) {
                        String message = String.format("Consumed: key = 
➥   %s value = %s with offset = %d partition = %d",
                                record.key(), record.value(),
                                record.offset(), record.partition());
                        System.out.println(message);   ⇽--- 打印格式化的消息
                    }

                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (consumer != null) {
                   consumer.close();  ⇽--- 关闭消费者,否则会导致资源泄露
                }
            }
        };
    }

这个例子省略了类的其他代码——它不会独立存在。可以在本章的源代码中找到完整的示例。

6安装和运行Kafka

当我写本书时,Kafka的最新版本是1.0.0。因为Kafka是一个Scala项目,所以每次发布有两个版本:一个用于Scala 2.11;另一个用于Scala 2.12。本书使用Scala 2.12版本的Kafka。尽管大家可以下载发行版,本书源代码中也包括Kafka的二进制发行版,它将与本书阐述和描述的Kafka Streams一起工作。要安装Kafka,从本书repo管理的源代码中提取.tgz文件,放到自己机器上的某个目录中。

注意

 
Kafka的二进制版本包括Apache ZooKeeper,因此不需要额外的安装工作。

6.1Kafka本地配置

如果接受Kafka的默认配置,那么本地运行Kafka需要配置的地方就很少。默认情况下,Kafka使用9092端口,ZooKeeper使用2181端口。假设本地没有应用程序使用这些端口,那么一切就绪了。

Kafka将日志写入/tmp/kafka-logs目录下,ZooKeeper使用/tmp/zookeeper目录存储日志。根据自身服务器情况,可能需要更改这些目录的权限或所有权,抑或是需要修改写日志的位置。

为了修改Kafka日志目录, cd 命令进入Kafka安装路径的config目录,打开server. properties文件,找到 log.dirs 配置项,修改该配置项的值为任何你想使用的路径。在同一个目录下,打开zookeeper.properties文件,可以修改 dataDir 配置项的值。

稍后我们将会在本书中详细介绍Kafka的配置,但现在所需要做的配置仅此而已。需要注意的是,这些说的“日志”是Kafka和ZooKeeper的真实数据,并不是用于跟踪应用行为的应用层面的日志。应用日志位于Kafka安装目录的logs目录下。

6.2运行Kafka

Kafka启动很简单,由于ZooKeeper对于Kafka集群正确运行(ZooKeeper决定领导者代理、保存主题信息、对集群中各成员执行健康检查等)是必不可少的,因此在启动Kafka之前需要先启动ZooKeeper。

注意

从现在开始,所有对目录的引用均假设当前工作在Kafka安装目录下。如果使用的是Windows机器,目录是Kafka安装目录下的/bin/windows。

1.运行ZooKeeper

要启动ZooKeeper,打开命令提示符,输入以下命令:

bin/zookeeper-server-start.sh  config/zookeeper.properties

该命令执行后,在屏幕上会看到很多信息,但结尾会看到与图2-17所示类似的信息。

Kafka入门之旅

图2-17当ZooKeeper启动时,在控制台可以看到的输出信息

2.启动Kafka

打开另一个命令提示符,输入以下命令,启动Kafka:

bin/Kafka-server-start.sh  config/server.properties

同样,会在屏幕上看到滚动的文本。当Kafka完全启动时,会看到与图2-18所示类似的信息。

Kafka入门之旅

图2-18Kafka启动时的输出信息

提示

ZooKeeper对Kafka运行必不可少,因此在关闭时要调换顺序:先关闭Kafka,再关闭ZooKeeper。要关闭Kafka,可以在Kafka运行终端按下Ctrl+C,或在另一个终端执行 kafka-server-stop.sh 脚本。除了关闭脚本是 zookeeper-server-stop.sh ,关闭ZooKeeper的操作与关闭Kafka的操作相同。

6.3发送第一条消息

既然Kafka已启动并开始运行了,现在是时候使用Kafka来发送消息和接收消息了。但是,在发送消息前,需要先为生产者定义一个发送消息的主题。

1.第一个主题

在Kafka中创建一个主题很简单,仅需要运行一个带有一些配置参数的脚本。配置很简单,但是这些配置的设置有广泛的性能影响。

默认情况下,Kafka被配置为自动创建主题,这意味着如果尝试向一个不存在的主题发送或读取消息,那么Kafka代理就会创建一个主题(使用server.properties文件中的默认配置)。即使在开发中,依靠代理创建主题也不是一个好的做法,因为第一次尝试生产或消费会失败,这是由于需要时间来传播关于主题存在的元数据信息。需要确保总是主动地创建主题。

2.创建一个主题

要创建主题,需要运行kafka-topics.sh脚本。打开一个终端窗口,运行以下命令:

bin/kafka-topics.sh--create--topic first-topic--replication-factor1 
➥ --partitions1 --zookeeper localhost:2181

当脚本执行后,在终端控制台应该会看到类似如图2-19所示的信息。

Kafka入门之旅

图2-19这是创建主题的结果,事先创建主题很重要,可以提供特定主题的配置。否则,自动创建主题将使用默认配置或者server.properties文件中的配置

前面命令中的大多数配置标记的含义都显而易见,但还是让我们快速了解一下其中的两个配置。

    • replication-factor ——此标记确定领导者代理在集群中分发消息的副本数。在这种情况下,如果副本因子为 1 ,那么就不会复制,Kafka中保存的仅是原始消息。副本因子为 1 对于快速演示或者原型是可以的,但在实践中,几乎总是希望副本因子为 23, 以便在服务器发生故障时保证数据可用性。
    • partitions ——此标记用于指定主题将用到的分区数。同样,这里只有一个分区是可以的,但是如果想要更高的负载,当然就需要更多的分区。确定合适的分区数不是一门精确的科学。

3.发送一条消息

在Kafka中发送消息通常需要编写一个生产者客户端,但Kafka也自带了一个名为 kafka- console-producer 的方便脚本,允许从终端窗口发送消息。在这个例子中我们将使用控制台生产者,但是在2.4.1节中,我们已经介绍了如何使用 KafkaProducer

运行以下命令(图2-20中展示的也是)发送第一条消息:

# 假设在bin目录下运行该命令
./kafka-console-producer.sh--topic first-topic--broker-list localhost:9092

配置控制台生产者有几个选项,但这里我们仅使用必需的配置:消息送达的主题以及连接到Kafka的一个Kafka代理列表(对于本例,只是本地一台机器)。

启动控制台生产者是一个“阻塞脚本”,因此在执行前面的命令之后,输入一些文本并按回车键。可以发送你想要发送的任何数量的消息。但本例为了演示,可以输入一条消息“the quick brown fox jumped over the lazy dog.”,并按回车键,然后按Ctrl+C让生产者退出。

Kafka入门之旅

图2-20控制台生产者是用来快速测试配置和确保端到端功能的一个很好工具

4.读取一条消息

Kafka也提供了一个控制台消费者用来从命令行读取消息。控制台消费者类似于控制台生产者:一旦启动,将持续从主题中读取消息直到脚本被终止(通过Ctrl+C)。

运行以下命令,启动控制台消费者:

bin/kafka-console-consumer.sh--topic first-topic
➥ --bootstrap-server localhost:9092 --from-beginning

在启动控制台消费者之后,在终端控制台可以看到与图2-21所示类似的信息。

Kafka入门之旅

图2-21控制台消费者是一个方便的工具,可以快速地感知数据

是否正在流动以及消息是否包含预期的信息

--from-beginning 参数指定将会收到来自那个主题的任何未被删除的消息。控制台消费者还没有提交偏移量,因此若没有设置 --from-beginning ,那么只会收到控制台消费者启动之后所发送的消息。

我们已完成了Kafka的旋风之旅,并生产和消费了第一条消息。如果你还没有阅读本章第一部分,现在是时候回到本章起始处去学习Kafka工作原理的细节。

7小结

    • Kafka是一个消息代理,它接收消息并以一种简单快速的方式存储它们,以响应消费者的请求。消息从不会推送到消费者,Kafka中的消息保留完全独立于消息被消费的时间和频率。
    • Kafka使用分区来实现高吞吐量,并提供一种按键分组并保证相同键的消息有序的方法。
    • 生产者用来向Kafka发送消息。
    • 空键意味着以轮询的方式分配分区,否则,生产者使用键的散列值与分区总数取模的方法分配分区。
    • 消费者用来从Kafka读取消息。
    • 尝试均匀地将主题分区分配给一个消费者组中的消费者。

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Perl高效编程

Perl高效编程

霍尔 / 胜春、王晖、张东亮、蒋永清 / 人民邮电出版社 / 2011-5 / 65.00元

《Perl高效编程(第2版)》,本书是Perl编程领域的“圣经级”著作。它提供了一百多个详实的应用案例,足以涵盖编程过程中经常遇到的方方面面,由此详细阐释出各种高效且简洁的写法。一起来看看 《Perl高效编程》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

URL 编码/解码
URL 编码/解码

URL 编码/解码