内容简介:Jiangjie QinFlink为job,task和operator定义了一些标准指标。它还支持各种方案中的自定义指标。但是,到目前为止,连接器没有标准或传统的度量标准定义。目前,每个连接器都定义了自己的指标。这使操作和监视变得复杂。不可否认,不同的连接器可能具有不同的度量标准,但是一些常用的度量标准可能是标准化的。此FLIP提供了一组标准连接器指标,如果适用,每个连接器应发出这些指标。将来,Flink生态系统中的其他项目可能依赖于此度量标准约定。因此,在报告度量标准时,连接器实现应遵循约定。
Jiangjie Qin
Motivation
Flink为job,task和operator定义了一些标准指标。它还支持各种方案中的自定义指标。但是,到目前为止,连接器没有标准或传统的度量标准定义。目前,每个连接器都定义了自己的指标。这使操作和监视变得复杂。不可否认,不同的连接器可能具有不同的度量标准,但是一些常用的度量标准可能是标准化的。此FLIP提供了一组标准连接器指标,如果适用,每个连接器应发出这些指标。
将来,Flink生态系统中的其他项目可能依赖于此度量标准约定。因此,在报告度量标准时,连接器实现应遵循约定。
Public Interfaces
我们建议为连接器引入一组常规/标准指标。
重要的是要提到这一点:
-
连接器实现不必报告所有已定义的度量标准。但是,如果连接器报告下面定义的相同语义的度量,则实现应遵循约定。
-
以下度量标准约定不是完整列表。随着时间的推移将添加更传统的度量。
-
直方图指标通常非常昂贵。由于其性能影响,我们故意将其排除在此FLIP中。有关详细信息,请参阅后续工作部分。
Source Metrics
Name |
Type |
Unit |
Description |
numBytesIn |
Counter |
Bytes |
The total number of input bytes since the source started |
numBytesInPerSecond |
Meter |
Bytes/Sec |
The input bytes per second |
numRecordsIn |
Counter |
Records |
(Existing operator metric) The total number of input records since the source started |
numRecordsInPerSecond |
Meter |
Records/Sec |
(Existing operator metric) The input records per second |
numRecordsInErrors | Counter | Records | The total number of record that failed to consume |
currentFetchLatency | Gauge | ms | The latency occurred before Flink fetched the record. This metric is an instantaneous value recorded for the last processed record. This metric is provided because latency histogram could be expensive. The instantaneous latency value is usually a good enough indication of the latency. fetchLatency = FetchTime - EventTime |
currentLatency | Gauge | ms | The latency occurred before the record is emitted by the source connector. This metric is an instantaneous value recorded for the last processed record. This metric is provided because latency histogram could be expensive. The instantaneous latency value is usually a good enough indication of the latency. latency = EmitTime - EventTime |
idleTime |
Gauge |
ms |
The time in milliseconds that the source has not processed any record. idleTime = CurrentTime - LastRecordProcessTime |
pendingBytes | Gauge | Bytes | The number of bytes that have not been fetched by the source. e.g. the remaining bytes in a file after the file descriptor reading position. |
pendingRecords | Gauge | Records | The number of records that have not been fetched by the source. e.g. the available records after the consumer offset in a Kafka partition. |
|
Counter |
Bytes |
The total number of input bytes since the source started |
numBytesInPerSecond |
Meter |
Bytes/Sec |
The input bytes per second |
numRecordsIn |
Counter |
Records |
(Existing operator metric) The total number of input records since the source started |
numRecordsInPerSecond |
Meter |
Records/Sec |
(Existing operator metric) The input records per second |
numRecordsInErrors | Counter | Records | The total number of record that failed to consume |
currentFetchLatency | Gauge | ms | The latency occurred before Flink fetched the record. This metric is an instantaneous value recorded for the last processed record. This metric is provided because latency histogram could be expensive. The instantaneous latency value is usually a good enough indication of the latency. fetchLatency = FetchTime - EventTime |
currentLatency | Gauge | ms | The latency occurred before the record is emitted by the source connector. This metric is an instantaneous value recorded for the last processed record. This metric is provided because latency histogram could be expensive. The instantaneous latency value is usually a good enough indication of the latency. latency = EmitTime - EventTime |
idleTime |
Gauge |
ms |
The time in milliseconds that the source has not processed any record. idleTime = CurrentTime - LastRecordProcessTime |
pendingBytes | Gauge | Bytes | The number of bytes that have not been fetched by the source. e.g. the remaining bytes in a file after the file descriptor reading position. |
pendingRecords | Gauge | Records | The number of records that have not been fetched by the source. e.g. the available records after the consumer offset in a Kafka partition. |
Sink Metrics
Name |
Type |
Unit |
Description |
numBytesOut |
Counter |
Bytes |
The total number of output bytes since the source started |
numBytesOutPerSecond |
Meter |
Bytes/Sec |
The output bytes per second |
numRecordsOut |
Counter |
Records |
(Existing operator metric) The total number of output records since the source started |
numRecordsOutPerSecond |
Meter |
Records/Sec |
(Existing operator metric) The output records per second |
numRecordsOutErrors | Counter | Record | The total number of records failed to send |
currentSendTime | Gauge | ms | The time it takes to send the last record. This metric is an instantaneous value recorded for the last processed record. |
Scope
每个源和接收器的度量标准组将与普通操作员范围相同,即默认为
<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
其他连接器特定度量标准也应使用相同的范围。
Native Connector Metrics
如果连接器具有其原始度量标准,则仍应保留原始度量标准名称,即使某些原始度量标准使用标准度量标准名称公开。
Future Work
Opt in/out metrics
在本FLIP中,我们故意将一些有用但可能很昂贵的指标留在范围之外。例如:
Name |
Type |
Unit |
Description |
recordSize |
Histogram |
Bytes |
The size of a record. |
fetchLatency |
Histogram |
ms |
The latency occurred before Flink fetched the record. fetchLatency = FetchTime - EventTime |
latency |
Histogram |
ms |
The latency occurred before the record is emitted by the source connector. latency = EmitTime - EventTime |
sendTime |
Histogram |
ms |
The time it takes to send a record |
我们计划通过引入可选指标将这些指标添加到约定中,以允许用户按需选择加入/退出这些昂贵的指标。这将在单独的FLIP中讨论。
Proposed Changes
-
将建议的指标添加到现有连接器。
-
如有必要,请将旧指标标记为已弃用。
-
如果需要,请更正连接器的范围和度量标准名称。
Compatibility, Deprecation, and Migration Plan
此FLIP建议向连接器添加新度量标准,并在必要时将旧的重复度量标记为已弃用。
在一些版本之后,我们希望删除一些与新标准指标重复的旧指标。但是没有严格的时间表。
Test Plan
将创建标准连接器度量标准测试套件,以确保正确实现连接器名称。
FLIP-40: Flink Driver
Shuiqiang Chen
Motivation
正如在交互式编程的讨论中提到的,用户应用程序可能由多个作业组成,需要很长时间才能完成。当前,当Flink运行具有多个作业的应用程序时,应用程序将在负责提交作业的本地进程中运行。直到整个应用程序完成,该本地进程才会退出。用户必须密切关注本地进程,以防它因连接丢失、会话超时、本地操作系统问题等而被终止。
为了解决这个问题,我们想引入Flink Driver。用户可以使用Driver模式提交应用程序。将提交Flink Driver作业以处理用户应用程序中的作业提交。Driver模式本身不一定绑定到交互式编程。但是,由于大多数使用交互式编程的应用程序都是长期运行的,因此Driver模式在这种情况下特别有用。
Goals
Driver模式的目标:
-
在Flink Driver(运行用户程序的主要功能的Flink作业)中执行用户应用程序,这样用户就不需要长时间运行本地进程。
-
支持Flink当前支持的所有应用提交方式,独立集群或纱线/Kubernetes集群等。
-
当应用程序在Driver模式下运行时,可以查询应用程序状态。
Public Interfaces
所有公共接口更改都在./Flink run命令行上。
-
添加一个新选项-D/-Driver以启用Driver模式。
-
如果使用Driver模式,则为Driver作业添加以下新配置
-Dhm |
--driverheapmemory <arg> |
Driver task heap memory in MB |
-Ddm |
--driverdirectmemory <arg> |
Driver task direct memory in MB |
-Dc |
--drivercpus <arg> |
Driver task cpu cores. |
-Dnm |
--drivernativememory <arg> |
Driver task native memory in MB |
Proposed Changes
Current status of running applications
Flink提供了一个命令行接口(Command-Line Interface,CLI)来运行打包为JAR文件的应用程序。根据用户是否提供现有Flink集群,存在两种情况:per作业模式和会话模式:
per-job
.bin/flink run application.jar
-
对于应用程序中的每个作业,将使用作业图部署Flink集群。一旦部署完成,Flink集群将运行作业图。作业完成后,集群将被销毁并回收
-
仅当用户应用程序中只有一个JobGraph时,Pre-job模式才起作用。
session-mode
.bin//flink run application.jar -m JM_ADDRESS
-
用户使用配置的作业管理器主机:端口或群集ID检索集群客户端,然后使用ClusterClient将作业提交到该集群。作业完成后,Flink 集群 保持不变。
在通过命令行运行Flink应用程序时,用户还可以指定附加/分离模式的选项。如果用户应用程序仅包含单个作业,则行为如下:
-
默认情况下,使用附加模式。命令行进程将等到用户应用程序中的作业完成后再退出。
-
如果将-d(detached)指定为命令行参数,则提交用户应用程序中的作业后,命令行进程将退出,但不会等待作业完成。
如果用户应用程序中有多个作业,则附加/分离的行为实际上没有很好地定义,有时可能会令人困惑。此外,用户还可以通过配置选项-s来指定用于恢复作业的保存点路径。更多选项及其用法可在附录中找到。
Run applications with driver mode
与当前状态类似,只有当用户通过命令行运行其应用程序时,Driver模式才可用。我们想要引入一个新的选项-D/-Driver to./bin/Flink run。
以yarn模式为例,用户可以执行以下命令提交启用Flink Driver的应用程序:
bin/flink run -m yarn-cluster -D -c {ENTRY_CLASS} {PATH_TO_APPLICATION_JAR}
Driver mode behavior
由于具有Driver模式的应用程序可能提交多个作业,因此不再支持当前的每作业模式。因此,需要Flink集群才能在应用程序中运行作业。
当用户应用程序在Driver模式下运行时,将首先创建Flink Driver作业以承载用户应用程序的主要功能。然后,该作业将作为一个并行度设置为1的普通Flink作业提交给定的Flink集群。然后,该Driver作业将调用用户主函数,并将用户应用程序中的作业提交给运行Driver作业本身的同一Flink集群。
下面是Flink Driver的整体情况:
由于所有作业都在同一集群中运行,因此用户可以使用REST API查询作业状态并获取应用程序进度。由Driver作业提交的作业名称将具有用户定义的作业名称的前缀。
FLIP-43: Savepoint Connector
Seth Wiesman
-
Current state: Under Discussion
-
Discussion thread:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-43-Savepoint-Connector-td29233.html
Mot ivation
Flink为用户功能提供状态抽象,以保证流的容错处理。用户可以使用非分区和分区状态。
分区状态接口提供对不同类型状态的访问,这些状态都是作用于当前输入元素的键的。这种类型的状态仅在keyed stream中可用,该流是通过stream.keyBy()创建的。
目前,所有这种状态都是Flink的内部状态,用于在故障情况下提供处理保证(例如,恰好一次处理)。从外部访问状态的唯一方法是通过Queryable状态,但这仅限于只读,一次操作一个键。
保存点连接器使用Flink的批处理DataSet API提供读取、写入和修改保存点的强大功能。
这对以下内容很有用:
-
分析有趣模式的状态
-
通过检查状态差异来排除故障或审核作业
-
新应用程序的引导状态
-
修改保存点,例如:
-
改变最大并行度
-
打破架构更改
-
纠正无效状态
Abstraction
要了解如何在批处理上下文中与保存点进行最佳交互,必须有一个清晰的心理模型,说明Flink状态中的数据如何与传统的关系数据库相关联。
可以将数据库视为一个或多个名称空间,每个名称空间包含一组表。这些表又包含其值在它们之间具有某种内在关系的列,例如在同一个键下作用域。
保存点表示特定时间点的Flink作业的状态,该作业由许多运算符组成。这些运算符包含各种状态,分区或键控状态,以及非分区或运算符状态。
MapStateDescriptor<Integer, Double> CURRENCY_RATES=new MapStateDescriptor<>("rates",Types.INT,Types.DOUBLE); class CurrencyConverter extends BroadcastProcessFunction<Transaction, CurrencyRate, Transaction> { public void processElement(Transaction value, ReadOnlyContext ctx, Collector<Transaction> out) throws Exception { Double rate = ctx.getBroadcastState(CURRENCY_RATES).get(value.currencyId); if (rate != null) { value.amount *= rate; } out.collect(value); } public void processBroadcastElement( CurrencyRate value, Context ctx, Collector<Transaction> out) throws Exception { ctx.getBroadcastState(CURRENCY_RATES).put(value.currencyId, value.rate); } } class Summarize extends RichFlatMapFunction<Transaction, Summary> { transient ValueState<Double> totalState; transient ValueState<Integer> countState; public void open(Configuration configuration) throws Exception { totalState = getRuntimeContext().getState(new ValueStateDescriptor("total", Types.DOUBLE)); countState = getRuntimeContext().getState(new ValueStateDescriptor("count", Types.INT)); } public void flatMap(Transaction value, Collector<Summary> out) throws Exception { Summary summary = new Summary(); summary.total = value.amount; summary.count = 1; Double currentTotal = totalState.value(); if (currentTotal != null) { summary.total += currentTotal; } Integer currentCount = countState.value(); if (currentCount != null) { summary.count += currentCount; } countState.update(summary.count); out.collect(summary); } } DataStream<Transaction> transactions = ... BroadcastStream<CurrencyRate> rates=... transactions.connect(rates) .process(new CurrencyConverter()) .uid("currency_converter") .keyBy(transaction->transaction.accountId) .flatMap(new Summarize()) .uid("summarize")
此作业包含多个操作符以及各种状态。在分析该状态时,我们可以首先通过运算符(通过设置其uid来命名)对数据进行作用域。在每个运算符中,我们可以查看已注册的state。CurrencyConverter具有广播状态,这是一种未分区操作员状态。通常,操作符状态中的任何两个元素之间没有关系,因此我们可以将每个值看作是它自己的行。将此与包含两个键控状态的“汇总”进行对比。因为这两个状态的作用域都在同一个键下,所以我们可以安全地假设这两个值之间存在某种关系。因此,键控状态最好理解为每个操作符包含一个表,该表包含一个“键”列和n个值列,每个注册状态对应一个列。所有这些都意味着可以使用以下伪 SQL 命令来描述此作业的状态: CREATE NAMESPACE currency_converter;
CREATE TABLE currency_converter.rates( value Tuple2<Integer, Double> ); CREATE NAMESPACE summarize; CREATE TABLE summarize.keyed_state( key INTEGER PRIMARY KEY, total DOUBLE, count INTEGER );
通常,保存点↔数据库关系可以总结为:
-
保存点是数据库。
-
运算符是由其uid命名的命名空间。
-
每个操作符状态代表一个表。
-
操作符状态中的每个元素表示该表中的一行。
-
每个包含键控状态的运算符都有一个“keyed_state”表。
-
每个keyed_state表都有一个键列映射操作符的键值。
-
每个注册状态表示表中的一列。
-
表中的每一行都映射到一个键。
Public Interfaces
Reading an existing savepoint
加载现有保存点:
ExecutionEnvironment bEnv=ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint=Savepoint.load(bEnv,"hdfs://path/",stateBackend);
读取运算符状态时,只需指定运算符uid,状态名称和类型信息。
DataSet<Integer> listState=savepoint.readListState("uid","state",Types.INT); DataSet<Integer> unionState=savepoint.readUnionState("uid","state",Types.INT); DataSet<Tuple2<Integer, Integer>>broadcastState=savepoint.readBroadcastState("uid","state",Types.INT,Types.INT);
此外,如果状态使用自定义序列化,则可以提供自定义类型序列化程序。
DataSet<Integer> listState = savepoint.readListState("uid", "state”, Types.INT, new MyCustomIntSerializer());
当读取键控状态时,用户指定KeyedStateReaderFunction以允许读取任意列和复杂状态类型,例如ListState,MapState和AggregatingState以及定时器。这意味着如果运算符包含有状态过程函数,例如:
class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Integer, Void> { ValueState<Integer> state; @Override public void open(Configuration parameters) { state = getRuntimeContext().getState(stateDescriptor); } @Override public void processElement(Integer value, Context ctx, Collector<Void> out) throws Exception { state.update(value + 1); ctx.timerService().registerEventTimeTimer(value); } }
然后,用户可以通过首先定义输出类型和相应的KeyedStateReaderFunction来准备此状态。
class MyPojo { Integer key; Integer value; Set<Long> timers; } class ReaderFunction extends KeyedStateReaderFunction<Integer, MyPojo> { ValueState<Integer> state; @Override public void open(Configuration parameters) { state = getRuntimeContext().getState(stateDescriptor); } @Override public void processKey( Integer key, Context ctx, Collector<MyPojo> out) throws Exception { MyPojo pojo = new MyPojo(); pojo.key = key; pojo.value = state.value(); pojo.timers = ctx.getEventTimeTimers(); out.collect(pojo); } } DataSet<MyPojo> keyedState = savepoint.readKeyedState("uid", new ReaderFunction());
Creating state / savepoint from scratch
定义如何使用给定的DataSet引导新运算符的状态:
BootstrapTransformation<Account> transformation=OperatorTransformation .bootstrapWith(data) .assignTimestamps(account->account.timestamp) .keyBy(acc->acc.id) .transform(new AccountBootstrapper()); class AccountBootstrapp6er extends KeyedStateBootstrapFunction<Integer, Account> { ValueState<Double> state; @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total", Types.DOUBLE); state = getRuntimeContext().getState(descriptor); } @Override public void processElement(Account value, Context ctx) throws Exception { state.update(value.amount); } }
创建一个新的保存点,指定状态后端类型,最大并行度,多个运算符和输出路径。
用给定的DataSet引导新运算符的状态:
Savepoint.create(stateBackend, maxParallelism) .withOperator("uid", transformation) .withOperator(...) .write(path)
Modifying an existing savepoint
基于现有保存点加载新保存点并添加/覆盖/删除运算符
ExistingSavepoint existingSavepoint = Savepoint.load(backendType, oldSavepointPath)
添加一个新的bootstrapped运算符
existingSavepoint .withOperator(“newUid”, transformation) .write(path)
删除/覆盖现有保存点中的运算符状态,并进行写入。修改后的保存点保留原始保存点的最大并行度。
existingSavepoint .removeOperator(oldOperatorUid) .withOperator(oldOperatorUid, transformation) .write(path)
Proposed Changes
实现的关键目标是仅使用可用的保存点API,以便实现简单且易于维护。随着保存点格式的更改或添加了诸如TTL或状态迁移等新功能,连接器将继续工作而无需修改。
Querying Timers
唯一的先决条件是对内部计时器服务进行微小的修改,该服务提供了对该时间点注册的密钥的时间戳的有效映射。有效查询已注册的计时器需要将密钥映射到已注册的时间戳。由于计时器服务驻留在每个记录执行路径中,因此我们不希望对计时器的管理方式进行任何更改。相反,将向InternalTimerService接口添加两个方法; forEachProcessingTimeTimer和forEachEventTimeTimer。这些方法允许在读取之前将所有已注册的定时器复制到数据结构中,以支持有效的查询,而无需触及任何每个记录的代码路径。
State Input
从现有保存点读取状态是围绕一系列输入格式构建的,其中每个分割对应于数据流执行图中的单个执行顶点。这意味着如果请求十个输入拆分,则状态的分区相同,就好像该保存点在具有十行并行性的数据流应用程序中恢复一样(使用StateAssignmentOperation中的方法)。在打开时,每个拆分将恢复本地状态后端并遍历所有已恢复的数据。
Writing New Savepoints
Savepoint编写基于三个接口:
-
StateBootstrapFunction用于写入非分区的运算符状态
-
BroadcastStateBootstrapFunction用于写入广播状态
-
KeyedStateBootstrapFunction用于写入键控运算符状态
每个接口的结构与处理函数类似,除了它们不包含收集器,因为写入是数据流中的终端操作。接口由相应的StreamOperator支持,但运算符不包含任何特殊逻辑。实际的快照发生在名为BoundedStreamTask的StreamTask的新子类中。此类与OneInputStreamTask相同,除了:
-
输入由迭代器而不是网络堆栈供电
-
处理完所有数据后,它将拍摄子任务的快照
这意味着重用所有检查点逻辑,与输入格式类似,库将免费支持所有保存点功能。最后,BoundedStreamTask将在DataSet#mapPartition中运行,该mapPartition接收引导数据并输出快照的OperatorSubtaskState。然后,可以聚合快照句柄并将其写为保存点元数据文件。
Appendix A: Why use the DataSet API
随着社区内所有正在进行的工作以改进Flink对Table API和最终BoundedStream API的批处理支持,出现了为什么现在使用DataSet API的问题。理论上有三个其他API可以在以下基础上构建此功能:
-
BoundedStream
-
目前不存在
-
数据流
-
已考虑使用DataStream API,但缺少需要核心运行时更改且似乎超出范围的关键功能。
-
Table API
-
当前的表运行器需要使用DataSet API实现批处理应用程序的源/接收器
-
由阿里巴巴提供的新表运行正在积极开发中,需要使用DataStream API实现批处理表源/接收器
同时,我们也很欣赏使用DataSet API构建的任何新功能都需要在实现适当的BoundedStream API并且不推荐使用DataSet时进行更新。这就是为什么savepoint连接器将其功能包装在上面显示的API中并且不暴露任何内部结构(如输入和输出格式)的原因。从用户的角度来看,唯一会改变的是对readListState的调用将返回BoundedStream <>而不是DataSet <>。在内部,DataSet的使用是微不足道的,因为核心功能是从flink-streaming-java模块中公开的保存点api派生的,迁移应该只是改变类型的问题。
大家工作学习遇到HBase技术问题,把问题发布到HBase技术社区论坛 http://hbase.group ,欢迎大家论坛上面提问留言讨论。想了解更多HBase技术关注HBase技术社区公众号(微信号:hbasegroup),非常欢迎大家积极投稿。
本群为HBase+Spark技术交流讨论,整合最优质的专家资源和技术资料会定期开展线下技术沙龙,专家技术直播,专家答疑活动
点击链接钉钉入群: https://dwz.cn/Fvqv066s 或扫码进群
本群为Cassandra技术交流讨论,整合最优质的专家资源和技术资料会定期开展线下技术沙龙,专家技术直播,专家答疑活动
Cassandra 社区钉钉大群: https://c.tb.cn/F3.ZRTY0o
Cassandra 技术社区微信公众号:
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。