A Look at How Flink Defines a Table Schema

Category: Programming Tools · Published: 6 years ago

This article looks at how a Table Schema is defined in Flink, based on the flink-table_2.11-1.7.0 sources.

Examples

Defining fields and types

.withSchema(
  new Schema()
    .field("MyField1", Types.INT)     // required: specify the fields of the table (in this order)
    .field("MyField2", Types.STRING)
    .field("MyField3", Types.BOOLEAN)
)
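  • field defines the name and type of each field; the fields appear in the table in the order in which they are declared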

Defining field attributes

.withSchema(
  new Schema()
    .field("MyField1", Types.SQL_TIMESTAMP)
      .proctime()      // optional: declares this field as a processing-time attribute
    .field("MyField2", Types.SQL_TIMESTAMP)
      .rowtime(...)    // optional: declares this field as an event-time attribute
    .field("MyField3", Types.BOOLEAN)
      .from("mf3")     // optional: original field in the input that is referenced/aliased by this field
)
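  • proctime declares a processing-time attribute, rowtime declares an event-time attribute, and from names the original input field that this field references or aliases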

Defining Rowtime attributes

// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
.rowtime(
  new Rowtime()
    .timestampsFromField("ts_field")    // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
  new Rowtime()
    .timestampsFromSource()
)

// Sets a custom timestamp extractor to be used for the rowtime attribute.
// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
.rowtime(
  new Rowtime()
    .timestampsFromExtractor(...)
)
  • timestampsFromField, timestampsFromSource, and timestampsFromExtractor specify where the rowtime timestamps come from

Defining watermark strategies

// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
  new Rowtime()
    .watermarksPeriodicAscending()
)

// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
  new Rowtime()
    .watermarksPeriodicBounded(2000)    // delay in milliseconds
)

// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
  new Rowtime()
    .watermarksFromSource()
)
  • watermarksPeriodicAscending, watermarksPeriodicBounded, and watermarksFromSource specify the watermark strategy
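
To make the arithmetic in the comments above concrete, here is a minimal standalone sketch in plain Scala with no Flink dependency. The names WatermarkSketch, ascendingWatermark, boundedWatermark, and isLate are hypothetical and only restate the formulas from the comments; they are not Flink APIs. The lateness rule (a row is late when its timestamp is not greater than the watermark) is an assumption consistent with the statement that rows equal to the maximum timestamp are not late. The helpers also assume at least one timestamp has been observed.

```scala
// Hypothetical helpers that only restate the formulas from the comments above;
// they are not part of Flink.
object WatermarkSketch {
  // Ascending strategy: watermark = maximum observed timestamp - 1.
  def ascendingWatermark(timestamps: Seq[Long]): Long =
    timestamps.max - 1

  // Bounded out-of-order strategy: watermark = maximum observed timestamp - delay.
  def boundedWatermark(timestamps: Seq[Long], delayMs: Long): Long =
    timestamps.max - delayMs

  // Assumed lateness rule: a row is late when its timestamp is <= the watermark.
  def isLate(timestamp: Long, watermark: Long): Boolean =
    timestamp <= watermark
}
```

For observed timestamps 1000, 3000, and 2000 (in milliseconds), the ascending strategy yields a watermark of 2999, while the bounded strategy with a delay of 2000 yields 1000. A row with timestamp 3000 is therefore not late under the ascending strategy.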

StreamTableEnvironment.connect

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/StreamTableEnvironment.scala

abstract class StreamTableEnvironment(
    private[flink] val execEnv: StreamExecutionEnvironment,
    config: TableConfig)
  extends TableEnvironment(config) {

  //......

  def connect(connectorDescriptor: ConnectorDescriptor): StreamTableDescriptor = {
    new StreamTableDescriptor(this, connectorDescriptor)
  }

  //......
}
  • The connect method of StreamTableEnvironment creates a StreamTableDescriptor

StreamTableDescriptor

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/StreamTableDescriptor.scala

class StreamTableDescriptor(
    tableEnv: StreamTableEnvironment,
    connectorDescriptor: ConnectorDescriptor)
  extends ConnectTableDescriptor[StreamTableDescriptor](
    tableEnv,
    connectorDescriptor)
  with StreamableDescriptor[StreamTableDescriptor] {

  //......
}
  • StreamTableDescriptor extends ConnectTableDescriptor and mixes in StreamableDescriptor

ConnectTableDescriptor

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala

abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
    private val tableEnv: TableEnvironment,
    private val connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor
  with SchematicDescriptor[D]
  with RegistrableDescriptor { this: D =>

  private var formatDescriptor: Option[FormatDescriptor] = None
  private var schemaDescriptor: Option[Schema] = None

  /**
    * Searches for the specified table source, configures it accordingly, and registers it as
    * a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSource(name: String): Unit = {
    val tableSource = TableFactoryUtil.findAndCreateTableSource(tableEnv, this)
    tableEnv.registerTableSource(name, tableSource)
  }

  /**
    * Searches for the specified table sink, configures it accordingly, and registers it as
    * a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSink(name: String): Unit = {
    val tableSink = TableFactoryUtil.findAndCreateTableSink(tableEnv, this)
    tableEnv.registerTableSink(name, tableSink)
  }

  /**
    * Searches for the specified table source and sink, configures them accordingly, and registers
    * them as a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSourceAndSink(name: String): Unit = {
    registerTableSource(name)
    registerTableSink(name)
  }

  /**
    * Specifies the format that defines how to read data from a connector.
    */
  override def withFormat(format: FormatDescriptor): D = {
    formatDescriptor = Some(format)
    this
  }

  /**
    * Specifies the resulting table schema.
    */
  override def withSchema(schema: Schema): D = {
    schemaDescriptor = Some(schema)
    this
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Converts this descriptor into a set of properties.
    */
  override def toProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()

    // this performs only basic validation
    // more validation can only happen within a factory
    if (connectorDescriptor.isFormatNeeded && formatDescriptor.isEmpty) {
      throw new ValidationException(
        s"The connector '$connectorDescriptor' requires a format description.")
    } else if (!connectorDescriptor.isFormatNeeded && formatDescriptor.isDefined) {
      throw new ValidationException(
        s"The connector '$connectorDescriptor' does not require a format description " +
          s"but '${formatDescriptor.get}' found.")
    }

    properties.putProperties(connectorDescriptor.toProperties)
    formatDescriptor.foreach(d => properties.putProperties(d.toProperties))
    schemaDescriptor.foreach(d => properties.putProperties(d.toProperties))

    properties.asMap()
  }
}
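  • ConnectTableDescriptor provides the withSchema method, which stores the given Schema and returns the descriptor itself (type D) so that calls can be chained; toProperties then merges the connector, format, and schema properties after a basic check that a format is present exactly when the connector needs one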
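
The format check inside toProperties can be isolated as a small standalone sketch. ConnectorSketch and FormatCheckSketch are hypothetical names introduced only for illustration, and the sketch returns an Either instead of throwing a ValidationException as the real code does.

```scala
// Hypothetical, simplified model of the basic validation in toProperties.
final case class ConnectorSketch(name: String, isFormatNeeded: Boolean)

object FormatCheckSketch {
  // Returns Left(message) when connector and format do not match, Right(()) otherwise.
  def validate(connector: ConnectorSketch, format: Option[String]): Either[String, Unit] =
    if (connector.isFormatNeeded && format.isEmpty)
      Left(s"The connector '${connector.name}' requires a format description.")
    else if (!connector.isFormatNeeded && format.isDefined)
      Left(s"The connector '${connector.name}' does not require a format description " +
        s"but '${format.get}' found.")
    else
      Right(())
}
```

Only two combinations are rejected: a connector that needs a format but has none, and a connector that needs no format but was given one. As the source comment notes, any deeper validation is left to the factory.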

Schema

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/Schema.scala

class Schema extends Descriptor {

  // maps a field name to a list of properties that describe type, origin, and the time attribute
  private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()

  private var lastField: Option[String] = None

  def schema(schema: TableSchema): Schema = {
    tableSchema.clear()
    lastField = None
    schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
      field(n, t)
    }
    this
  }

  def field(fieldName: String, fieldType: TypeInformation[_]): Schema = {
    field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
    this
  }

  def field(fieldName: String, fieldType: String): Schema = {
    if (tableSchema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }

    val fieldProperties = mutable.LinkedHashMap[String, String]()
    fieldProperties += (SCHEMA_TYPE -> fieldType)

    tableSchema += (fieldName -> fieldProperties)

    lastField = Some(fieldName)
    this
  }

  def from(originFieldName: String): Schema = {
    lastField match {
      case None => throw new ValidationException("No field previously defined. Use field() before.")
      case Some(f) =>
        tableSchema(f) += (SCHEMA_FROM -> originFieldName)
        lastField = None
    }
    this
  }

  def proctime(): Schema = {
    lastField match {
      case None => throw new ValidationException("No field defined previously. Use field() before.")
      case Some(f) =>
        tableSchema(f) += (SCHEMA_PROCTIME -> "true")
        lastField = None
    }
    this
  }

  def rowtime(rowtime: Rowtime): Schema = {
    lastField match {
      case None => throw new ValidationException("No field defined previously. Use field() before.")
      case Some(f) =>
        tableSchema(f) ++= rowtime.toProperties.asScala
        lastField = None
    }
    this
  }

  final override def toProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()
    properties.putIndexedVariableProperties(
      SCHEMA,
      tableSchema.toSeq.map { case (name, props) =>
        (Map(SCHEMA_NAME -> name) ++ props).asJava
      }.asJava
    )
    properties.asMap()
  }
}
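  • Schema provides the field, from, proctime, and rowtime methods for defining the schema's properties; from, proctime, and rowtime each apply to the most recently declared field and then reset lastField, so each field() call can be followed by at most one of them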
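
The lastField bookkeeping is easier to see in a stripped-down model. The following is a minimal sketch in plain Scala, not the Flink class: MiniSchema, modifyLast, and the exception types are illustrative choices, and the literal keys ("schema", "name", "type", "from", "proctime") are assumptions about the values behind the SCHEMA_* constants, which the excerpt above does not show. The indexed key layout schema.<index>.<property> is likewise an assumption about what putIndexedVariableProperties produces.

```scala
import scala.collection.mutable

// Simplified stand-in for Schema; names and property keys are assumptions.
class MiniSchema {
  // Field name -> properties, kept in declaration order.
  private val fields =
    mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()
  private var lastField: Option[String] = None

  def field(name: String, fieldType: String): MiniSchema = {
    require(!fields.contains(name), s"Duplicate field name $name.")
    fields += (name -> mutable.LinkedHashMap("type" -> fieldType))
    lastField = Some(name)
    this
  }

  def from(origin: String): MiniSchema = modifyLast("from" -> origin)

  def proctime(): MiniSchema = modifyLast("proctime" -> "true")

  // Attaches one property to the most recent field, then clears lastField.
  private def modifyLast(entry: (String, String)): MiniSchema = {
    lastField match {
      case None =>
        throw new IllegalStateException("No field defined previously. Use field() before.")
      case Some(f) =>
        fields(f) += entry
        lastField = None
    }
    this
  }

  // Flattens the fields into indexed keys of the form schema.<index>.<property>.
  def toProperties: Map[String, String] =
    fields.toSeq.zipWithIndex.flatMap { case ((name, props), i) =>
      (Seq("name" -> name) ++ props).map { case (k, v) => s"schema.$i.$k" -> v }
    }.toMap
}
```

In this model, `new MiniSchema().field("MyField3", "BOOLEAN").from("mf3")` succeeds, whereas chaining `.proctime().from("x")` after a single `field(...)` call throws, because the first modifier already cleared lastField.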

Rowtime

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/Rowtime.scala

class Rowtime extends Descriptor {

  private var timestampExtractor: Option[TimestampExtractor] = None
  private var watermarkStrategy: Option[WatermarkStrategy] = None

  def timestampsFromField(fieldName: String): Rowtime = {
    timestampExtractor = Some(new ExistingField(fieldName))
    this
  }

  def timestampsFromSource(): Rowtime = {
    timestampExtractor = Some(new StreamRecordTimestamp)
    this
  }

  def timestampsFromExtractor(extractor: TimestampExtractor): Rowtime = {
    timestampExtractor = Some(extractor)
    this
  }

  def watermarksPeriodicAscending(): Rowtime = {
    watermarkStrategy = Some(new AscendingTimestamps)
    this
  }

  def watermarksPeriodicBounded(delay: Long): Rowtime = {
    watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
    this
  }

  def watermarksFromSource(): Rowtime = {
    watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
    this
  }

  def watermarksFromStrategy(strategy: WatermarkStrategy): Rowtime = {
    watermarkStrategy = Some(strategy)
    this
  }

  final override def toProperties: java.util.Map[String, String] = {
    val properties = new DescriptorProperties()

    timestampExtractor.foreach(normalizeTimestampExtractor(_)
      .foreach(e => properties.putString(e._1, e._2)))
    watermarkStrategy.foreach(normalizeWatermarkStrategy(_)
      .foreach(e => properties.putString(e._1, e._2)))

    properties.asMap()
  }
}
  • Rowtime provides timestampsFromField, timestampsFromSource, and timestampsFromExtractor for specifying the timestamp source, and watermarksPeriodicAscending, watermarksPeriodicBounded, watermarksFromSource, and watermarksFromStrategy for specifying the watermark strategy

Summary

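  • The connect method of StreamTableEnvironment creates a StreamTableDescriptor; StreamTableDescriptor extends ConnectTableDescriptor; ConnectTableDescriptor provides the withSchema method, which stores the given Schema and returns the descriptor itself so that calls can be chained
  • Schema provides the field, from, proctime, and rowtime methods for defining the schema's properties; proctime declares a processing-time attribute, rowtime declares an event-time attribute, and from names the original input field that is referenced or aliased
  • Rowtime provides timestampsFromField, timestampsFromSource, and timestampsFromExtractor for specifying the timestamp source, and watermarksPeriodicAscending, watermarksPeriodicBounded, watermarksFromSource, and watermarksFromStrategy for specifying the watermark strategy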
