Flink实践 | DataStream Connectors之Kafka

栏目: 编程工具 · 发布时间: 6年前

内容简介:孙金城,淘宝花名"金竹",Apache Flink Committer,阿里巴巴高级技术专家。目前就职于阿里巴巴计算平台事业部,自2015年以来一直投入于基于Apache Flink的新一代大数据计算平台实时计算的设计研发工作。本文主要介绍 Kafka 在 Apache Flink 中的使用,以一个简单的示例,向大家介绍在 Apache Flink 中如何使用 Kafka。

Flink实践 | DataStream Connectors之Kafka

孙金城,淘宝花名"金竹",Apache Flink Committer,阿里巴巴高级技术专家。目前就职于阿里巴巴计算平台事业部,自2015年以来一直投入于基于Apache Flink的新一代大数据计算平台实时计算的设计研发工作。

本文主要介绍 Kafka 在 Apache Flink 中的使用,以一个简单的示例,向大家介绍在 Apache Flink 中如何使用 Kafka。

1.Kafka 简介

Apache Kafka 是一个分布式发布-订阅消息传递系统。 它最初由 LinkedIn 公司开发,LinkedIn 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目。Kafka 用于构建实时数据管道和流式应用程序。它具有水平扩展性、容错性、极快的速度,目前也得到了广泛的应用。

Kafka 不但是分布式消息系统而且也支持流式计算,所以在介绍 Kafka 在 Apache Flink 中的应用之前,先以一个 Kafka 的简单示例直观了解什么是 Kafka。

1.1 安装

本篇不是系统的,详尽的介绍 Kafka,而是想让大家直观认识 Kafka,以便在 Apahe Flink 中进行很好的应用,所以我们以最简单的方式安装 Kafka。

  • 下载二进制包

curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
  • 解压安装

Kafka 安装只需要将下载的 tgz 解压即可,如下:

jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz 
jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls
LICENSE NOTICE bin config libs site-docs

其中 bin 包含了所有 Kafka 的管理命令,如接下来我们要启动的 Kafka 的 Server。

  • 启动 Kafka Server

Kafka 是一个发布订阅系统,消息订阅首先要有个服务存在。我们启动一个 Kafka Server 实例。 Kafka 需要使用 ZooKeeper,要进行投产部署我们需要安装 ZooKeeper 集群,这不在本篇的介绍范围内,所以我们利用 Kafka 提供的脚本,安装一个只有一个节点的 ZooKeeper实例。如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &

