使用High Level Consumer连接Kafka

栏目: IT技术 · 发布时间: 6年前

内容简介:使用High Level Consumer连接Kafka

原文地址:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

为什么要使用High Level Consumer

通常情况下,从kafka读取消息的时候,开发者并不关心消息的offset,而只是想简单的获得数据而已。而High Level Consumer将大部分具体的操作都封装了起来,开发者可以很简单的从kafka读取消息。

对于High Level Consumer,首先要知道的就是它将每一个分区所对应的offset信息保存在了ZooKeeper中。这个offset所对应的结点名称就是进程在连接到kafka的时候所提供的名称。所以,这个名称也对应了一个Consumer组。

Consumer的名称是会一直保持在Kafka集群中的,所以,在开始新的代码之前,你必须确保旧的Consumer已经停止了(即进程终止)。如果没有关闭的话,当新的进程,也就是新的Consumer启动的时候,因为两个Consumer的名称相同,所以Kafka会把新进程的消费线程合并到已有的Consumer中,然后触发rebalance(负载均衡)操作。在rebalance中,Kafka会把可用的分区分配给可用的线程,因此,很可能将某些分区分配给了其他的进程。如果你没有关闭旧的进程,导致新旧进程同时存在,那么很有可能一部分的消息会流入到旧的进程中去。

High Level Consumer

使用High Level Consumer所需要知道的第一件事就是:它可以成为(也应该成为)一个多线程的应用。对于多线程的设计,主要是以Kafka中,对应Topic的分区数量来决定的。其中有这么几条规则:

  • 如果线程数多于分区数,那么,一部分线程永远不会接受到消息。
  • 如果线程数少于分区数,那么,一部分线程会接受到来自多个分区的消息。
  • 如果某一个线程接受了多个分区的消息,那么消息的接受顺序是无法确定的,因为kafka只能够保证同一个分区内的消息是按kafka接受到的先后顺序排列的。例如,你可能接受到5个来自分区10消息,6个来自分区11的消息,然后再接受到5个来自分区10的消息。这个时候,即使分区11仍然有未读取的数据,这个时候你依然有可能再接受到5个来自分区10的消息。
  • 如果动态的添加进程或者线程,会触发Kafka的rebalance操作,可能会改变已有的分区-线程分配方式。(即分区可能会被分配到其它的线程)

线程设计好了之后,你应当从Kafka获取到一个iterator。当没有新的消息的时候,这个iterate会阻塞当前线程。

下面是一个简单的多线程示例:

package com.test.groups;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

其中,最重要的部分就是while (it.hasNext())这一部分。简单的来说,这段代码会持续的读取Kafka中的消息,直到被终止。

测试程序

和SimpleConsumer比起来,High Level Consumer已经封装了大部分的状态保存和错误处理逻辑。但是,你仍然需要设置一些路径来让Kafka存储部分运行信息。下面这个方法定义了创建High Level Consumer所需的一些基本属性:

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }
  • ‘zookeeper.connect’用来定义Zookeeper的位置。Kafka使用Zookeeper来存储每一个Topic下每一个分区所对应的offset。
  • ‘group.id’是一个唯一的字符串id,用来代表当前的Consumer进程。
  • ‘zookeeper.session.timeout.ms’用来定义向Zookeeper发起请求(读或写)的超时时间。在这个时间内,Kafka不会尝试读取新的消息。
  • ‘zookeeper.sync.time.ms’代表Zookeeper中的Follower(跟随者)和Master之间所能够容忍的最大时间差。
  • ‘auto.commit.interval.ms’用来定义向Zookeeper中写入offset的频率。需要注意的是,提交的频率是根据时间来定义的,而不是根据读取消息的数量。如果在offset没有更新之前进程/线程发生了错误,重启之后,会重新接受到部分已读取的消息。

更多的设置信息可以从这里找到。

线程池

下面这个例子使用 Java 中的java.util.concurrent包来管理线程,这样就使得创建线程池的过程变得十分的简单。

public void run(int a_numThreads) {
    Map topicCountMap = new HashMap();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Mapbyte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    Listbyte[], byte[]>> streams = consumerMap.get(topic);


    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}

首先,我们创建一个Map,用来定义Kafka中每一个Topic所对应的线程数。consumer.createMessageStreams就是用来传递这一信息的方法,返回的结果是一个Map,里面包含每一个Topic下所对应的KafkaStream。(注意到,在示例中我们只请求了一个Topic,但是实际上,我们可以同时请求多个Topic,只需要在Map中添加对应的元素即可。)最后,我们创建了线程池,并且给每个线程分配了一个ConsumerTest来处理业务逻辑。

终止和错误处理

Kafka并不是每次读取完信息后就像zookeeper更新offset信息,而是等待一小段时间后再更新。由于这个延迟,很有可能你的业务逻辑已经处理了一条消息,但是这个事件并没有同步到zookeeper中去。因此,如果进程终止或者崩溃了,当你下次重启进程的时候,你可能会发现这条消息被重新接受了。

同时,当一些Broker掉线了,或者一些其它的因素导致分区的leader更改了,这个时候可能会导致部分消息被重放。

为了更好的避免这种情况,就必须确保client正确的退出了,而不是直接强制终止它(kill -9)。

在示例中,main方法中sleep了10秒钟,使得在其它线程运行的consumer能够消费数据10秒钟。因为自动更新offset的选项已经开启,他们会每秒提交一次offset。接着,shutdown方法被调用了,它会首先调用consumer的shutdown方法,然后给ExecutorService发送终止信息,最后,它会尝试等待ExecutorService完成所有的工作。这就提供了consumer线程一定的时间去处理数据流中剩余的消息。一旦所有消息都处理完了,consumer会使得每一个数据流所对应的iterator的hasNext()返回false,然后对应的线程就可以友好的退出了。另外,开启自动更新offset的选项,调用consumer.shutdown()会使得Kafka自动提交最后一次的offset。

try {
    Thread.sleep(10000);
} catch (InterruptedException ie) {

}
example.shutdown();

在实际运用中,更加常用的方法是:不规定sleep的时间,而是通过信号处理或者其它方式来捕获进程终止信息(如ctrl+c)并调用到shutdown方法。

运行

这个示例代码需要如下几个命令行参数:

  • ZooKeeper的连接地址以及端口
  • Consumer的唯一名称
  • 接受消息的目标Topic
  • 接受消息的线程数

例如:

server01.myco.com1:2181 group3 myTopic  4

这个命令将通过地址server01.myco.com和端口2181来连接zookeeper,然后向目标myTopic的所有分区发起请求,并通过4个线程消费信息。这个Consumer的名称是group3.

完整代码


package com.test.groups;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
   }

    public void run(int a_numThreads) {
        Map topicCountMap = new HashMap();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Mapbyte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        Listbyte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {

        }
        example.shutdown();
    }
}

以上所述就是小编给大家介绍的《使用High Level Consumer连接Kafka》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

标签: 使用high level consumer连接kafka

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web 2.0 Heroes

Web 2.0 Heroes

Bradley L. Jones / Wiley / 2008-04-14 / USD 24.99

Web 2.0 may be an elusive concept, but one thing is certain: using the Web as merely a means of retrieving and displaying information is history. Today?s Web is immediate, interactive, innovative. It ......一起来看看 《Web 2.0 Heroes》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换