学习kafka教程(三)

栏目: 后端 · 发布时间: 5年前

内容简介:欢迎关注公众号:n平方本文主要介绍【Kafka流通过构建Kafka生产者和消费者库,并利用Kafka的本地功能来提供数据并行性、分布式协调、容错和操作简单性,从而简化了应用程序开发。

欢迎关注公众号:n平方

本文主要介绍【 Kafka Streams的架构和使用

目标

  • 了解kafka streams的架构。
  • 掌握kafka streams编程。

架构分析

总体

Kafka流通过构建Kafka生产者和消费者库,并利用Kafka的本地功能来提供数据并行性、分布式协调、容错和操作简单性,从而简化了应用程序开发。

下图展示了一个使用Kafka Streams库的应用程序的结构。

学习kafka教程(三)

流分区和任务

Kafka的消息传递层对数据进行分区,以存储和传输数据。Kafka流划分数据进行处理。在这两种情况下,这种分区都支持数据局部性、灵活性、可伸缩性、高性能和容错性。Kafka流使用分区和任务的概念作为基于Kafka主题分区的并行模型的逻辑单元。Kafka流与Kafka在并行性上下文中有着紧密的联系:

  • 每个流分区都是一个完全有序的数据记录序列,并映射到Kafka主题分区。
  • 流中的数据记录映射到来自该主题的Kafka消息。
  • 数据记录的键值决定了Kafka流和Kafka流中数据的分区,即,如何将数据路由到主题中的特定分区。

应用程序的处理器拓扑通过将其分解为多个任务进行扩展。

更具体地说,Kafka流基于应用程序的输入流分区创建固定数量的任务,每个任务分配一个来自输入流的分区列表(例如,kafka的topic)。分配给任务的分区永远不会改变,因此每个任务都是应用程序并行性的固定单元。

然后,任务可以基于分配的分区实例化自己的处理器拓扑;它们还为每个分配的分区维护一个缓冲区,并从这些记录缓冲区一次处理一条消息。

因此,流任务可以独立并行地处理,而无需人工干预。

理解Kafka流不是一个资源管理器,而是一个“运行”其流处理应用程序运行的任何地方的库。应用程序的多个实例要么在同一台机器上执行,要么分布在多台机器上,库可以自动将任务分配给运行应用程序实例的那些实例。分配给任务的分区从未改变;如果应用程序实例失败,它分配的所有任务将在其他实例上自动重新启动,并继续从相同的流分区使用。

下图显示了两个任务,每个任务分配一个输入流分区。

学习kafka教程(三)

线程模型

Kafka流允许用户配置库用于在应用程序实例中并行处理的线程数。每个线程可以独立地使用其处理器拓扑执行一个或多个任务。

例如,下图显示了一个流线程运行两个流任务。

学习kafka教程(三)

启动更多的流线程或应用程序实例仅仅相当于复制拓扑并让它处理Kafka分区的不同子集,从而有效地并行处理。值得注意的是,线程之间不存在共享状态,因此不需要线程间的协调。这使得跨应用程序实例和线程并行运行拓扑变得非常简单。Kafka主题分区在各种流线程之间的分配是由Kafka流利用Kafka的协调功能透明地处理的。

如上所述,使用Kafka流扩展您的流处理应用程序很容易:您只需要启动应用程序的其他实例,Kafka流负责在应用程序实例中运行的任务之间分配分区。您可以启动与输入Kafka主题分区一样多的应用程序线程,以便在应用程序的所有运行实例中,每个线程(或者更确切地说,它运行的任务)至少有一个输入分区要处理。

本地状态存储

Kafka流提供了所谓的状态存储,流处理应用程序可以使用它来存储和查询数据,这是实现有状态操作时的一项重要功能。例如,Kafka Streams DSL在调用有状态操作符(如join()或aggregate())或打开流窗口时自动创建和管理这样的状态存储。

Kafka Streams应用程序中的每个流任务都可以嵌入一个或多个本地状态存储,这些存储可以通过api访问,以存储和查询处理所需的数据。Kafka流为这种本地状态存储提供容错和自动恢复功能。

下图显示了两个流任务及其专用的本地状态存储。

学习kafka教程(三)

容错

Kafka流构建于Kafka中本地集成的容错功能之上。Kafka分区是高度可用和复制的;因此,当流数据持久化到Kafka时,即使应用程序失败并需要重新处理它,流数据也是可用的。Kafka流中的任务利用Kafka消费者客户端提供的容错功能来处理失败。如果任务在失败的机器上运行,Kafka流将自动在应用程序的一个剩余运行实例中重新启动该任务。

此外,Kafka流还确保本地状态存储对于故障也是健壮的。对于每个状态存储,它维护一个复制的changelog Kafka主题,其中跟踪任何状态更新。这些变更日志主题也被分区,这样每个本地状态存储实例,以及访问该存储的任务,都有自己专用的变更日志主题分区。在changelog主题上启用了日志压缩,这样可以安全地清除旧数据,防止主题无限增长。如果任务在一台失败的机器上运行,并在另一台机器上重新启动,Kafka流通过在恢复对新启动的任务的处理之前重播相应的更改日志主题,确保在失败之前将其关联的状态存储恢复到内容。因此,故障处理对最终用户是完全透明的。

编程实例

管道(输入输出)实例

就是控制台输入到kafka中,经过处理输出。

package com.example.kafkastreams.demo;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class PipeDemo {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

}

分词实例

就是将你输入的字符串进行分词输出。

package com.example.kafkastreams.demo;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplitDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                .to("streams-linesplit-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);


        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);

    }
}

词汇统计实例

将你输入的字符串进行按单词统计输出。

package com.example.kafkastreams.demo;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCountDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);


        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

最后

本人水平有限,欢迎各位建议以及指正。顺便关注一下公众号呗,会经常更新文章的哦。

学习kafka教程(三)


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Design systems

Design systems

Not all design systems are equally effective. Some can generate coherent user experiences, others produce confusing patchwork designs. Some inspire teams to contribute to them, others are neglected. S......一起来看看 《Design systems》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具