[2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
....
....
[2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

启动之后,ZooKeeper 会绑定 2181 端口(默认)。接下来我们启动 Kafka Server,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties
[2019-01-13 09:09:16,937] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer)
[2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
...
...
[2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

如果上面一切顺利,Kafka 的安装就完成了。

1.2 创建 Topic

Kafka 是消息订阅系统,首先创建可以被订阅的 Topic,我们创建一个名为 flink-tipic 的Topic,在一个新的 terminal 中,执行如下命令:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic

Created topic "flink-tipic".

在 Kafka Server 的 terminal 中也会输出如下成功创建信息:

...
[2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
...

上面显示了 flink-topic 的基本属性配置,如消息压缩方式,消息格式,备份数量等等。

除了看日志,我们可以用命令显示的查询我们是否成功的创建了 flink-topic,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181

flink-tipic

如果输出 flink-tipic ,那么说明我们的 Topic 成功创建了。

那么 Topic 是保存在哪里?Kafka 是怎样进行消息的发布和订阅的呢?为直观,我们看如下 Kafka 架构示意图简单理解一下:

Flink实践 | DataStream Connectors之Kafka

简单介绍一下,Kafka 利用 ZooKeeper 来存储集群信息,也就是上面我们启动的 Kafka Server 实例,一个集群中可以有多个 Kafka Server 实例,Kafka Server 叫做 Broker,我们创建的 Topic可以在一个或多个 Broker 中。Kafka 利用 Push 模式发送消息,利用 Pull 方式拉取消息。

1.3 发送消息

如何向已经存在的 Topic 中发送消息呢,当然我们可以 API 的方式编写代码发送消息。同时,还可以利用命令方式来便捷的发送消息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>Kafka test msg
>Kafka connector

上面我们发送了两条消息 Kafka test msg   和  Kafka connector  到   flink-topic   Topic中。

1.4 读取消息

如果读取指定 Topic 的消息呢?同样可以 API 和命令两种方式都可以完成,我们以命令方式读取 flink-topic 的消息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
Kafka test msg
Kafka connector

其中 --from-beginning   描述了我们从 Topic 开始位置读取消息。

2.Flink Kafka Connector

前面我们以最简单的方式安装了 Kafka 环境,那么我们以上面的环境介绍 Flink Kafka Connector 的使用。

Apache Flink 中提供了多个版本的 Kafka Connector,本篇以 Flink-1.7.0 版本为例进行介绍。

2.1 mvn 依赖

要使用 Kakfa Connector 需要在我们的 pom 中增加对 Kafka Connector 的依赖,如下:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.7.0</version>
</dependency>

Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java / Scala 对象。   DeserializationSchema 允许用户指定这样的模式。 为每个 Kafka 消息调用 T deserialize(byte [] message)方法,从 Kafka 传递值。

2.2 Examples

我们示例读取 Kafka 的数据,再将数据做简单处理之后写入到 Kafka 中。我们需要再创建一个用于写入的 Topic,如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-topic-output

所以示例中我们 Source 利用 flink-topic ,Sink用 flink-topic-output

2.2.1 Simple ETL

我们假设 Kafka 中存储的就是一个简单的字符串,所以我们需要一个用于对字符串进行 serialize deserialize 的实现,也就是我们要定义一个实现 DeserializationSchema SerializationSchema   的序列化和反序列化的类。因为我们示例中是字符串,所以我们自定义一个 KafkaMsgSchema 实现类,然后在编写 Flink 主程序。

  • KafkaMsgSchema - 完整代码

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {
private static final long serialVersionUID = 1L;
private transient Charset charset;

public KafkaMsgSchema() {
// 默认UTF-8编码
this(Charset.forName("UTF-8"));
}

public KafkaMsgSchema(Charset charset) {
this.charset = Preconditions.checkNotNull(charset);
}

public Charset getCharset() {
return this.charset;
}

public String deserialize(byte[] message) {
// 将Kafka的消息反序列化为 java 对象
return new String(message, charset);
}

public boolean isEndOfStream(String nextElement) {
// 流永远不结束
return false;
}

public byte[] serialize(String element) {
// 将java对象序列化为Kafka的消息
return element.getBytes(this.charset);
}

public TypeInformation<String> getProducedType() {
// 定义产生的数据Typeinfo
return BasicTypeInfo.STRING_TYPE_INFO;
}

private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
out.writeUTF(this.charset.name());
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
String charsetName = in.readUTF();
this.charset = Charset.forName(charsetName);
}
}
  • 主程序 - 完整代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaExample {
public static void main(String[] args) throws Exception {
// 用户参数获取
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
// Stream 环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Source的topic
String sourceTopic = "flink-topic";
// Sink的topic
String sinkTopic = "flink-topic-output";
// broker 地址
String broker = "localhost:9092";

// 属性参数 - 实际投产可以在命令行传入
Properties p = parameterTool.getProperties();
p.putAll(parameterTool.getProperties());
p.put("bootstrap.servers", broker);

env.getConfig().setGlobalJobParameters(parameterTool);

// 创建消费者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(
sourceTopic,
new KafkaMsgSchema(),
p);
// 设置读取最早的数据
// consumer.setStartFromEarliest();

// 读取Kafka消息
DataStream<String> input = env.addSource(consumer);


// 数据处理
DataStream<String> result = input.map(new MapFunction<String, String>() {
public String map(String s) throws Exception {
String msg = "Flink study ".concat(s);
System.out.println(msg);
return msg;
}
});

// 创建生产者
FlinkKafkaProducer producer = new FlinkKafkaProducer<String>(
sinkTopic,
new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()),
p,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

// 将数据写入Kafka指定Topic中
result.addSink(producer);

// 执行job
env.execute("Kafka Example");
}
}

运行主程序如下:

Flink实践 | DataStream Connectors之Kafka

我测试操作的过程如下:

  • 启动 flink-topic flink-topic-output 的消费拉取;

  • 通过命令向 flink-topic 中添加测试消息 only for test ;

  • 通过命令打印验证添加的测试消息   only for test ;

  • 最简单的 FlinkJob   source->map->sink   对测试消息进行 map 处理: "Flink study ".concat(s) ;

  • 通过命令打印 sink 的数据;

2.2.2 内置 Schemas

Apache Flink 内部提供了如下 3 种内置的常用消息格式的 Schemas:

  • TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) 它基于 Flink 的 TypeInformation 创建模式。 如果数据由 Flink 写入和读取,这将非常有用。

  • JsonDeserializationSchema (and JSONKeyValueDeserializationSchema)   它将序列化的JSON转换为 ObjectNode 对象,可以使用 objectNode.get(“field”)作为(Int / String / ...)()从中访问字段。 KeyValue objectNode 包含“key”和“value”字段,其中包含所有字段以及可选的"metadata"字段,该字段公开此消息的偏移量/分区/主题。

  • AvroDeserializationSchema   它使用静态提供的模式读取使用 Avro 格式序列化的数据。 它可以从 Avro 生成的类(AvroDeserializationSchema.forSpecific(...))推断出模式,或者它可以与 GenericRecords 一起使用手动提供的模式(使用AvroDeserializationSchema.forGeneric(...))

要使用内置的 Schemas 需要添加如下依赖:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.7.0</version>
</dependency>

2.2.3 读取位置配置

我们在消费 Kafka 数据时候,可能需要指定消费的位置, FlinkKafkaConsumer 提供很多便利的位置设置,如下:

  • consumer.setStartFromEarliest() - 从最早的记录开始;

  • consumer.setStartFromLatest() - 从最新记录开始;

  • consumer.setStartFromTimestamp(...); // 从指定的 epoch 时间戳(毫秒)开始;

  • consumer.setStartFromGroupOffsets(); // 默认行为,从上次消费的偏移量进行继续消费。

上面的位置指定可以精确到每个分区,比如如下代码:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一个分区从23L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二个分区从31L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三个分区从43L开始

consumer.setStartFromSpecificOffsets(specificStartOffsets);

对于没有指定的分区还是默认的 setStartFromGroupOffsets 方式。

2.2.4 Topic 发现

Kafka 支持 Topic 自动发现,也就是用正则的方式创建 FlinkKafkaConsumer ,比如:

// 创建消费者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>( java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
new KafkaMsgSchema(),
p);

