Integrating Flink with Kafka is a very common real-time processing scenario, especially since Kafka 0.11, whose producer supports transactions, which lets the Flink-Kafka combination achieve complete end-to-end exactly-once processing. Although this adds a data delay of one checkpoint interval, exactly-once processing is still very attractive. The most visible end-to-end use case is probably the OPPO case study shared a while ago; follow Langjian's WeChat public account (bigdatatip) and reply "oppo" to get it.
1. Ways to integrate Flink SQL with Kafka
There are several ways to integrate Flink SQL with Kafka; Langjian summarizes them here:
1. DataStream to Table
Use the addSource and addSink APIs to build a DataStream, register it as a table, and then run SQL analysis on it; a complete end-to-end sketch follows the two snippets below.
The main API comes in two forms:
1. Register directly as a table

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");

2. Convert to a Table

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
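Putting the first approach together for Kafka, a minimal sketch looks roughly like the following (the topic name, servers, and single-string record layout are assumptions for illustration):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// consume raw strings from Kafka via addSource (topic and servers are assumptions)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "testGroup");
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer010<>("jsontest", new SimpleStringSchema(), props));

// register the stream as table "lines" with a single field "line", then query it with SQL
tableEnv.registerDataStream("lines", stream, "line");
Table result = tableEnv.sqlQuery("SELECT line FROM lines");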
2. TableSource and TableSink
Input and output tables can also be registered directly through the TableSource and TableSink interfaces,
for example with Kafka010JsonTableSource and Kafka010JsonTableSink.
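For reference, registering such a source used to look roughly like the sketch below. These classes are deprecated and their builder methods varied across releases, so treat the exact calls as illustrative rather than authoritative:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "testGroup");

// build a JSON table source for an assumed "jsontest" topic
Kafka010JsonTableSource source = Kafka010JsonTableSource.builder()
    .forTopic("jsontest")
    .withKafkaProperties(kafkaProps)
    .withSchema(TableSchema.builder()
        .field("fruit", Types.STRING())
        .field("number", Types.INT())
        .build())
    .build();

tableEnv.registerTableSource("source", source);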
3. Custom catalog
Tables can also be exposed through a custom catalog. This approach is not covered here; a video tutorial will be posted to the Knowledge Planet later.
ExternalCatalog catalog = new InMemoryExternalCatalog("InMemCatalog"); // the constructor takes the catalog name

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
4. Connector approach
This is the approach this article sets out to explain; the others will be shared in the Knowledge Planet over time.
At the moment connectors are available only for Kafka, Elasticsearch, and the filesystem.
2. Worked example
Let's go straight to the example and then discuss the details.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

public class kafka2kafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // source table: JSON events from the "jsontest" topic, with an event-time
        // attribute derived from the "eventtime" field and a 2 s bounded watermark
        tEnv.connect(
                new Kafka()
                    .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal"
                    .topic("jsontest")
                    .property("bootstrap.servers", "mt-mdh.local:9093")
                    .property("group.id", "test")
                    .startFromLatest())
            .withFormat(
                new Json()
                    .failOnMissingField(false)
                    .deriveSchema())
            .withSchema(
                new Schema()
                    .field("rowtime", Types.SQL_TIMESTAMP)
                        .rowtime(new Rowtime()
                            .timestampsFromField("eventtime")
                            .watermarksPeriodicBounded(2000))
                    .field("fruit", Types.STRING)
                    .field("number", Types.INT))
            .inAppendMode()
            .registerTableSource("source");

        // sink table: JSON results written to the "test" topic with fixed partitioning
        tEnv.connect(
                new Kafka()
                    .version("0.10")
                    .topic("test")
                    .property("acks", "all")
                    .property("retries", "0")
                    .property("batch.size", "16384")
                    .property("linger.ms", "10")
                    .property("bootstrap.servers", "mt-mdh.local:9093")
                    .sinkPartitionerFixed())
            .inAppendMode()
            .withFormat(new Json().deriveSchema())
            .withSchema(
                new Schema()
                    .field("fruit", Types.STRING)
                    .field("total", Types.INT)
                    .field("time", Types.SQL_TIMESTAMP))
            .registerTableSink("sink");

        // 5-second tumbling event-time window summing "number" per fruit
        tEnv.sqlUpdate("insert into sink " +
            "select fruit, sum(number), TUMBLE_END(rowtime, INTERVAL '5' SECOND) " +
            "from source " +
            "group by fruit, TUMBLE(rowtime, INTERVAL '5' SECOND)");

        env.execute();
    }
}
This example windows on event time and computes a per-fruit sum. It also shows that using a connector takes a fair amount of configuration, so the pieces are broken down one by one below; for full details, see the official documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#type-strings). First, a sketch of the data flowing through it.
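The input messages on the "jsontest" topic and the aggregated output on the "test" topic would look roughly like this (the values, and the exact timestamp encoding expected by the JSON format, are made up for illustration):

// two input records falling into the same 5-second window
{"eventtime": "2019-05-01 12:00:01", "fruit": "apple", "number": 3}
{"eventtime": "2019-05-01 12:00:02", "fruit": "apple", "number": 2}

// one aggregated record emitted when the window [12:00:00, 12:00:05) closes
{"fruit": "apple", "total": 5, "time": "2019-05-01 12:00:05"}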
1. Configuring the data source
.connect(
  new Kafka()
    .version("0.11")    // required: valid connector versions are
                        // "0.8", "0.9", "0.10", "0.11", and "universal"
    .topic("...")       // required: topic name from which the table is read

    // optional: connector specific properties
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
    .property("group.id", "testGroup")

    // optional: select a startup mode for Kafka offsets
    .startFromEarliest()
    .startFromLatest()
    .startFromSpecificOffsets(...)    // see the sketch after this block

    // optional: output partitioning from Flink's partitions into Kafka's partitions
    .sinkPartitionerFixed()                 // each Flink partition ends up in at-most one Kafka partition (default)
    .sinkPartitionerRoundRobin()            // a Flink partition is distributed to Kafka partitions round-robin
    .sinkPartitionerCustom(MyCustom.class)  // use a custom FlinkKafkaPartitioner subclass
)
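The startFromSpecificOffsets(...) variant takes a map from partition to offset. A minimal sketch, with made-up partition and offset values:

import java.util.HashMap;
import java.util.Map;

// resume partition 0 from offset 23 and partition 1 from offset 31 (illustrative values)
Map<Integer, Long> specificOffsets = new HashMap<>();
specificOffsets.put(0, 23L);
specificOffsets.put(1, 31L);

new Kafka()
  .version("0.10")
  .topic("jsontest")
  .property("bootstrap.servers", "localhost:9092")
  .startFromSpecificOffsets(specificOffsets)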
2. Data format
CSV, JSON, and Avro formats are currently supported. We need to specify how the table fields are parsed out of the JSON source, and there are three ways to do it:
.withFormat(
  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Type.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)
In practice the third option is the most common: the format is derived directly from the table schema we declare.
3. Schema information
Besides the fields themselves, the schema can also declare time-related attributes.
.withSchema(
  new Schema()
    .field("MyField1", Types.SQL_TIMESTAMP)
      .proctime()   // optional: declares this field as a processing-time attribute
    .field("MyField2", Types.SQL_TIMESTAMP)
      .rowtime(...) // optional: declares this field as an event-time attribute
    .field("MyField3", Types.BOOLEAN)
      .from("mf3")  // optional: original field in the input that is referenced/aliased by this field
)
4. Output update mode
The supported update modes are append mode, retract mode, and upsert mode, chosen on the descriptor as shown below.
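On the connect descriptor these correspond to three mutually exclusive methods; the example above uses append mode:

// append mode: the table and the connector exchange only INSERT messages
.inAppendMode()

// retract mode: changes are encoded as ADD (INSERT) and RETRACT (DELETE) messages
.inRetractMode()

// upsert mode: changes are encoded as upsert and DELETE messages; requires a unique key
.inUpsertMode()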
5. Time-related configuration
When declaring the schema you can configure time attributes such as event time and processing time, along with watermark generation, up to and including custom watermark strategies.
For event time, timestamp extraction supports:
// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
.rowtime(
  new Rowtime()
    .timestampsFromField("ts_field")    // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
  new Rowtime()
    .timestampsFromSource()
)

// Sets a custom timestamp extractor to be used for the rowtime attribute.
// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
.rowtime(
  new Rowtime()
    .timestampsFromExtractor(...)    // a small example follows below
)
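As a small illustration of the extractor variant, the built-in ExistingField extractor (which, as far as I can tell, is what timestampsFromField wraps) can be passed explicitly; a custom TimestampExtractor subclass would go in the same place:

import org.apache.flink.table.sources.tsextractors.ExistingField;

// equivalent to timestampsFromField("eventtime"), but passing the extractor explicitly
.rowtime(
  new Rowtime()
    .timestampsFromExtractor(new ExistingField("eventtime"))
)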
The supported watermark generation strategies are:
// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
  new Rowtime()
    .watermarksPeriodicAscending()
)

// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
  new Rowtime()
    .watermarksPeriodicBounded(2000)    // delay in milliseconds
)

// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
  new Rowtime()
    .watermarksFromSource()
)
3. Summary
This article covered the main ways of combining Flink SQL with Kafka. For DataStream-style operations, addSource and addSink are generally the way to go. For those adopting Flink, note that KafkaJsonTableSource and KafkaJsonTableSink are gradually being deprecated; use the connector or catalog approach instead. The latter in particular has proven very reliable and convenient when building a platform.
For more Flink content, join Langjian's Knowledge Planet and learn together with 750+ members.