Integrating Flink SQL with Kafka



Integrating Flink with Kafka is a very common real-time processing scenario, especially since Kafka 0.11, whose producer supports transactions. This makes it possible for a Flink-Kafka pipeline to achieve true end-to-end exactly-once processing. Although that introduces a latency of one checkpoint interval, exactly-once semantics are still very attractive. A well-known end-to-end use case is the OPPO case study shared a while ago; follow Langjian's WeChat public account (bigdatatip) and reply "oppo" to get it.

1. Ways to integrate Flink SQL with Kafka

There are several ways to integrate Flink SQL with Kafka; here is a summary:

1. DataStream to Table

Use the addSource and addSink APIs to talk to Kafka, register the resulting DataStream as a table, and then analyze it with SQL (a sketch of the addSource side follows the snippets below).

The main API comes in two forms:

1. Register directly as a table

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");

2. Convert to a Table

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
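As mentioned above, the DataStream itself is typically created with addSource against a Kafka consumer. Below is a minimal sketch, assuming Flink 1.7 with the flink-connector-kafka-0.10 dependency; the broker address, group id, and the length-based parsing are placeholders for illustration only:

import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker
props.setProperty("group.id", "test");                      // placeholder consumer group

// read the raw messages from Kafka as strings
DataStream<String> raw = env.addSource(
        new FlinkKafkaConsumer010<>("jsontest", new SimpleStringSchema(), props));

// map each line to a (Long, String) pair so the table has two typed fields
DataStream<Tuple2<Long, String>> stream = raw.map(
        new MapFunction<String, Tuple2<Long, String>>() {
            @Override
            public Tuple2<Long, String> map(String line) {
                return Tuple2.of((long) line.length(), line);
            }
        });

// register it and query with SQL as shown above
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");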

2. TableSource and TableSink

Through the TableSource and TableSink interfaces, Kafka topics can also be registered directly as input and output tables, for example:

Kafka010JsonTableSource and Kafka010JsonTableSink
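These classes are deprecated in newer Flink releases, but for reference, here is a rough sketch of how the source side used to be wired up, following the older 1.6-era documentation; the topic, properties, and field names are placeholders, and the exact builder methods may differ between versions:

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker
props.setProperty("group.id", "test");                      // placeholder consumer group

// build a table source that reads JSON records from a Kafka 0.10 topic
KafkaTableSource source = Kafka010JsonTableSource.builder()
        .forTopic("jsontest")                 // topic to read from
        .withKafkaProperties(props)           // consumer properties
        .withSchema(TableSchema.builder()     // fields expected in the JSON records
                .field("fruit", Types.STRING())
                .field("number", Types.INT())
                .build())
        .build();

tableEnv.registerTableSource("source", source);  // now queryable from SQL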

3. Custom catalog

The integration can also go through a custom catalog. This approach is not covered in detail here; a video tutorial will be published to the Knowledge Planet later.

ExternalCatalog catalog = new InMemoryExternalCatalog();

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
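Once the catalog is registered, any table it exposes can be referenced in SQL by a fully qualified name of the form catalog.database.table. A minimal sketch, assuming a table db1.kafkaTable has already been created inside the catalog (both names are hypothetical):

import org.apache.flink.table.api.Table;

// "db1" and "kafkaTable" are hypothetical names used only for illustration
Table result = tableEnv.sqlQuery("SELECT * FROM InMemCatalog.db1.kafkaTable");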

4. Connector approach

This is the approach this article focuses on; the others will be shared gradually inside the Knowledge Planet.

At the moment this approach only supports Kafka, Elasticsearch, and filesystem connectors.

2. Worked example

Let's jump straight into the example and then go through the details.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

public class kafka2kafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // source table: JSON records from the "jsontest" topic
        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        // "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("jsontest")
                        .property("bootstrap.servers", "mt-mdh.local:9093")
                        .property("group.id", "test")
                        .startFromLatest()
        )
                .withFormat(
                        new Json()
                                .failOnMissingField(false)
                                .deriveSchema()
                )
                .withSchema(
                        new Schema()
                                .field("rowtime", Types.SQL_TIMESTAMP)
                                .rowtime(new Rowtime()
                                        .timestampsFromField("eventtime")
                                        .watermarksPeriodicBounded(2000)
                                )
                                .field("fruit", Types.STRING)
                                .field("number", Types.INT)
                )
                .inAppendMode()
                .registerTableSource("source");

        // sink table: JSON records written to the "test" topic
        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        // "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("test")
                        .property("acks", "all")
                        .property("retries", "0")
                        .property("batch.size", "16384")
                        .property("linger.ms", "10")
                        .property("bootstrap.servers", "mt-mdh.local:9093")
                        .sinkPartitionerFixed()
        ).inAppendMode()
                .withFormat(
                        new Json().deriveSchema()
                )
                .withSchema(
                        new Schema()
                                .field("fruit", Types.STRING)
                                .field("total", Types.INT)
                                .field("time", Types.SQL_TIMESTAMP)
                )
                .registerTableSink("sink");

        tEnv.sqlUpdate("insert into sink select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from source group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");
        env.execute();
    }
}

This example windows the data by event time and sums number per fruit. As you can see, using a connector involves quite a few configuration items, so they are broken down below. For the full details see the official documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#type-strings).

1. Configuring the data source

.connect(
    new Kafka()
        .version("0.11")    // required: valid connector versions are
                            // "0.8", "0.9", "0.10", "0.11", and "universal"
        .topic("...")       // required: topic name from which the table is read

        // optional: connector specific properties
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "testGroup")

        // optional: select a startup mode for Kafka offsets
        .startFromEarliest()
        .startFromLatest()
        .startFromSpecificOffsets(...)

        // optional: output partitioning from Flink's partitions into Kafka's partitions
        .sinkPartitionerFixed()                 // each Flink partition ends up in at-most one Kafka partition (default)
        .sinkPartitionerRoundRobin()            // a Flink partition is distributed to Kafka partitions round-robin
        .sinkPartitionerCustom(MyCustom.class)  // use a custom FlinkKafkaPartitioner subclass
)
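For reference, startFromSpecificOffsets(...) above expects a map from partition number to starting offset. A small sketch with made-up partitions and offsets:

import java.util.HashMap;
import java.util.Map;

// hypothetical offsets, purely for illustration
Map<Integer, Long> offsets = new HashMap<>();
offsets.put(0, 42L);    // partition 0 -> start at offset 42
offsets.put(1, 300L);   // partition 1 -> start at offset 300

new Kafka()
        .version("0.10")
        .topic("jsontest")
        .property("bootstrap.servers", "localhost:9092")
        .startFromSpecificOffsets(offsets);   // resume reading from these offsets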

2. Data format

Three formats are currently supported: CSV, JSON, and Avro. We have to tell Flink how the table fields are parsed from the JSON source, and there are three ways to do so:

.withFormat(
    new Json()
        .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

        // required: define the schema either by using type information which parses numbers to corresponding types
        .schema(Type.ROW(...))

        // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
        .jsonSchema(
            "{" +
            "  type: 'object'," +
            "  properties: {" +
            "    lon: {" +
            "      type: 'number'" +
            "    }," +
            "    rideTime: {" +
            "      type: 'string'," +
            "      format: 'date-time'" +
            "    }" +
            "  }" +
            "}"
        )

        // or use the table's schema
        .deriveSchema()
)

In practice the third option is the most common: the format schema is derived directly from the table schema that we specify.

3. Schema information

Besides the schema fields themselves, time-related attributes can also be configured here.

.withSchema(
    new Schema()
        .field("MyField1", Types.SQL_TIMESTAMP)
            .proctime()    // optional: declares this field as a processing-time attribute
        .field("MyField2", Types.SQL_TIMESTAMP)
            .rowtime(...)  // optional: declares this field as a event-time attribute
        .field("MyField3", Types.BOOLEAN)
            .from("mf3")   // optional: original field in the input that is referenced/aliased by this field
)

4. Output update mode

The available update modes are append mode, retract mode, and upsert mode.

.connect(...)
.inAppendMode() // otherwise: inUpsertMode() or inRetractMode()
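The windowed query in the main example can use append mode because each window result is final once the window fires. A continuous, non-windowed aggregation, by contrast, keeps updating its results and cannot be emitted as an append-only stream. Here is a small sketch (reusing the source table from the example) of how such an updating table can instead be consumed as a retract stream on the DataStream side:

import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// a non-windowed aggregation produces an updating table, not an append-only one
Table totals = tEnv.sqlQuery("SELECT fruit, SUM(number) AS total FROM source GROUP BY fruit");

// each element is (true, row) for an insert/update and (false, row) for a retraction
tEnv.toRetractStream(totals, Row.class).print();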

5. Time-related configuration

When configuring the schema you can also configure time attributes such as event time and processing time, as well as watermark generation, including custom watermark strategies.

For event time, the following timestamp extraction options are supported:

// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
.rowtime(
    new Rowtime()
        .timestampsFromField("ts_field")  // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
    new Rowtime()
        .timestampsFromSource()
)

// Sets a custom timestamp extractor to be used for the rowtime attribute.
// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
.rowtime(
    new Rowtime()
        .timestampsFromExtractor(...)
)

The supported watermark generation strategies are:

// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
    new Rowtime()
        .watermarksPeriodicAscending()
)

// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
    new Rowtime()
        .watermarksPeriodicBounded(2000)  // delay in milliseconds
)

// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
    new Rowtime()
        .watermarksFromSource()
)

3. Summary

This article covered the various ways to combine Flink SQL with Kafka. For DataStream-style jobs you can generally use addSource and addSink. Note that KafkaJsonTableSource and KafkaJsonTableSink are gradually being deprecated, so prefer the connector and catalog approaches; the latter in particular has proven very reliable and convenient when building a platform.

For more Flink content, join Langjian's Knowledge Planet and learn together with 750+ members.