在上面的示例中,当作业开始运行时,消费者将订阅名称与指定正则表达式匹配的所有Topic(以 sourceTopic 的值开头并以单个数字结尾)。

2.3 定义 Watermark(Window)

对 Kafka Connector 的应用不仅限于上面的简单数据提取,我们更多时候是期望对 Kafka 数据进行 Event-time 的窗口操作,那么就需要在 Flink Kafka Source 中定义 Watermark。

要定义 Event-time,首先是 Kafka 数据里面携带时间属性,假设我们数据是 String#Long 的格式,如 only for test#1000 。那么我们将 Long 作为时间列。

  • KafkaWithTsMsgSchema - 完整代码

要想解析上面的 Kafka 的数据格式,我们需要开发一个自定义的 Schema,比如叫 KafkaWithTsMsgSchema ,将 String#Long 解析为一个 Java 的 Tuple2<String, Long> ,完整代码如下:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
private transient Charset charset;

public KafkaWithTsMsgSchema() {
this(Charset.forName("UTF-8"));
}

public KafkaWithTsMsgSchema(Charset charset) {
this.charset = Preconditions.checkNotNull(charset);
}

public Charset getCharset() {
return this.charset;
}

public Tuple2<String, Long> deserialize(byte[] message) {
String msg = new String(message, charset);
String[] dataAndTs = msg.split("#");
if(dataAndTs.length == 2){
return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));
}else{
// 实际生产上需要抛出runtime异常
System.out.println("Fail due to invalid msg format.. ["+msg+"]");
return new Tuple2<String, Long>(msg, 0L);
}
}

@Override
public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {
return false;
}

