Preface
This article takes a look at Time Attributes in the Flink Table API.
Processing time
Defining it via fromDataStream
```java
DataStream<Tuple2<String, String>> stream = ...;

// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
```
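- When a Table is created from a DataStream, you can declare the processing time attribute inside fromDataStream. To do this, add an extra logical field whose name carries the .proctime suffix.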
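Every example in this article groups rows with Tumble.over("10.minutes") on the time attribute. The sketch below is a rough illustration of that grouping. It computes the 10-minute bucket that a millisecond timestamp falls into. TumbleSketch and its methods are hypothetical names and not part of Flink. The formula is the one Flink's TimeWindow.getWindowStartWithOffset uses, but this is a plain-Java model, not the code path the Table API actually executes.

```java
import java.time.Duration;

// Hypothetical helper, not part of the Flink API: it only illustrates how a
// tumbling window such as Tumble.over("10.minutes") buckets a time attribute.
final class TumbleSketch {

    // Start of the tumbling window of length `size` (ms) that contains `timestamp` (ms).
    // Same formula as TimeWindow.getWindowStartWithOffset;
    // assumes timestamp >= 0 and 0 <= offset < size.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Exclusive end of that window.
    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(10).toMillis(); // 600_000 ms
        // A processing time value is simply the wall-clock time when the row is processed.
        long proctime = System.currentTimeMillis();
        System.out.println("[" + windowStart(proctime, 0, size) + ", " + windowEnd(proctime, 0, size) + ")");
    }
}
```

For example, a row stamped at 1,000,000 ms lands in the window [600000, 1200000).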
Defining it via a TableSource
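```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.WindowedTable;
import org.apache.flink.table.api.java.Tumble;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

  @Override
  public TypeInformation<Row> getReturnType() {
    String[] names = new String[] {"Username", "Data"};
    TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    return Types.ROW(names, types);
  }

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // create stream
    DataStream<Row> stream = ...;
    return stream;
  }

  @Override
  public String getProctimeAttribute() {
    // field with this name will be appended as a third field
    return "UserActionTime";
  }
}

// register table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
```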
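- When a Table is created from a TableSource, you define the processing time attribute by implementing the DefinedProctimeAttribute interface. The field named by getProctimeAttribute is appended after the source's own fields.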
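The UserActionSource above returns only two physical fields, but its table ends up with three. The plain-Java model below is a hypothetical sketch of that effect and is not how Flink implements it internally. ProctimeSchemaSketch, logicalFields and appendProctime are illustrative names. The proctime value is represented as an Instant for simplicity, whereas Flink exposes the attribute as SQL_TIMESTAMP.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model (not Flink code) of what DefinedProctimeAttribute implies:
// the physical fields stay as they are, and the field named by
// getProctimeAttribute() is appended as an extra logical field.
final class ProctimeSchemaSketch {

    // Logical field names: the physical fields plus the processing time field, if any.
    // A null name mirrors getProctimeAttribute() returning null (no proctime attribute).
    static List<String> logicalFields(List<String> physicalFields, String proctimeAttribute) {
        List<String> fields = new ArrayList<>(physicalFields);
        if (proctimeAttribute != null) {
            fields.add(proctimeAttribute);
        }
        return fields;
    }

    // Copies a physical row and appends the clock's current time as the proctime value.
    static Object[] appendProctime(Object[] physicalRow, Clock clock) {
        Object[] row = Arrays.copyOf(physicalRow, physicalRow.length + 1);
        row[physicalRow.length] = clock.instant();
        return row;
    }

    public static void main(String[] args) {
        System.out.println(logicalFields(List.of("Username", "Data"), "UserActionTime"));
        Clock clock = Clock.fixed(Instant.ofEpochMilli(42), ZoneOffset.UTC);
        System.out.println(Arrays.toString(appendProctime(new Object[] {"alice", "click"}, clock)));
    }
}
```

Passing a Clock instead of calling the system clock directly keeps the sketch deterministic, so it can be tested.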
Event time
Defining it via fromDataStream
```java
// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");

// Usage:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
```
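- When a Table is created from a DataStream, you can declare the event time attribute inside fromDataStream using the .rowtime suffix. There are two ways to do this: declare an additional logical field, or replace an existing field. In both cases, timestamps and watermarks must already have been assigned on the DataStream via assignTimestampsAndWatermarks.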
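The difference between the two options can be read off the field count. In Option 1, the Tuple2 stream gets three expressions, so the rowtime field is extra. In Option 2, the Tuple3 stream gets three expressions, so the rowtime field takes an existing field's place. The sketch below is a deliberately simplified, hypothetical classifier that only encodes this counting rule. It is not Flink's actual validation logic, which performs additional checks such as field types and name-based mapping. RowtimeModeSketch and classify are illustrative names.

```java
import java.util.List;

// Hypothetical, simplified model (not Flink's validation logic) of the two
// fromDataStream styles: a ".rowtime" field either adds a field or replaces one.
final class RowtimeModeSketch {

    enum Mode { APPENDED, REPLACED }

    // physicalArity: number of fields in the stream type (2 for Tuple2, 3 for Tuple3).
    // fieldExprs: the expressions passed to fromDataStream, already split on commas.
    static Mode classify(int physicalArity, List<String> fieldExprs) {
        int rowtimeIndex = -1;
        for (int i = 0; i < fieldExprs.size(); i++) {
            if (fieldExprs.get(i).trim().endsWith(".rowtime")) {
                rowtimeIndex = i;
            }
        }
        if (rowtimeIndex < 0) {
            throw new IllegalArgumentException("no .rowtime field declared");
        }
        if (fieldExprs.size() == physicalArity + 1 && rowtimeIndex == fieldExprs.size() - 1) {
            return Mode.APPENDED; // Option 1: an extra logical field after the physical ones
        }
        if (fieldExprs.size() == physicalArity) {
            return Mode.REPLACED; // Option 2: the physical field at rowtimeIndex is replaced
        }
        throw new IllegalArgumentException("field count does not match the stream type");
    }

    public static void main(String[] args) {
        System.out.println(classify(2, List.of("Username", "Data", "UserActionTime.rowtime")));
        System.out.println(classify(3, List.of("UserActionTime.rowtime", "Username", "Data")));
    }
}
```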
Defining it via a TableSource
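```java
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.WindowedTable;
import org.apache.flink.table.api.java.Tumble;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;

// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

  @Override
  public TypeInformation<Row> getReturnType() {
    String[] names = new String[] {"Username", "Data", "UserActionTime"};
    TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
    return Types.ROW(names, types);
  }

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // create stream
    // ...
    // assign watermarks based on the "UserActionTime" attribute
    DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
    return stream;
  }

  @Override
  public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
    // Mark the "UserActionTime" attribute as event-time attribute.
    // We create one attribute descriptor of "UserActionTime".
    RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
      "UserActionTime",
      new ExistingField("UserActionTime"),
      new AscendingTimestamps());
    List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
    return listRowtimeAttrDescr;
  }
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
```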
- When a Table is created from a TableSource, you define the event time attribute by implementing the DefinedRowtimeAttributes interface.
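The descriptor above pairs ExistingField("UserActionTime"), which takes the timestamp from an existing field, with AscendingTimestamps, a watermark strategy. The sketch below is a hypothetical plain-Java model of that pair. It rests on one assumption: an ascending strategy tracks the largest timestamp seen so far and reports a watermark one millisecond below it. AscendingWatermarkSketch, extract, onRow and currentWatermark are illustrative names and are not Flink's classes.

```java
import java.util.Map;

// Hypothetical plain-Java model of the ExistingField + AscendingTimestamps pair.
// Assumption: the ascending strategy reports (max timestamp seen so far) - 1 as its
// watermark. This is a sketch, not Flink's implementation.
final class AscendingWatermarkSketch {

    private final String timestampField;
    private long maxTimestamp = Long.MIN_VALUE + 1; // +1 so that "max - 1" cannot underflow

    AscendingWatermarkSketch(String timestampField) {
        this.timestampField = timestampField;
    }

    // Timestamp extraction: read an existing Long field of the row (like ExistingField).
    long extract(Map<String, Object> row) {
        return (Long) row.get(timestampField);
    }

    // Called for every row; remembers the maximum timestamp observed so far.
    void onRow(Map<String, Object> row) {
        maxTimestamp = Math.max(maxTimestamp, extract(row));
    }

    // Current watermark: rows with a timestamp <= this value are no longer expected.
    long currentWatermark() {
        return maxTimestamp - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch wm = new AscendingWatermarkSketch("UserActionTime");
        wm.onRow(Map.of("Username", "alice", "UserActionTime", 1_000L));
        wm.onRow(Map.of("Username", "bob", "UserActionTime", 3_000L));
        System.out.println(wm.currentWatermark()); // 2999
    }
}
```

Because the watermark never goes backwards, this strategy only fits streams whose timestamps really do ascend. A row arriving later with a smaller timestamp would be considered late.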
definedTimeAttributes
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/sources/definedTimeAttributes.scala
```scala
package org.apache.flink.table.sources

import java.util
import java.util.Objects
import javax.annotation.Nullable

import org.apache.flink.table.sources.tsextractors.TimestampExtractor
import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy

/**
  * Extends a [[TableSource]] to specify a processing time attribute.
  */
trait DefinedProctimeAttribute {

  /**
    * Returns the name of a processing time attribute or null if no processing time attribute is
    * present.
    *
    * The referenced attribute must be present in the [[TableSchema]] of the [[TableSource]] and of
    * type [[Types.SQL_TIMESTAMP]].
    */
  @Nullable
  def getProctimeAttribute: String
}

/**
  * Extends a [[TableSource]] to specify rowtime attributes via a
  * [[RowtimeAttributeDescriptor]].
  */
trait DefinedRowtimeAttributes {

  /**
    * Returns a list of [[RowtimeAttributeDescriptor]] for all rowtime attributes of the table.
    *
    * All referenced attributes must be present in the [[TableSchema]] of the [[TableSource]] and of
    * type [[Types.SQL_TIMESTAMP]].
    *
    * @return A list of [[RowtimeAttributeDescriptor]].
    */
  def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}

/**
  * Describes a rowtime attribute of a [[TableSource]].
  *
  * @param attributeName The name of the rowtime attribute.
  * @param timestampExtractor The timestamp extractor to derive the values of the attribute.
  * @param watermarkStrategy The watermark strategy associated with the attribute.
  */
class RowtimeAttributeDescriptor(
  val attributeName: String,
  val timestampExtractor: TimestampExtractor,
  val watermarkStrategy: WatermarkStrategy) {

  /** Returns the name of the rowtime attribute. */
  def getAttributeName: String = attributeName

  /** Returns the [[TimestampExtractor]] for the attribute. */
  def getTimestampExtractor: TimestampExtractor = timestampExtractor

  /** Returns the [[WatermarkStrategy]] for the attribute. */
  def getWatermarkStrategy: WatermarkStrategy = watermarkStrategy

  override def equals(other: Any): Boolean = other match {
    case that: RowtimeAttributeDescriptor =>
      Objects.equals(attributeName, that.attributeName) &&
        Objects.equals(timestampExtractor, that.timestampExtractor) &&
        Objects.equals(watermarkStrategy, that.watermarkStrategy)
    case _ => false
  }

  override def hashCode(): Int = {
    Objects.hash(attributeName, timestampExtractor, watermarkStrategy)
  }
}
```
- DefinedProctimeAttribute declares the getProctimeAttribute method. It returns a String naming the processing time field, or null if there is none. DefinedRowtimeAttributes declares the getRowtimeAttributeDescriptors method, which returns a List of RowtimeAttributeDescriptor. A RowtimeAttributeDescriptor has three properties: attributeName, timestampExtractor and watermarkStrategy.
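RowtimeAttributeDescriptor is a value class: its equals and hashCode compare all three properties through Objects.equals. Two descriptors are therefore equal only if their extractors and watermark strategies are also equal by their own equals. The following is a hypothetical Java transcription for illustration only. RowtimeDescriptorSketch is an illustrative name. The extractor and strategy are typed as Object because Flink's TimestampExtractor and WatermarkStrategy classes are assumed not to be on the classpath here.

```java
import java.util.Objects;

// Hypothetical Java transcription of the Scala RowtimeAttributeDescriptor above.
// Extractor and strategy are plain Objects here, standing in for Flink's
// TimestampExtractor and WatermarkStrategy types.
final class RowtimeDescriptorSketch {
    private final String attributeName;
    private final Object timestampExtractor;
    private final Object watermarkStrategy;

    RowtimeDescriptorSketch(String attributeName, Object timestampExtractor, Object watermarkStrategy) {
        this.attributeName = attributeName;
        this.timestampExtractor = timestampExtractor;
        this.watermarkStrategy = watermarkStrategy;
    }

    String getAttributeName() { return attributeName; }
    Object getTimestampExtractor() { return timestampExtractor; }
    Object getWatermarkStrategy() { return watermarkStrategy; }

    // Value equality over all three properties, mirroring the Scala equals.
    @Override
    public boolean equals(Object other) {
        if (!(other instanceof RowtimeDescriptorSketch)) {
            return false;
        }
        RowtimeDescriptorSketch that = (RowtimeDescriptorSketch) other;
        return Objects.equals(attributeName, that.attributeName)
            && Objects.equals(timestampExtractor, that.timestampExtractor)
            && Objects.equals(watermarkStrategy, that.watermarkStrategy);
    }

    // Consistent with equals: hashes the same three properties.
    @Override
    public int hashCode() {
        return Objects.hash(attributeName, timestampExtractor, watermarkStrategy);
    }

    public static void main(String[] args) {
        RowtimeDescriptorSketch a = new RowtimeDescriptorSketch("UserActionTime", "ExistingField(UserActionTime)", "AscendingTimestamps");
        RowtimeDescriptorSketch b = new RowtimeDescriptorSketch("UserActionTime", "ExistingField(UserActionTime)", "AscendingTimestamps");
        System.out.println(a.equals(b) + " " + (a.hashCode() == b.hashCode())); // true true
    }
}
```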
Summary
- Time attributes can be specified when a Table is created from a DataStream or a TableSource. Once specified, they can be used as regular fields or in time-based operations such as windows.
- Processing time:
  - From a DataStream: declare it in fromDataStream.
  - From a TableSource: implement the DefinedProctimeAttribute interface. Its getProctimeAttribute method returns a String naming the processing time field.
- Event time:
  - From a DataStream: declare it in fromDataStream, either as an additional field or by replacing an existing field.
  - From a TableSource: implement the DefinedRowtimeAttributes interface. Its getRowtimeAttributeDescriptors method returns a List of RowtimeAttributeDescriptor, each with three properties: attributeName, timestampExtractor and watermarkStrategy.