A look at Flink's CsvTableSink

This article takes a look at Flink's CsvTableSink, based on the flink-table_2.11 1.7.1 sources.

TableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/TableSink.scala

trait TableSink[T] {

  /**
    * Returns the type expected by this [[TableSink]].
    *
    * This type should depend on the types returned by [[getFieldNames]].
    *
    * @return The type expected by this [[TableSink]].
    */
  def getOutputType: TypeInformation[T]

  /** Returns the names of the table fields. */
  def getFieldNames: Array[String]

  /** Returns the types of the table fields. */
  def getFieldTypes: Array[TypeInformation[_]]

  /**
    * Return a copy of this [[TableSink]] configured with the field names and types of the
    * [[Table]] to emit.
    *
    * @param fieldNames The field names of the table to emit.
    * @param fieldTypes The field types of the table to emit.
    * @return A copy of this [[TableSink]] configured with the field names and types of the
    *         [[Table]] to emit.
    */
  def configure(fieldNames: Array[String],
                fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
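  • TableSink defines four methods: getOutputType, getFieldNames, getFieldTypes, and configure. configure returns a copy of the sink that is set up with the field names and types of the Table to be emitted.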
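To make the contract concrete, here is a minimal, Flink-free sketch in plain Scala. It is not Flink's API: `FieldType`, `RowType`, `SimpleTableSink`, and `PrintRowSink` are hypothetical stand-ins (for `TypeInformation`, the output row type, `TableSink`, and a concrete sink), chosen so the snippet runs on the standard library alone. It illustrates the two points the trait's scaladoc makes: the output type is derived from the configured fields, and `configure` hands back a configured copy instead of changing the current sink.

```scala
// Hypothetical stand-ins for Flink's TypeInformation (not Flink APIs).
sealed trait FieldType
case object StringType extends FieldType
case object IntType extends FieldType

// The output "type" of a sink, modeled as the ordered field types of a row.
final case class RowType(fieldTypes: Seq[FieldType])

// A simplified mirror of the four-method TableSink contract.
trait SimpleTableSink {
  def getOutputType: RowType
  def getFieldNames: Array[String]
  def getFieldTypes: Array[FieldType]
  def configure(fieldNames: Array[String], fieldTypes: Array[FieldType]): SimpleTableSink
}

// A sink whose output type is derived from its configured field types.
final class PrintRowSink(names: Array[String], types: Array[FieldType])
    extends SimpleTableSink {
  def getOutputType: RowType = RowType(types.toSeq)
  def getFieldNames: Array[String] = names
  def getFieldTypes: Array[FieldType] = types

  // Returns a new, configured sink; `this` is left unchanged.
  def configure(fieldNames: Array[String], fieldTypes: Array[FieldType]): SimpleTableSink = {
    require(fieldNames.length == fieldTypes.length, "each field needs exactly one type")
    new PrintRowSink(fieldNames, fieldTypes)
  }
}

val unconfigured = new PrintRowSink(Array.empty[String], Array.empty[FieldType])
val sink = unconfigured.configure(Array("name", "age"), Array(StringType, IntType))
println(sink.getOutputType)
```

The sketch calls `configure` by hand; in a Flink program this call is normally made by the framework when a table is written to a sink.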

BatchTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/BatchTableSink.scala

trait BatchTableSink[T] extends TableSink[T] {

  /** Emits the DataSet. */
  def emitDataSet(dataSet: DataSet[T]): Unit
}
  • BatchTableSink extends TableSink and adds emitDataSet, which emits a DataSet.

StreamTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/StreamTableSink.scala

trait StreamTableSink[T] extends TableSink[T] {

  /** Emits the DataStream. */
  def emitDataStream(dataStream: DataStream[T]): Unit

}
  • StreamTableSink extends TableSink and adds emitDataStream, which emits a DataStream.
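The split between the batch and streaming traits is easiest to see with one class that implements both, which is what CsvTableSink does further down. The following plain-Scala sketch is only an analogy: `BatchSink`, `StreamSink`, and `LineSink` are hypothetical names, a bounded `Seq` stands in for `DataSet`, and an `Iterator` stands in for `DataStream`. Both entry points share a single formatting function, much as `emitDataSet` and `emitDataStream` share `CsvFormatter`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical analogues of BatchTableSink and StreamTableSink:
// a bounded Seq stands in for DataSet, an Iterator for DataStream.
trait BatchSink[T] { def emitBatch(data: Seq[T]): Unit }
trait StreamSink[T] { def emitStream(data: Iterator[T]): Unit }

// One class implementing both traits, the way CsvTableSink mixes in
// BatchTableSink and AppendStreamTableSink.
final class LineSink extends BatchSink[Int] with StreamSink[Int] {
  val written: ArrayBuffer[String] = ArrayBuffer.empty[String]

  // Shared by both entry points, like CsvFormatter in CsvTableSink.
  private def format(v: Int): String = s"value=$v"

  def emitBatch(data: Seq[Int]): Unit = data.foreach(v => written += format(v))

  // Consumes records one at a time; a real stream need not end.
  def emitStream(data: Iterator[Int]): Unit = data.foreach(v => written += format(v))
}

val sink = new LineSink
sink.emitBatch(Seq(1, 2))
sink.emitStream(Iterator(3))
println(sink.written.mkString("\n"))
```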

TableSinkBase

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/TableSinkBase.scala

trait TableSinkBase[T] extends TableSink[T] {

  private var fieldNames: Option[Array[String]] = None
  private var fieldTypes: Option[Array[TypeInformation[_]]] = None

  /** Return a deep copy of the [[TableSink]]. */
  protected def copy: TableSinkBase[T]

  /**
    * Return the field names of the [[Table]] to emit. */
  def getFieldNames: Array[String] = {
    fieldNames match {
      case Some(n) => n
      case None => throw new IllegalStateException(
        "TableSink must be configured to retrieve field names.")
    }
  }

  /** Return the field types of the [[Table]] to emit. */
  def getFieldTypes: Array[TypeInformation[_]] = {
    fieldTypes match {
      case Some(t) => t
      case None => throw new IllegalStateException(
        "TableSink must be configured to retrieve field types.")
    }
  }

  /**
    * Return a copy of this [[TableSink]] configured with the field names and types of the
    * [[Table]] to emit.
    *
    * @param fieldNames The field names of the table to emit.
    * @param fieldTypes The field types of the table to emit.
    * @return A copy of this [[TableSink]] configured with the field names and types of the
    *         [[Table]] to emit.
    */
  final def configure(fieldNames: Array[String],
                      fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {

    val configuredSink = this.copy
    configuredSink.fieldNames = Some(fieldNames)
    configuredSink.fieldTypes = Some(fieldTypes)

    configuredSink
  }
}
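  • TableSinkBase extends TableSink and implements getFieldNames, getFieldTypes, and configure, leaving getOutputType and a new abstract copy method to subclasses. getFieldNames and getFieldTypes throw an IllegalStateException until the sink has been configured. configure is final: it obtains a copy via copy and sets the field names and types on that copy, so the original sink stays unconfigured.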
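The copy-then-configure pattern in TableSinkBase can be reproduced without Flink. The sketch below is a simplified mirror under stated assumptions: `SimpleSinkBase`, `PathSink`, and `FieldType` are hypothetical names, and `FieldType` replaces `TypeInformation`. As in the original, the field metadata lives in private `Option` vars, the getters throw `IllegalStateException` until configuration, and the final `configure` sets the metadata on a copy while leaving the original instance untouched.

```scala
// Hypothetical stand-in for Flink's TypeInformation (not a Flink API).
sealed trait FieldType
case object StringType extends FieldType
case object LongType extends FieldType

// A simplified mirror of TableSinkBase (hypothetical name).
abstract class SimpleSinkBase {
  // Unset until configure() is called, as in TableSinkBase.
  private var fieldNames: Option[Array[String]] = None
  private var fieldTypes: Option[Array[FieldType]] = None

  // Subclasses return a fresh, unconfigured instance with the same settings.
  protected def copy: SimpleSinkBase

  def getFieldNames: Array[String] = fieldNames.getOrElse(
    throw new IllegalStateException("Sink must be configured to retrieve field names."))

  def getFieldTypes: Array[FieldType] = fieldTypes.getOrElse(
    throw new IllegalStateException("Sink must be configured to retrieve field types."))

  // final, so subclasses cannot change the copy-then-set behavior.
  final def configure(names: Array[String], types: Array[FieldType]): SimpleSinkBase = {
    val configured = copy
    configured.fieldNames = Some(names)
    configured.fieldTypes = Some(types)
    configured
  }
}

// A concrete sink that only carries an output path (hypothetical).
final class PathSink(val path: String) extends SimpleSinkBase {
  protected def copy: SimpleSinkBase = new PathSink(path)
}

val base = new PathSink("/tmp/out.csv")
val configured = base.configure(Array("id", "name"), Array(LongType, StringType))
println(configured.getFieldNames.mkString(",")) // id,name
```

One plausible reason for returning a configured copy rather than mutating `this` is that a single unconfigured sink instance can then serve as a template for several tables with different schemas.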

CsvTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/CsvTableSink.scala

class CsvTableSink(
    path: String,
    fieldDelim: Option[String],
    numFiles: Option[Int],
    writeMode: Option[WriteMode])
  extends TableSinkBase[Row] with BatchTableSink[Row] with AppendStreamTableSink[Row] {

  /**
    * A simple [[TableSink]] to emit data as CSV files.
    *
    * @param path The output path to write the Table to.
    * @param fieldDelim The field delimiter, ',' by default.
    */
  def this(path: String, fieldDelim: String = ",") {
    this(path, Some(fieldDelim), None, None)
  }

  /**
    * A simple [[TableSink]] to emit data as CSV files.
    *
    * @param path The output path to write the Table to.
    * @param fieldDelim The field delimiter.
    * @param numFiles The number of files to write to.
    * @param writeMode The write mode to specify whether existing files are overwritten or not.
    */
  def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
    this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
  }

  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
    val csvRows = dataSet.map(new CsvFormatter(fieldDelim.getOrElse(",")))

    if (numFiles.isDefined) {
      csvRows.setParallelism(numFiles.get)
    }

    val sink = writeMode match {
      case None => csvRows.writeAsText(path)
      case Some(wm) => csvRows.writeAsText(path, wm)
    }

    if (numFiles.isDefined) {
      sink.setParallelism(numFiles.get)
    }

    sink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
  }

  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val csvRows = dataStream.map(new CsvFormatter(fieldDelim.getOrElse(",")))

    if (numFiles.isDefined) {
      csvRows.setParallelism(numFiles.get)
    }

    val sink = writeMode match {
      case None => csvRows.writeAsText(path)
      case Some(wm) => csvRows.writeAsText(path, wm)
    }

    if (numFiles.isDefined) {
      sink.setParallelism(numFiles.get)
    }

    sink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
  }

  override protected def copy: TableSinkBase[Row] = {
    new CsvTableSink(path, fieldDelim, numFiles, writeMode)
  }

  override def getOutputType: TypeInformation[Row] = {
    new RowTypeInfo(getFieldTypes: _*)
  }
}

/**
  * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
  *
  * @param fieldDelim The field delimiter.
  */
class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
  override def map(row: Row): String = {

    val builder = new StringBuilder

    // write first value
    val v = row.getField(0)
    if (v != null) {
      builder.append(v.toString)
    }

    // write following values
    for (i <- 1 until row.getArity) {
      builder.append(fieldDelim)
      val v = row.getField(i)
      if (v != null) {
        builder.append(v.toString)
      }
    }
    builder.mkString
  }
}
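  • CsvTableSink extends TableSinkBase and mixes in BatchTableSink and AppendStreamTableSink. AppendStreamTableSink in turn extends StreamTableSink, so the same sink can be used in batch programs and in append-only streaming programs.
  • Both emitDataSet and emitDataStream map each record through CsvFormatter, a MapFunction that converts a Row into a String by joining the fields' toString values with the field delimiter (',' by default) and writing null fields as empty strings. The resulting lines are written with writeAsText. If numFiles is set, it is used as the parallelism of both the map operator and the sink.
  • CsvTableSink takes an optional writeMode parameter of type WriteMode, an enum with two values, NO_OVERWRITE and OVERWRITE, which controls whether an existing file with the same name is overwritten when the CSV output is written.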
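The row-to-line logic of CsvFormatter fits in a single expression. The sketch below is a compact equivalent for non-empty rows, with a plain `Seq[Any]` standing in for Flink's `Row` (an assumption made so it runs without Flink); `formatRow` is a hypothetical name. It also makes one property of the original code visible: values are written with `toString` and never quoted or escaped, so a value that contains the delimiter shifts the columns when the file is read back.

```scala
// A compact equivalent of CsvFormatter.map for non-empty rows; a Seq[Any]
// stands in for Flink's Row, and formatRow is a hypothetical name.
def formatRow(row: Seq[Any], fieldDelim: String = ","): String =
  // null fields become empty strings; nothing is quoted or escaped
  row.map(v => if (v == null) "" else v.toString).mkString(fieldDelim)

println(formatRow(Seq("Alice", null, 42)))      // Alice,,42
println(formatRow(Seq(1, "x", true), "|"))      // 1|x|true

// A value containing the delimiter is written as-is, so splitting the
// line on "," yields one column too many.
val line = formatRow(Seq("Smith, John", 7))
println(line.split(",").length)                 // 3
```

If the data may contain the delimiter, choosing a `fieldDelim` that cannot occur in the values is the simplest workaround with this sink.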

Summary

  • TableSink defines four methods: getOutputType, getFieldNames, getFieldTypes, and configure. BatchTableSink extends TableSink with emitDataSet, StreamTableSink extends TableSink with emitDataStream, and TableSinkBase extends TableSink and implements getFieldNames, getFieldTypes, and configure.
  • CsvTableSink extends TableSinkBase and mixes in BatchTableSink and AppendStreamTableSink, the latter of which extends StreamTableSink. Both emitDataSet and emitDataStream use CsvFormatter, a MapFunction that converts a Row into a String.
  • CsvTableSink has an optional writeMode parameter. WriteMode is an enum with the values NO_OVERWRITE and OVERWRITE, which determine whether an existing file with the same name is overwritten when the CSV output is written.