public byte[] serialize(Tuple2<String, Long> element) {
return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);
}

private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
out.writeUTF(this.charset.name());
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
String charsetName = in.readUTF();
this.charset = Charset.forName(charsetName);
}

@Override
public TypeInformation<Tuple2<String, Long>> getProducedType() {
return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
}
}
  • Watermark 生成

提取时间戳和创建 Watermark,需要实现一个自定义的时间提取和 Watermark 生成器。在Apache Flink 内部有 2 种方式如下:

  • AssignerWithPunctuatedWatermarks - 每条记录都产生Watermark。

  • AssignerWithPeriodicWatermarks - 周期性的生成Watermark。

我们以 AssignerWithPunctuatedWatermarks 为例写一个自定义的时间提取和 Watermark 生成器。代码如下:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class KafkaAssignerWithPunctuatedWatermarks
implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>
{
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) {
// 利用提取的时间戳创建Watermark
return new Watermark(l);
}

@Override
public long extractTimestamp(Tuple2<String, Long> o, long l) {
// 提取时间戳
return o.f1;
}
}
  • 主程序 - 完整程序

我们计算一个大小为 1 秒的 Tumble 窗口,计算窗口内最大的值。完整的程序如下:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaWithEventTimeExample {
public static void main(String[] args) throws Exception {
// 用户参数获取
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
// Stream 环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置 Event-time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Source的topic
String sourceTopic = "flink-topic";
// Sink的topic
String sinkTopic = "flink-topic-output";
// broker 地址
String broker = "localhost:9092";

// 属性参数 - 实际投产可以在命令行传入
Properties p = parameterTool.getProperties();
p.putAll(parameterTool.getProperties());
p.put("bootstrap.servers", broker);

env.getConfig().setGlobalJobParameters(parameterTool);
// 创建消费者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Tuple2<String, Long>>(
sourceTopic,
new KafkaWithTsMsgSchema(),
p);

// 读取Kafka消息
TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<Tuple2<String, Long>>(
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);

DataStream<Tuple2<String, Long>> input = env
.addSource(consumer).returns(typeInformation)
// 提取时间戳,并生产Watermark
.assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());

// 数据处理
DataStream<Tuple2<String, Long>> result = input
.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
.max(0);

// 创建生产者
FlinkKafkaProducer producer = new FlinkKafkaProducer<Tuple2<String, Long>>(
sinkTopic,
new KeyedSerializationSchemaWrapper<Tuple2<String, Long>>(new KafkaWithTsMsgSchema()),
p,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

// 将数据写入Kafka指定Topic中
result.addSink(producer);

// 执行job
env.execute("Kafka With Event-time Example");
}
}

测试运行如下:

Flink实践 | DataStream Connectors之Kafka

简单解释一下,我们输入数如下:

Flink实践 | DataStream Connectors之Kafka

我们看的 5000000~7000000 之间的数据,其中 B#5000000C#5000100 E#5000120 是同一个窗口的内容。计算 MAX 值,按字符串比较,最大的消息就是输出的 E#5000120

2.4 Kafka 携带 Timestamps

在 Kafka-0.10+ 消息可以携带 timestamps,也就是说不用单独的在 msg 中显示添加一个数据列作为 timestamps。只有在写入和读取都用 Flink 时候简单一些。一般情况用上面的示例方式已经足够了。

3.小结

本篇重点是向大家介绍 Kafka 如何在 Flink 中进行应用,开篇介绍了 Kafka 的简单安装和收发消息的命令演示,然后以一个简单的数据提取和一个 Event-time 的窗口示例让大家直观的感受如何在 Apache Flink 中使用 Kafka。

Flink实践 | DataStream Connectors之Kafka


以上所述就是小编给大家介绍的《Flink实践 | DataStream Connectors之Kafka》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Sovereign Individual

The Sovereign Individual

James Dale Davidson、William Rees-Mogg / Free Press / 1999-08-26 / USD 16.00

Two renowned investment advisors and authors of the bestseller The Great Reckoning bring to light both currents of disaster and the potential for prosperity and renewal in the face of radical changes ......一起来看看 《The Sovereign Individual》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试