A Look at Flink's CsvTableSource

This article takes a look at Flink's CsvTableSource.

TableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/TableSource.scala

trait TableSource[T] {

  /** Returns the [[TypeInformation]] for the return type of the [[TableSource]].
    * The fields of the return type are mapped to the table schema based on their name.
    *
    * @return The type of the returned [[DataSet]] or [[DataStream]].
    */
  def getReturnType: TypeInformation[T]

  /**
    * Returns the schema of the produced table.
    *
    * @return The [[TableSchema]] of the produced table.
    */
  def getTableSchema: TableSchema

  /**
    * Describes the table source.
    *
    * @return A String explaining the [[TableSource]].
    */
  def explainSource(): String =
    TableConnectorUtil.generateRuntimeName(getClass, getTableSchema.getFieldNames)
}
  • TableSource defines three methods: getReturnType, getTableSchema, and explainSource; a minimal, hypothetical implementation is sketched below
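
As an illustration of this contract, here is a minimal sketch of a hypothetical TableSource[Row] (the class name and the two-column schema are invented for the example, and explainSource() is left to its default implementation):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.TableSource
import org.apache.flink.types.Row

class MyRowSource extends TableSource[Row] {

  private val fieldNames = Array("id", "name")
  private val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)

  // physical type of the produced records; fields are matched to the schema by name
  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)

  // logical schema of the produced table
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
}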

BatchTableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/BatchTableSource.scala

trait BatchTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataSet]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    *       Do not use it in Table API programs.
    */
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
  • BatchTableSource extends TableSource and adds the getDataSet method; a hedged in-memory sketch follows
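
Building on the sketch above, the following hypothetical BatchTableSource[Row] serves a couple of in-memory rows; note that the planner passes in the (Java) ExecutionEnvironment and calls getDataSet itself, so user programs register the source with a table environment rather than calling this method directly:

import java.util.Arrays

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.types.Row

class InMemoryBatchSource extends BatchTableSource[Row] {

  private val fieldNames = Array("id", "name")
  private val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)

  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)

  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)

  // called by the planner to obtain the physical DataSet backing the table
  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
    execEnv.fromCollection(Arrays.asList(row(1L, "a"), row(2L, "b")), getReturnType)

  // small helper that builds a two-field Row
  protected def row(id: Long, name: String): Row = {
    val r = new Row(2)
    r.setField(0, Long.box(id))
    r.setField(1, name)
    r
  }
}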

StreamTableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/StreamTableSource.scala

trait StreamTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataStream]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    *       Do not use it in Table API programs.
    */
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
  • StreamTableSource extends TableSource and adds the getDataStream method; the sketch below builds on the batch sketch above
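
The streaming side looks the same except for the environment and result type. This hedged sketch reuses the hypothetical InMemoryBatchSource from the previous section and additionally mixes in StreamTableSource, mirroring what CsvTableSource does further down:

import java.util.Arrays

import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

class InMemoryStreamSource extends InMemoryBatchSource with StreamTableSource[Row] {

  // called by the planner to obtain the DataStream backing the table
  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
    execEnv.fromCollection(Arrays.asList(row(1L, "a"), row(2L, "b")), getReturnType)
}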

CsvTableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/CsvTableSource.scala

class CsvTableSource private (
    private val path: String,
    private val fieldNames: Array[String],
    private val fieldTypes: Array[TypeInformation[_]],
    private val selectedFields: Array[Int],
    private val fieldDelim: String,
    private val rowDelim: String,
    private val quoteCharacter: Character,
    private val ignoreFirstLine: Boolean,
    private val ignoreComments: String,
    private val lenient: Boolean)
  extends BatchTableSource[Row]
  with StreamTableSource[Row]
  with ProjectableTableSource[Row] {

  def this(
    path: String,
    fieldNames: Array[String],
    fieldTypes: Array[TypeInformation[_]],
    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
    quoteCharacter: Character = null,
    ignoreFirstLine: Boolean = false,
    ignoreComments: String = null,
    lenient: Boolean = false) = {

    this(
      path,
      fieldNames,
      fieldTypes,
      fieldTypes.indices.toArray, // initially, all fields are returned
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)

  }

  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
  }

  if (fieldNames.length != fieldTypes.length) {
    throw new TableException("Number of field names and field types must be equal.")
  }

  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
  private val selectedFieldNames = selectedFields.map(fieldNames(_))

  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    execEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
  override def getReturnType: RowTypeInfo = returnType

  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
    streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the schema of the produced table. */
  override def getTableSchema = new TableSchema(fieldNames, fieldTypes)

  /** Returns a copy of [[TableSource]] with ability to project fields */
  override def projectFields(fields: Array[Int]): CsvTableSource = {

    val selectedFields = if (fields.isEmpty) Array(0) else fields

    new CsvTableSource(
      path,
      fieldNames,
      fieldTypes,
      selectedFields,
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)
  }

  private def createCsvInput(): RowCsvInputFormat = {
    val inputFormat = new RowCsvInputFormat(
      new Path(path),
      selectedFieldTypes,
      rowDelim,
      fieldDelim,
      selectedFields)

    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
    inputFormat.setLenient(lenient)
    if (quoteCharacter != null) {
      inputFormat.enableQuotedStringParsing(quoteCharacter)
    }
    if (ignoreComments != null) {
      inputFormat.setCommentPrefix(ignoreComments)
    }

    inputFormat
  }

  override def equals(other: Any): Boolean = other match {
    case that: CsvTableSource => returnType == that.returnType &&
        path == that.path &&
        fieldDelim == that.fieldDelim &&
        rowDelim == that.rowDelim &&
        quoteCharacter == that.quoteCharacter &&
        ignoreFirstLine == that.ignoreFirstLine &&
        ignoreComments == that.ignoreComments &&
        lenient == that.lenient
    case _ => false
  }

  override def hashCode(): Int = {
    returnType.hashCode()
  }

  override def explainSource(): String = {
    s"CsvTableSource(" +
      s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
  }
}
  • CsvTableSource implements both the BatchTableSource and StreamTableSource interfaces; its getDataSet method creates a DataSet via ExecutionEnvironment.createInput, and its getDataStream method creates a DataStream via StreamExecutionEnvironment.createInput
  • The InputFormat handed to ExecutionEnvironment.createInput and StreamExecutionEnvironment.createInput is a RowCsvInputFormat, built by createCsvInput
  • getTableSchema returns a TableSchema built from fieldNames and fieldTypes; getReturnType returns a RowTypeInfo built from selectedFieldTypes and selectedFieldNames; explainSource returns s"CsvTableSource(read fields: ${getReturnType.getFieldNames.mkString(", ")})"; a usage sketch from the caller's side follows this list
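
To round things off from the caller's side, the sketch below constructs a CsvTableSource with the public constructor shown above, registers it with a batch table environment, and runs a simple projection. The file path, field names, and table name are placeholders; the same instance could just as well be registered on a StreamTableEnvironment, since it implements both source interfaces:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

object CsvTableSourceDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // path and schema are made up for this example
    val csvSource = new CsvTableSource(
      "/tmp/people.csv",
      Array("id", "name", "score"),
      Array[TypeInformation[_]](Types.LONG, Types.STRING, Types.DOUBLE),
      ignoreFirstLine = true)

    tEnv.registerTableSource("people", csvSource)

    // selecting only two of the three fields lets the optimizer push the
    // projection into the source via projectFields
    val result = tEnv.scan("people").select('id, 'name)

    tEnv.toDataSet[Row](result).print()
  }
}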

Summary

  • TableSource defines three methods: getReturnType, getTableSchema, and explainSource; BatchTableSource extends TableSource and adds the getDataSet method; StreamTableSource extends TableSource and adds the getDataStream method
  • CsvTableSource implements both the BatchTableSource and StreamTableSource interfaces; its getDataSet method creates a DataSet via ExecutionEnvironment.createInput, and its getDataStream method creates a DataStream via StreamExecutionEnvironment.createInput
  • The InputFormat passed to both createInput calls is a RowCsvInputFormat produced by createCsvInput; getTableSchema returns a TableSchema built from fieldNames and fieldTypes; getReturnType returns a RowTypeInfo built from selectedFieldTypes and selectedFieldNames; explainSource returns s"CsvTableSource(read fields: ${getReturnType.getFieldNames.mkString(", ")})"
