Flink SQL and Kafka Integration



Integrating Flink with Kafka is a very common real-time processing scenario, especially since Kafka 0.11, whose producer supports transactions. This allows the Flink-Kafka combination to achieve complete end-to-end exactly-once processing. Although it introduces latency on the order of the checkpoint interval, exactly-once semantics are still very attractive. Probably the best-known public end-to-end case study is the one OPPO shared a while back; follow the 浪尖 WeChat public account (bigdatatip) and reply "oppo" to get it.

1. Ways to integrate Flink SQL with Kafka

Flink SQL can be integrated with Kafka in several ways; here is a summary:

1. Converting a DataStream to a Table

Integrate through the addSource and addSink APIs: build a DataStream from Kafka, register it as a table, and then analyze it with SQL (a complete wiring sketch follows the two forms below).

The API comes in two main forms:

1. Register directly as a table

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");

2. Convert to a Table

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
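To make the first form concrete, here is a minimal, self-contained sketch of wiring Kafka into the DataStream API with addSource and then registering the stream as a table. The topic name, broker address, group id, and field name are placeholders, and FlinkKafkaConsumer010 is assumed as the consumer for a 0.10 cluster.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class KafkaDataStreamToTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "test");

        // addSource: consume raw Kafka records as a DataStream<String>
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer010<>("jsontest", new SimpleStringSchema(), props));

        // register the DataStream as table "rawTable" with a single field "line"
        tableEnv.registerDataStream("rawTable", stream, "line");

        // run SQL against the registered table and print the result
        Table result = tableEnv.sqlQuery("SELECT line FROM rawTable");
        tableEnv.toAppendStream(result, Row.class).print();

        env.execute();
    }
}

From here the registered table can be queried like any other table in the environment.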

2. TableSource and TableSink

Input and output tables can also be registered directly through the TableSource and TableSink interfaces, for example with Kafka010JsonTableSource and Kafka010JsonTableSink.
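For reference, a minimal sketch of registering such a source with the (now being deprecated) builder API; the topic name, field names, and broker address are made up, and Types here is org.apache.flink.table.api.Types rather than the typeinfo Types used in the example later in this article.

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class KafkaJsonTableSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        kafkaProps.setProperty("group.id", "test");

        // build a JSON table source for a hypothetical "sensors" topic
        KafkaTableSource source = Kafka010JsonTableSource.builder()
                .forTopic("sensors")
                .withKafkaProperties(kafkaProps)
                .withSchema(TableSchema.builder()
                        .field("sensorId", Types.LONG())
                        .field("temp", Types.DOUBLE())
                        .field("time", Types.SQL_TIMESTAMP())
                        .build())
                .build();

        // register it so it can be queried with SQL as table "sensors"
        tableEnv.registerTableSource("sensors", source);
    }
}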

3. Custom catalog

Tables can also be registered through a custom catalog. This approach is not covered in detail here; a video tutorial will be posted to the 知识星球 later.

ExternalCatalog catalog = new InMemoryExternalCatalog();

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);

4. Connector approach

This is the approach this article focuses on; the others will be shared in the 知识星球 over time.

Currently this approach only supports Kafka, Elasticsearch, and files.

2. Example walkthrough

Let's go straight to the example and then cover the details.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

public class kafka2kafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        // "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("jsontest")
                        .property("bootstrap.servers", "mt-mdh.local:9093")
                        .property("group.id", "test")
                        .startFromLatest())
                .withFormat(
                        new Json()
                                .failOnMissingField(false)
                                .deriveSchema())
                .withSchema(
                        new Schema()
                                .field("rowtime", Types.SQL_TIMESTAMP)
                                .rowtime(new Rowtime()
                                        .timestampsFromField("eventtime")
                                        .watermarksPeriodicBounded(2000))
                                .field("fruit", Types.STRING)
                                .field("number", Types.INT))
                .inAppendMode()
                .registerTableSource("source");

        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        // "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("test")
                        .property("acks", "all")
                        .property("retries", "0")
                        .property("batch.size", "16384")
                        .property("linger.ms", "10")
                        .property("bootstrap.servers", "mt-mdh.local:9093")
                        .sinkPartitionerFixed())
                .inAppendMode()
                .withFormat(
                        new Json().deriveSchema())
                .withSchema(
                        new Schema()
                                .field("fruit", Types.STRING)
                                .field("total", Types.INT)
                                .field("time", Types.SQL_TIMESTAMP))
                .registerTableSink("sink");

        tEnv.sqlUpdate("insert into sink select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from source group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");

        env.execute();
    }
}

This example opens event-time tumbling windows and sums the number field per fruit. As you can see, the connector approach is still somewhat cumbersome, with quite a few configuration options, so let's break them down one by one. For the full details see the official docs (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#type-strings).

1. Configuring the data source

.connect(
  new Kafka()
    .version("0.11")    // required: valid connector versions are
                        // "0.8", "0.9", "0.10", "0.11", and "universal"
    .topic("...")       // required: topic name from which the table is read

    // optional: connector specific properties
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
    .property("group.id", "testGroup")

    // optional: select a startup mode for Kafka offsets
    .startFromEarliest()
    .startFromLatest()
    .startFromSpecificOffsets(...)

    // optional: output partitioning from Flink's partitions into Kafka's partitions
    .sinkPartitionerFixed()                 // each Flink partition ends up in at-most one Kafka partition (default)
    .sinkPartitionerRoundRobin()            // a Flink partition is distributed to Kafka partitions round-robin
    .sinkPartitionerCustom(MyCustom.class)  // use a custom FlinkKafkaPartitioner subclass
)

2. Data format

Currently three formats are supported: CSV, JSON, and Avro. We have to tell Flink how to map the JSON payload to the table fields, and there are three ways to specify this, as shown below:

.withFormat(
  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Type.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)

In practice the most commonly used option is the third one: deriving the format schema directly from the table schema we declare.
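If you do want to spell the format schema out explicitly instead, the first variant could look roughly like the following, reusing the field names from the example above (an illustrative sketch using org.apache.flink.api.common.typeinfo.Types, as elsewhere in this article):

// illustrative: declare the JSON format schema via type information instead of deriving it
.withFormat(
  new Json()
    .failOnMissingField(false)
    .schema(Types.ROW_NAMED(
        new String[]{"eventtime", "fruit", "number"},
        Types.SQL_TIMESTAMP, Types.STRING, Types.INT))
)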

3. Schema information

Besides the schema fields themselves, time attributes can also be configured here.

.withSchema(
  new Schema()
    .field("MyField1", Types.SQL_TIMESTAMP)
      .proctime()       // optional: declares this field as a processing-time attribute
    .field("MyField2", Types.SQL_TIMESTAMP)
      .rowtime(...)     // optional: declares this field as an event-time attribute
    .field("MyField3", Types.BOOLEAN)
      .from("mf3")      // optional: original field in the input that is referenced/aliased by this field
)
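For comparison, if the example job were switched from event time to processing time, the source schema could simply declare an appended processing-time attribute instead of the rowtime block. A hypothetical variant (the column name proctime is arbitrary):

// hypothetical processing-time variant of the source schema from the example above
.withSchema(
  new Schema()
    .field("fruit", Types.STRING)
    .field("number", Types.INT)
    .field("proctime", Types.SQL_TIMESTAMP)
      .proctime()   // Flink fills this column with the processing time
)

The windowed query would then group by TUMBLE(proctime, INTERVAL '5' SECOND) instead of the rowtime column.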

4. Output update mode

The supported update modes are append, retract, and upsert.

.connect(...)
.inAppendMode() // otherwise: inUpsertMode() or inRetractMode()
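The example registers both tables in append mode, which works because a windowed aggregation emits each window's result exactly once. A continuously updating query, such as a global group-by, cannot be written to an append-only Kafka sink; a common workaround is to pull the result back into the DataStream API as a retract stream instead. A minimal sketch, reusing tEnv and the source table from the example (Table is org.apache.flink.table.api.Table, Row is org.apache.flink.types.Row):

// continuously updating aggregation: the total for a fruit changes as new records arrive
Table updating = tEnv.sqlQuery("select fruit, sum(number) as total from source group by fruit");

// each element is (true, row) for an insert/update and (false, row) for a retraction
tEnv.toRetractStream(updating, Row.class).print();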

5. Time-related configuration

When configuring the schema you can also configure time-related concepts such as event time and processing time, as well as watermark generation, including custom watermarks.

For event time, the following ways of extracting the timestamp are supported:

// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
.rowtime(
  new Rowtime()
    .timestampsFromField("ts_field")    // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
  new Rowtime()
    .timestampsFromSource()
)

// Sets a custom timestamp extractor to be used for the rowtime attribute.
// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
.rowtime(
  new Rowtime()
    .timestampsFromExtractor(...)
)

The following watermark generation strategies are supported:


 

// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
  new Rowtime()
    .watermarksPeriodicAscending()
)

// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
  new Rowtime()
    .watermarksPeriodicBounded(2000)    // delay in milliseconds
)

// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
  new Rowtime()
    .watermarksFromSource()
)

3. Summary

This article covered the various ways of combining Flink SQL with Kafka. For DataStream-style jobs you can generally use addSource and addSink. KafkaJsonTableSource and KafkaJsonTableSink are gradually being deprecated, so prefer the connector and catalog approaches; the latter in particular has proven very reliable and convenient when building a platform.

For more Flink content, you are welcome to join the 浪尖 知识星球 and learn together with 750+ members.
