A Look at Flink's CsvTableSink

Category: Programming Tools · Published: 6 years ago


This article takes a look at Flink's CsvTableSink.

TableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/TableSink.scala

trait TableSink[T] {

  /**
    * Returns the type expected by this [[TableSink]].
    *
    * This type should depend on the types returned by [[getFieldNames]].
    *
    * @return The type expected by this [[TableSink]].
    */
  def getOutputType: TypeInformation[T]

  /** Returns the names of the table fields. */
  def getFieldNames: Array[String]

  /** Returns the types of the table fields. */
  def getFieldTypes: Array[TypeInformation[_]]

  /**
    * Return a copy of this [[TableSink]] configured with the field names and types of the
    * [[Table]] to emit.
    *
    * @param fieldNames The field names of the table to emit.
    * @param fieldTypes The field types of the table to emit.
    * @return A copy of this [[TableSink]] configured with the field names and types of the
    *         [[Table]] to emit.
    */
  def configure(fieldNames: Array[String],
                fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
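  • TableSink defines four methods: getOutputType, getFieldNames, getFieldTypes and configure. configure returns a copy of the sink, configured with the field names and types of the table to emit.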
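To make the contract concrete, here is a minimal, dependency-free sketch of the TableSink shape. It is a hypothetical model, not Flink's API. The names SketchSink and RowLikeSink are invented for illustration, and plain strings stand in for Flink's TypeInformation. The sketch shows the two points the scaladoc stresses: configure hands back a configured copy, and the output type is derived from the field types, much as CsvTableSink builds a RowTypeInfo from getFieldTypes.

```scala
// Hypothetical model of the TableSink contract (not Flink classes).
// Field types are plain strings here instead of TypeInformation[_].
trait SketchSink {
  def getOutputType: String
  def getFieldNames: Array[String]
  def getFieldTypes: Array[String]
  // Returns a *copy* configured with the schema of the table to emit.
  def configure(fieldNames: Array[String], fieldTypes: Array[String]): SketchSink
}

// The output type depends on the field types, loosely mirroring
// CsvTableSink.getOutputType = new RowTypeInfo(getFieldTypes: _*).
class RowLikeSink(names: Array[String], types: Array[String]) extends SketchSink {
  def getFieldNames: Array[String] = names
  def getFieldTypes: Array[String] = types
  def getOutputType: String = types.mkString("Row(", ", ", ")")

  def configure(fieldNames: Array[String], fieldTypes: Array[String]): SketchSink = {
    require(fieldNames.length == fieldTypes.length, "names and types must align")
    // Defensive copies so the new sink does not share arrays with the caller.
    new RowLikeSink(fieldNames.clone(), fieldTypes.clone())
  }
}

val unconfigured = new RowLikeSink(Array.empty, Array.empty)
val configured = unconfigured.configure(Array("id", "name"), Array("INT", "STRING"))
println(configured.getOutputType) // prints Row(INT, STRING)
```

Because configure returns a new object, the unconfigured instance keeps its empty schema. The caller must use the returned sink.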

BatchTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/BatchTableSink.scala

trait BatchTableSink[T] extends TableSink[T] {

  /** Emits the DataSet. */
  def emitDataSet(dataSet: DataSet[T]): Unit
}
  • BatchTableSink extends TableSink and defines the emitDataSet method

StreamTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/StreamTableSink.scala

trait StreamTableSink[T] extends TableSink[T] {

  /** Emits the DataStream. */
  def emitDataStream(dataStream: DataStream[T]): Unit

}
  • StreamTableSink extends TableSink and defines the emitDataStream method
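The batch and streaming traits differ only in the kind of input their emit method receives. This is why a single class such as CsvTableSink can mix in both and share its formatting logic.

The sketch below models this split without Flink. It makes two assumptions:
- a bounded Seq stands in for DataSet;
- an Iterator stands in for DataStream.

The trait and class names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for BatchTableSink / StreamTableSink.
// Seq plays the role of a bounded DataSet, Iterator of a DataStream.
trait SketchBatchSink[T] { def emitDataSet(dataSet: Seq[T]): Unit }
trait SketchStreamSink[T] { def emitDataStream(dataStream: Iterator[T]): Unit }

// Like CsvTableSink, one class implements both sides with shared logic.
class CollectingSink[T] extends SketchBatchSink[T] with SketchStreamSink[T] {
  val collected: ArrayBuffer[T] = ArrayBuffer.empty[T]

  override def emitDataSet(dataSet: Seq[T]): Unit = collected ++= dataSet
  override def emitDataStream(dataStream: Iterator[T]): Unit = collected ++= dataStream
}

val sink = new CollectingSink[String]
sink.emitDataSet(Seq("a", "b"))
sink.emitDataStream(Iterator("c"))
println(sink.collected.mkString(",")) // prints a,b,c
```

One difference from real Flink matters here. This toy sink consumes its input eagerly. In Flink, emitDataSet and emitDataStream only attach sink operators to the job plan, and nothing is written until the job is executed.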

TableSinkBase

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/TableSinkBase.scala

trait TableSinkBase[T] extends TableSink[T] {

  private var fieldNames: Option[Array[String]] = None
  private var fieldTypes: Option[Array[TypeInformation[_]]] = None

  /** Return a deep copy of the [[TableSink]]. */
  protected def copy: TableSinkBase[T]

  /**
    * Return the field names of the [[Table]] to emit. */
  def getFieldNames: Array[String] = {
    fieldNames match {
      case Some(n) => n
      case None => throw new IllegalStateException(
        "TableSink must be configured to retrieve field names.")
    }
  }

  /** Return the field types of the [[Table]] to emit. */
  def getFieldTypes: Array[TypeInformation[_]] = {
    fieldTypes match {
      case Some(t) => t
      case None => throw new IllegalStateException(
        "TableSink must be configured to retrieve field types.")
    }
  }

  /**
    * Return a copy of this [[TableSink]] configured with the field names and types of the
    * [[Table]] to emit.
    *
    * @param fieldNames The field names of the table to emit.
    * @param fieldTypes The field types of the table to emit.
    * @return A copy of this [[TableSink]] configured with the field names and types of the
    *         [[Table]] to emit.
    */
  final def configure(fieldNames: Array[String],
                      fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {

    val configuredSink = this.copy
    configuredSink.fieldNames = Some(fieldNames)
    configuredSink.fieldTypes = Some(fieldTypes)

    configuredSink
  }
}
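  • TableSinkBase extends TableSink and implements getFieldNames, getFieldTypes and configure. The two getters throw an IllegalStateException until the sink has been configured. configure does not mutate the current instance; it obtains a copy through the abstract copy method and sets the field names and types on that copy.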
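The copy-on-configure pattern in TableSinkBase can be reproduced in plain Scala. This is a minimal sketch with hypothetical names (SketchSinkBase, PathSink), and only field names are modeled; field types would follow the same pattern.

The schema sits in an Option, the getter throws until configure has run, and configure returns a configured copy. Private members in Scala are class-private, so configure may assign to the copy's private field, just as the Flink code does.

```scala
import scala.util.Try

// Hypothetical model of TableSinkBase (names only; types work the same way).
abstract class SketchSinkBase {
  private var fieldNames: Option[Array[String]] = None

  // Subclasses return a fresh, unconfigured copy of themselves.
  protected def copy: SketchSinkBase

  def getFieldNames: Array[String] = fieldNames.getOrElse(
    throw new IllegalStateException("TableSink must be configured to retrieve field names."))

  // Configure a copy; the original instance stays unconfigured.
  final def configure(names: Array[String]): SketchSinkBase = {
    val configured = this.copy
    configured.fieldNames = Some(names) // class-private access to the copy's field
    configured
  }
}

// Hypothetical concrete sink: copy just re-creates it from its constructor args,
// the way CsvTableSink.copy passes along path, fieldDelim, numFiles and writeMode.
class PathSink(val path: String) extends SketchSinkBase {
  override protected def copy: SketchSinkBase = new PathSink(path)
}

val base = new PathSink("/tmp/out")
val cfg = base.configure(Array("a", "b"))
println(cfg.getFieldNames.mkString(","))   // prints a,b
println(Try(base.getFieldNames).isFailure) // prints true
```

One plausible reason for this design is safety: the instance a user created is never mutated, so it can be configured for several tables without sharing schema state.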

CsvTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/CsvTableSink.scala

class CsvTableSink(
    path: String,
    fieldDelim: Option[String],
    numFiles: Option[Int],
    writeMode: Option[WriteMode])
  extends TableSinkBase[Row] with BatchTableSink[Row] with AppendStreamTableSink[Row] {

  /**
    * A simple [[TableSink]] to emit data as CSV files.
    *
    * @param path The output path to write the Table to.
    * @param fieldDelim The field delimiter, ',' by default.
    */
  def this(path: String, fieldDelim: String = ",") {
    this(path, Some(fieldDelim), None, None)
  }

  /**
    * A simple [[TableSink]] to emit data as CSV files.
    *
    * @param path The output path to write the Table to.
    * @param fieldDelim The field delimiter.
    * @param numFiles The number of files to write to.
    * @param writeMode The write mode to specify whether existing files are overwritten or not.
    */
  def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
    this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
  }

  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
    val csvRows = dataSet.map(new CsvFormatter(fieldDelim.getOrElse(",")))

    if (numFiles.isDefined) {
      csvRows.setParallelism(numFiles.get)
    }

    val sink = writeMode match {
      case None => csvRows.writeAsText(path)
      case Some(wm) => csvRows.writeAsText(path, wm)
    }

    if (numFiles.isDefined) {
      sink.setParallelism(numFiles.get)
    }

    sink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
  }

  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val csvRows = dataStream.map(new CsvFormatter(fieldDelim.getOrElse(",")))

    if (numFiles.isDefined) {
      csvRows.setParallelism(numFiles.get)
    }

    val sink = writeMode match {
      case None => csvRows.writeAsText(path)
      case Some(wm) => csvRows.writeAsText(path, wm)
    }

    if (numFiles.isDefined) {
      sink.setParallelism(numFiles.get)
    }

    sink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
  }

  override protected def copy: TableSinkBase[Row] = {
    new CsvTableSink(path, fieldDelim, numFiles, writeMode)
  }

  override def getOutputType: TypeInformation[Row] = {
    new RowTypeInfo(getFieldTypes: _*)
  }
}

/**
  * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
  *
  * @param fieldDelim The field delimiter.
  */
class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
  override def map(row: Row): String = {

    val builder = new StringBuilder

    // write first value
    val v = row.getField(0)
    if (v != null) {
      builder.append(v.toString)
    }

    // write following values
    for (i <- 1 until row.getArity) {
      builder.append(fieldDelim)
      val v = row.getField(i)
      if (v != null) {
        builder.append(v.toString)
      }
    }
    builder.mkString
  }
}

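  • CsvTableSink extends TableSinkBase and implements the BatchTableSink and AppendStreamTableSink interfaces; AppendStreamTableSink in turn extends StreamTableSink
  • Both emitDataSet and emitDataStream use CsvFormatter, a MapFunction that converts each Row into a String. Fields are joined with the delimiter, and null fields become empty strings. The resulting lines are then written with writeAsText.
  • CsvTableSink accepts an optional writeMode parameter. WriteMode is an enum with two values, NO_OVERWRITE and OVERWRITE, which control whether an existing file with the same name is overwritten when the CSV output is written.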
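The formatting step is simple enough to restate without Flink. The sketch below is a hypothetical re-implementation of CsvFormatter's logic, in which a Seq[Any] stands in for Flink's Row. Like the original, it turns null into an empty field and applies no quoting or escaping; it only calls toString on each value.

```scala
// Hypothetical re-implementation of CsvFormatter's logic; Seq[Any] stands in for Row.
object CsvFormatSketch {
  def format(fields: Seq[Any], fieldDelim: String = ","): String =
    // null -> empty field; values are joined with the delimiter, no quoting/escaping.
    fields.map(v => if (v == null) "" else v.toString).mkString(fieldDelim)
}

println(CsvFormatSketch.format(Seq(1, null, "x")))   // prints 1,,x
println(CsvFormatSketch.format(Seq("a,b", 2)))       // prints a,b,2
println(CsvFormatSketch.format(Seq(1, 2), "|"))      // prints 1|2
```

The second call shows a practical consequence of the missing escaping: a value that contains the delimiter produces a line that cannot be split back into the original fields unambiguously. Choosing a delimiter that never occurs in the data avoids this.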

Summary

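  • TableSink defines four methods: getOutputType, getFieldNames, getFieldTypes and configure. BatchTableSink extends TableSink and defines emitDataSet, and StreamTableSink extends TableSink and defines emitDataStream. TableSinkBase extends TableSink and implements getFieldNames, getFieldTypes and configure.
  • CsvTableSink extends TableSinkBase and implements the BatchTableSink and AppendStreamTableSink interfaces; AppendStreamTableSink in turn extends StreamTableSink. Both emitDataSet and emitDataStream use CsvFormatter, a MapFunction that converts each Row into a String.
  • CsvTableSink accepts an optional writeMode parameter. WriteMode is an enum with two values, NO_OVERWRITE and OVERWRITE, which control whether an existing file with the same name is overwritten when the CSV output is written.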
