《从0到1学习Flink》—— Flink 写入数据到 Kafka

栏目: 服务器 · Apache · 发布时间: 6年前

内容简介:之前文章

《从0到1学习Flink》—— Flink 写入数据到 Kafka

之前文章 《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用到了 Flink 自带的 Kafka source connector(FlinkKafkaConsumer)。存入到 ES 只是其中一种情况,那么如果我们有多个地方需要这份通过 Flink 转换后的数据,是不是又要我们继续写个 sink 的插件呢?确实,所以 Flink 里面就默认支持了不少 sink,比如也支持 Kafka sink connector(FlinkKafkaProducer),那么这篇文章我们就讲讲如何将数据写入到 Kafka。

《从0到1学习Flink》—— Flink 写入数据到 Kafka

准备

添加依赖

Flink 里面支持 Kafka 0.8、0.9、0.10、0.11 ,以后有时间可以分析下源码的实现。

《从0到1学习Flink》—— Flink 写入数据到 Kafka

这里我们需要安装下 Kafka,请对应添加对应的 Flink Kafka connector 依赖的版本,这里我们使用的是 0.11 版本:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Kafka 安装

这里就不写这块内容了,可以参考我以前的文章Kafka 安装及快速入门。

这里我们演示把其他 Kafka 集群中 topic 数据原样写入到自己本地起的 Kafka 中去。

配置文件

kafka.brokers=xxx:9092,xxx:9092,xxx:9092
kafka.group.id=metrics-group-test
kafka.zookeeper.connect=xxx:2181
metrics.topic=xxx
stream.parallelism=5
kafka.sink.brokers=localhost:9092
kafka.sink.topic=metric-test
stream.checkpoint.interval=1000
stream.checkpoint.enable=false
stream.sink.parallelism=5

目前我们先看下本地 Kafka 是否有这个 metric-test topic 呢?需要执行下这个命令:

bin/kafka-topics.sh --list --zookeeper localhost:2181

《从0到1学习Flink》—— Flink 写入数据到 Kafka

可以看到本地的 Kafka 是没有任何 topic 的,如果等下我们的程序运行起来后,再次执行这个命令出现 metric-test topic,那么证明我的程序确实起作用了,已经将其他集群的 Kafka 数据写入到本地 Kafka 了。

程序代码

Main.java

public class Main {
    public static void main(String[] args) throws Exception{
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
        DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env);

        data.addSink(new FlinkKafkaProducer011<Metrics>(
                parameterTool.get("kafka.sink.brokers"),
                parameterTool.get("kafka.sink.topic"),
                new MetricSchema()
                )).name("flink-connectors-kafka")
                .setParallelism(parameterTool.getInt("stream.sink.parallelism"));

        env.execute("flink learning connectors kafka");
    }
}

运行结果

启动程序,查看运行结果,不段执行上面命令,查看是否有新的 topic 出来:

《从0到1学习Flink》—— Flink 写入数据到 Kafka

执行命令可以查看该 topic 的信息:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic metric-test

《从0到1学习Flink》—— Flink 写入数据到 Kafka

分析

上面代码我们使用 Flink Kafka Producer 只传了三个参数:brokerList、topicId、serializationSchema(序列化)

《从0到1学习Flink》—— Flink 写入数据到 Kafka

其实也可以传入多个参数进去,现在有的参数用的是默认参数,因为这个内容比较多,后面可以抽出一篇文章单独来讲。

总结

本篇文章写了 Flink 读取其他 Kafka 集群的数据,然后写入到本地的 Kafka 上。我在 Flink 这层没做什么数据转换,只是原样的将数据转发了下,如果你们有什么其他的需求,是可以在 Flink 这层将数据进行各种转换操作,比如这篇文章中的一些转换: 《从0到1学习Flink》—— Flink Data transformation(转换) ,然后将转换后的数据发到 Kafka 上去。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数字化生存

数字化生存

(美)Nicholas Negroponte(尼古拉·尼葛洛庞帝) / 胡泳、范海燕 / 电子工业出版社 / 2017-1-1 / 68.00

《数字化生存》描绘了数字科技为我们的生活、工作、教育和娱乐带来的各种冲击和其中值得深思的问题,是跨入数字化新世界的*指南。英文版曾高居《纽约时报》畅销书排行榜。 “信息的DNA”正在迅速取代原子而成为人类生活中的基本交换物。尼葛洛庞帝向我们展示出这一变化的巨大影响。电视机与计算机屏幕的差别变得只是大小不同而已。从前所说的“大众”传媒正演变成个人化的双向交流。信息不再被“推给”消费者,相反,人们或他......一起来看看 《数字化生存》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

在线进制转换器
在线进制转换器

各进制数互转换器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换