内容简介:本文基于Apache Flink 1.7。代码阅读体验不好可以点击“查看原文”查看。结合上一篇文章,Source 是 Flink 程序的输入,Sink 就是 Flink 程序处理完Source后数据的输出,比如将输出写到文件、sockets、外部系统、或者仅仅是显示(在大数据生态中,很多类似的,比如Flume里也是对应的Source/Channel/Sink),Flink 提供了多种数据输出方式,下面逐一介绍。
本文基于Apache Flink 1.7。代码阅读体验不好可以点击“查看原文”查看。
结合上一篇文章,Source 是 Flink 程序的输入,Sink 就是 Flink 程序处理完Source后数据的输出,比如将输出写到文件、sockets、外部系统、或者仅仅是显示(在大数据生态中,很多类似的,比如Flume里也是对应的Source/Channel/Sink),Flink 提供了多种数据输出方式,下面逐一介绍。
概念
Flink 预定义 Sinks
-
基于文件的:如
writeAsText()
、writeAsCsv()
、writeUsingOutputFormat
、FileOutputFormat
。 -
写到socket:
writeToSocket
。 -
用于显示的:
print
、printToErr
。 -
自定义Sink:
addSink
。
对于 write*
来说,主要用于测试程序,Flink 没有实现这些方法的检查点机制,也就没有 exactly-once 支持。所以,为了保证 exactly-once ,需要使用 flink-connector-filesystem ,同时,自定义的 addSink
也可以支持。
Connectors
connectors 用于给接入第三方数据提供接口,现在支持的connectors 包括:
-
Apache Kafka
-
Apache Cassandra
-
Elasticsearch
-
Hadoop FileSystem
-
RabbitMQ
-
Apache NiFi
另外,通过 Apache Bahir ,可以支持Apache ActiveMQ、Apache Flume、 Redis 、Akka之类的Sink。
容错
为了保证端到端的 exactly-once ,Sink 需要实现checkpoint 机制,下图(图片来自于官网)所示的Sink 实现了这点
实战
Elasticsearch Connector
下面我们将使用 Elasticsearch Connector 作为Sink 为例示范Sink的使用。Elasticsearch Connector 提供了 at least once 语义支持,at lease once 支持需要用到Flink的checkpoint 机制。
要使用Elasticsearch Connector 需要根据Elasticsearch 版本添加依赖,如下图所示(图片来自官网)。
在这里,我们使用的Elasticsearch 版本是5.6.9,Scala 版本2.11。
添加如下依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch5_2.11</artifactId> <version>${flink.version}</version> </dependency>
先看ElasticsearchSink 源码,我们需要定义 ElasticsearchSinkFunction <T> 以及可选的 ActionRequestFailureHandler,ActionRequestFailureHandler 用来处理失败的请求。
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> { private static final long serialVersionUID = 1L; public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler()); } public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction, ActionRequestFailureHandler failureHandler) { super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler); } }
下面看完整的例子:
package learn.sourcesAndsinks import java.net.{InetAddress, InetSocketAddress} import java.util import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.client.Requests object BasicSinks { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // 定义stream val stream: DataStream[String] = env.fromCollection(List("aaa", "bbb", "ccc")) // Elasticsearch 相关配置,ES 用 docker 起的,所以cluster.name 是默认的docker-cluster val config = new util.HashMap[String, String]() config.put("cluster.name", "docker-cluster") config.put("bulk.flush.max.actions", "1") val transportAddress = new util.ArrayList[InetSocketAddress]() transportAddress.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)) stream.addSink(new ElasticsearchSink( config, transportAddress, new ElasticsearchSinkFunction[String] { def createIndexRequest(element: String): IndexRequest = { val json = new util.HashMap[String, String]() json.put("data", element) return Requests.indexRequest() .index("my-index") .`type`("my-type") .source(json) } def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) = { indexer.add(createIndexRequest(element)) } }, // 忽略错误,示例用,不建议用于生产环境 new IgnoringFailureHandler() )) env.execute() } }
如下图所示,是上面程序的结果。
上面实现了一个基础的Elasticsearch Sink,为了保证数据完整性,需要添加一些重试策略,这些主要跟 Elasticsearch 相关。
ES flush 相关配置 bulk.flush.max.actions bulk.flush.max.size.mb bulk.flush.interval.ms ES 错误重试配置 bulk.flush.backoff.enable bulk.flush.backoff.type bulk.flush.backoff.delay bulk.flush.backoff.retries
如果在此基础上还需要处理Elasticsearch 的报错,可以自己实现ActionRequestFailureHandler 方法。
总结
本文主要以 Flink Elasticsearch Connector 为例讲了Flink 里的Sink,后面会对Source 和 Sink 进行源码解读。
看到这里,请扫描下方二维码关注我,Happy Friday !
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- gRPC 的介绍以及实践
- YARN 的介绍及实践探索
- SourceMap 知多少:介绍与实践
- Flink(四):Source 介绍与实践
- Django 默认权限机制介绍及实践
- 混沌工程详细介绍:Netflix 持续交付实践探寻
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Defensive Design for the Web
37signals、Matthew Linderman、Jason Fried / New Riders / 2004-3-2 / GBP 18.99
Let's admit it: Things will go wrong online. No matter how carefully you design a site, no matter how much testing you do, customers still encounter problems. So how do you handle these inevitable bre......一起来看看 《Defensive Design for the Web》 这本书的介绍吧!