Preface
This article takes a look at Flink's Table Formats.
Examples
CSV Format
```java
.withFormat(
  new Csv()
    .field("field1", Types.STRING)    // required: ordered format fields
    .field("field2", Types.TIMESTAMP)
    .fieldDelimiter(",")              // optional: string delimiter "," by default
    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
    .quoteCharacter('"')              // optional: single character for string values, empty by default
    .commentPrefix("#")               // optional: string (not a char) to indicate comments, empty by default
    .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
    .ignoreParseErrors()              // optional: skip records with parse error instead of failing by default
)
```
- Flink supports the CSV format out of the box; no extra dependency is needed.
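To make the CSV options above more concrete, here is a deliberately simplified Java sketch of what they mean at the line level. It is not Flink's CSV parser: the class `SimpleCsvReader` and its constructor are hypothetical, quote handling (`quoteCharacter`) is left out, and a "parse error" is reduced to a line with the wrong number of fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical, simplified illustration of the CSV options' line-level semantics.
 * NOT Flink's implementation; quoting is intentionally not handled.
 */
final class SimpleCsvReader {
    private final String fieldDelimiter;
    private final String lineDelimiter;
    private final String commentPrefix;     // null means "no comment lines"
    private final boolean ignoreFirstLine;
    private final boolean ignoreParseErrors;
    private final int fieldCount;           // number of declared fields

    SimpleCsvReader(String fieldDelimiter, String lineDelimiter, String commentPrefix,
                    boolean ignoreFirstLine, boolean ignoreParseErrors, int fieldCount) {
        this.fieldDelimiter = fieldDelimiter;
        this.lineDelimiter = lineDelimiter;
        this.commentPrefix = commentPrefix;
        this.ignoreFirstLine = ignoreFirstLine;
        this.ignoreParseErrors = ignoreParseErrors;
        this.fieldCount = fieldCount;
    }

    List<String[]> read(String input) {
        List<String[]> rows = new ArrayList<>();
        // Pattern.quote treats the delimiters literally rather than as regexes
        String[] lines = input.split(Pattern.quote(lineDelimiter));
        for (int i = 0; i < lines.length; i++) {
            String line = lines[i];
            if (i == 0 && ignoreFirstLine) {
                continue; // skip a header line
            }
            if (commentPrefix != null && line.startsWith(commentPrefix)) {
                continue; // skip comment lines
            }
            String[] fields = line.split(Pattern.quote(fieldDelimiter), -1);
            if (fields.length != fieldCount) {
                if (ignoreParseErrors) {
                    continue; // lenient mode: drop the malformed record
                }
                throw new IllegalArgumentException("Malformed line " + i + ": " + line);
            }
            rows.add(fields);
        }
        return rows;
    }
}
```

With `ignoreParseErrors` off, the same malformed line raises an exception instead of being dropped, mirroring the "instead of failing by default" comment in the example.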
JSON Format
```java
.withFormat(
  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Types.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      " type: 'object'," +
      " properties: {" +
      " lon: {" +
      " type: 'number'" +
      " }," +
      " rideTime: {" +
      " type: 'string'," +
      " format: 'date-time'" +
      " }" +
      " }" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)
```
- The JSON format can be defined with one of schema, jsonSchema, or deriveSchema (they are alternatives); it requires the additional flink-json dependency.
Apache Avro Format
```java
.withFormat(
  new Avro()
    // required: define the schema either by using an Avro specific record class
    .recordClass(User.class)

    // or by using an Avro schema
    .avroSchema(
      "{" +
      " \"type\": \"record\"," +
      " \"name\": \"test\"," +
      " \"fields\" : [" +
      " {\"name\": \"a\", \"type\": \"long\"}," +
      " {\"name\": \"b\", \"type\": \"string\"}" +
      " ]" +
      "}"
    )
)
```
- The Avro schema can be defined with either recordClass or avroSchema; the flink-avro dependency is required.
ConnectTableDescriptor
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
```scala
abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
    private val tableEnv: TableEnvironment,
    private val connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor
  with SchematicDescriptor[D]
  with RegistrableDescriptor {
  this: D =>

  private var formatDescriptor: Option[FormatDescriptor] = None
  private var schemaDescriptor: Option[Schema] = None

  //......

  override def withFormat(format: FormatDescriptor): D = {
    formatDescriptor = Some(format)
    this
  }

  //......
}
```
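- StreamTableEnvironment's connect method creates a StreamTableDescriptor, which extends ConnectTableDescriptor. ConnectTableDescriptor provides the withFormat method, which takes a FormatDescriptor, stores it, and returns the descriptor itself (type D) so calls can be chained.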
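The `[D <: ConnectTableDescriptor[D]]` bound together with the `this: D =>` self type is what lets `withFormat` return the concrete descriptor type. The Java sketch below reproduces that F-bounded builder pattern with hypothetical classes (`TableDescriptorSketch`, `StreamDescriptorSketch`, `FormatSpec`); Java has no self-type annotation, so an abstract `self()` method stands in for it.

```java
import java.util.Optional;

/** Hypothetical stand-in for a format descriptor; it only carries a type name. */
final class FormatSpec {
    final String type;

    FormatSpec(String type) {
        this.type = type;
    }
}

/** Hypothetical sketch of an F-bounded ("self type") builder base class. */
abstract class TableDescriptorSketch<D extends TableDescriptorSketch<D>> {
    private Optional<FormatSpec> format = Optional.empty();

    /** Subclasses return `this`, typed as D; Scala expresses this with `this: D =>`. */
    protected abstract D self();

    /** Stores the format and returns the concrete descriptor type for chaining. */
    D withFormat(FormatSpec format) {
        this.format = Optional.of(format);
        return self();
    }

    Optional<FormatSpec> format() {
        return format;
    }
}

/** Hypothetical counterpart of a concrete descriptor with a subclass-only method. */
final class StreamDescriptorSketch extends TableDescriptorSketch<StreamDescriptorSketch> {
    private String updateMode = "append";

    @Override
    protected StreamDescriptorSketch self() {
        return this;
    }

    StreamDescriptorSketch inUpsertMode() {
        this.updateMode = "upsert";
        return this;
    }

    String updateMode() {
        return updateMode;
    }
}
```

Because `withFormat` returns `StreamDescriptorSketch` rather than the base type, a subclass-only method such as the hypothetical `inUpsertMode()` can still be chained after it without a cast.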
FormatDescriptor
flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/FormatDescriptor.java
```java
@PublicEvolving
public abstract class FormatDescriptor extends DescriptorBase implements Descriptor {

    private String type;

    private int version;

    /**
     * Constructs a {@link FormatDescriptor}.
     *
     * @param type string that identifies this format
     * @param version property version for backwards compatibility
     */
    public FormatDescriptor(String type, int version) {
        this.type = type;
        this.version = version;
    }

    @Override
    public final Map<String, String> toProperties() {
        final DescriptorProperties properties = new DescriptorProperties();
        properties.putString(FormatDescriptorValidator.FORMAT_TYPE, type);
        properties.putInt(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, version);
        properties.putProperties(toFormatProperties());
        return properties.asMap();
    }

    /**
     * Converts this descriptor into a set of format properties. Usually prefixed with
     * {@link FormatDescriptorValidator#FORMAT}.
     */
    protected abstract Map<String, String> toFormatProperties();
}
```
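- FormatDescriptor is an abstract class; Csv, Json, and Avro are all its subclasses. Its final toProperties method writes the format type and property version, then merges in whatever the subclass returns from toFormatProperties.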
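`toProperties` is a template method: it is `final`, writes the two keys common to every format, and then merges the subclass-specific keys. A minimal Java sketch of that structure follows; the class names are hypothetical, and the key strings `format.type`, `format.property-version` and `format.field-delimiter` are assumptions about what the corresponding validator constants resolve to.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the FormatDescriptor template-method structure. */
abstract class FormatDescriptorSketch {
    private final String type;
    private final int version;

    protected FormatDescriptorSketch(String type, int version) {
        this.type = type;
        this.version = version;
    }

    /** final: subclasses cannot change the common keys, only contribute their own. */
    final Map<String, String> toProperties() {
        Map<String, String> props = new TreeMap<>();
        props.put("format.type", type);                                 // assumed key
        props.put("format.property-version", Integer.toString(version)); // assumed key
        props.putAll(toFormatProperties());
        return Collections.unmodifiableMap(props);
    }

    /** Subclass hook: returns only the format-specific properties. */
    protected abstract Map<String, String> toFormatProperties();
}

/** Hypothetical subclass with a single optional setting. */
final class DelimitedFormatSketch extends FormatDescriptorSketch {
    private String fieldDelimiter; // null = not set, so no property is emitted

    DelimitedFormatSketch() {
        super("csv", 1);
    }

    DelimitedFormatSketch fieldDelimiter(String delimiter) {
        this.fieldDelimiter = delimiter;
        return this;
    }

    @Override
    protected Map<String, String> toFormatProperties() {
        Map<String, String> props = new TreeMap<>();
        if (fieldDelimiter != null) {
            props.put("format.field-delimiter", fieldDelimiter); // assumed key
        }
        return props;
    }
}
```

Unset options produce no key at all, so a descriptor with nothing configured yields just the two common properties.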
Csv
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Csv.scala
```scala
class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {

  private var fieldDelim: Option[String] = None
  private var lineDelim: Option[String] = None
  private val schema: mutable.LinkedHashMap[String, String] =
    mutable.LinkedHashMap[String, String]()
  private var quoteCharacter: Option[Character] = None
  private var commentPrefix: Option[String] = None
  private var isIgnoreFirstLine: Option[Boolean] = None
  private var lenient: Option[Boolean] = None

  def fieldDelimiter(delim: String): Csv = {
    this.fieldDelim = Some(delim)
    this
  }

  def lineDelimiter(delim: String): Csv = {
    this.lineDelim = Some(delim)
    this
  }

  def schema(schema: TableSchema): Csv = {
    this.schema.clear()
    schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
      field(n, t)
    }
    this
  }

  def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
    field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
    this
  }

  def field(fieldName: String, fieldType: String): Csv = {
    if (schema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }
    schema += (fieldName -> fieldType)
    this
  }

  def quoteCharacter(quote: Character): Csv = {
    this.quoteCharacter = Option(quote)
    this
  }

  def commentPrefix(prefix: String): Csv = {
    this.commentPrefix = Option(prefix)
    this
  }

  def ignoreFirstLine(): Csv = {
    this.isIgnoreFirstLine = Some(true)
    this
  }

  def ignoreParseErrors(): Csv = {
    this.lenient = Some(true)
    this
  }

  override protected def toFormatProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()

    fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
    lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))

    val subKeys = util.Arrays.asList(
      DescriptorProperties.TABLE_SCHEMA_NAME,
      DescriptorProperties.TABLE_SCHEMA_TYPE)

    val subValues = schema.map(e => util.Arrays.asList(e._1, e._2)).toList.asJava

    properties.putIndexedFixedProperties(
      FORMAT_FIELDS,
      subKeys,
      subValues)
    quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
    commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
    isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))
    lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))

    properties.asMap()
  }
}
```
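- Csv provides field, fieldDelimiter, lineDelimiter, quoteCharacter, commentPrefix, ignoreFirstLine, ignoreParseErrors and other methods, plus schema(TableSchema), which replaces the field list with the fields of a table schema. Only options that were actually set end up in toFormatProperties.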
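The field list is kept in a `LinkedHashMap` so declaration order (that is, column order) is preserved, and `putIndexedFixedProperties` flattens it into numbered keys. Below is a Java sketch of that behaviour, assuming the resulting keys look like `format.fields.<i>.name` and `format.fields.<i>.type`. `CsvFieldsSketch` is hypothetical: it takes type strings directly instead of `TypeInformation`, and it throws `IllegalArgumentException` where Flink's `Csv` throws `ValidationException`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of Csv's ordered field list and its indexed flattening. */
final class CsvFieldsSketch {
    // LinkedHashMap keeps insertion order, which determines column positions
    private final LinkedHashMap<String, String> schema = new LinkedHashMap<>();

    CsvFieldsSketch field(String name, String type) {
        if (schema.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate field name " + name + ".");
        }
        schema.put(name, type);
        return this;
    }

    /** Flattens fields into format.fields.<i>.name / .type (assumed key layout). */
    Map<String, String> toFormatProperties() {
        Map<String, String> props = new TreeMap<>();
        int i = 0;
        for (Map.Entry<String, String> e : schema.entrySet()) {
            props.put("format.fields." + i + ".name", e.getKey());
            props.put("format.fields." + i + ".type", e.getValue());
            i++;
        }
        return props;
    }
}
```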
Json
flink-json-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Json.java
```java
public class Json extends FormatDescriptor {

    private Boolean failOnMissingField;
    private Boolean deriveSchema;
    private String jsonSchema;
    private String schema;

    public Json() {
        super(FORMAT_TYPE_VALUE, 1);
    }

    public Json failOnMissingField(boolean failOnMissingField) {
        this.failOnMissingField = failOnMissingField;
        return this;
    }

    public Json jsonSchema(String jsonSchema) {
        Preconditions.checkNotNull(jsonSchema);
        this.jsonSchema = jsonSchema;
        this.schema = null;
        this.deriveSchema = null;
        return this;
    }

    public Json schema(TypeInformation<Row> schemaType) {
        Preconditions.checkNotNull(schemaType);
        this.schema = TypeStringUtils.writeTypeInfo(schemaType);
        this.jsonSchema = null;
        this.deriveSchema = null;
        return this;
    }

    public Json deriveSchema() {
        this.deriveSchema = true;
        this.schema = null;
        this.jsonSchema = null;
        return this;
    }

    @Override
    protected Map<String, String> toFormatProperties() {
        final DescriptorProperties properties = new DescriptorProperties();

        if (deriveSchema != null) {
            properties.putBoolean(FORMAT_DERIVE_SCHEMA, deriveSchema);
        }

        if (jsonSchema != null) {
            properties.putString(FORMAT_JSON_SCHEMA, jsonSchema);
        }

        if (schema != null) {
            properties.putString(FORMAT_SCHEMA, schema);
        }

        if (failOnMissingField != null) {
            properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField);
        }

        return properties.asMap();
    }
}
```
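- Json offers three ways to define the JSON format: schema, jsonSchema, and deriveSchema. Each of these setters resets the other two, so only the last one called takes effect; failOnMissingField is independent of them.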
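Because every schema setter nulls out the other two, the three options behave as "last call wins", while `failOnMissingField` is unaffected. A hypothetical Java sketch of that logic follows; `JsonSchemaChoiceSketch` takes the type string directly rather than a `TypeInformation<Row>`, and the property keys are assumed, not taken from Flink.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Hypothetical sketch of Json's mutually exclusive schema options. */
final class JsonSchemaChoiceSketch {
    private Boolean failOnMissingField; // null = not set
    private Boolean deriveSchema;       // null = not set
    private String jsonSchema;
    private String schema;

    JsonSchemaChoiceSketch failOnMissingField(boolean fail) {
        this.failOnMissingField = fail; // independent of the schema choice
        return this;
    }

    JsonSchemaChoiceSketch jsonSchema(String jsonSchema) {
        this.jsonSchema = Objects.requireNonNull(jsonSchema);
        this.schema = null;
        this.deriveSchema = null;
        return this;
    }

    JsonSchemaChoiceSketch schema(String typeString) {
        this.schema = Objects.requireNonNull(typeString);
        this.jsonSchema = null;
        this.deriveSchema = null;
        return this;
    }

    JsonSchemaChoiceSketch deriveSchema() {
        this.deriveSchema = true;
        this.schema = null;
        this.jsonSchema = null;
        return this;
    }

    /** Emits only the options that are still set (assumed key names). */
    Map<String, String> toFormatProperties() {
        Map<String, String> props = new TreeMap<>();
        if (deriveSchema != null) props.put("format.derive-schema", deriveSchema.toString());
        if (jsonSchema != null) props.put("format.json-schema", jsonSchema);
        if (schema != null) props.put("format.schema", schema);
        if (failOnMissingField != null) props.put("format.fail-on-missing-field", failOnMissingField.toString());
        return props;
    }
}
```

For example, calling `jsonSchema(...)` and then `deriveSchema()` leaves only the derive-schema flag, so chaining all three alternatives as in the JSON example above effectively keeps only the last one.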
Avro
flink-avro-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Avro.java
```java
public class Avro extends FormatDescriptor {

    private Class<? extends SpecificRecord> recordClass;
    private String avroSchema;

    public Avro() {
        super(AvroValidator.FORMAT_TYPE_VALUE, 1);
    }

    public Avro recordClass(Class<? extends SpecificRecord> recordClass) {
        Preconditions.checkNotNull(recordClass);
        this.recordClass = recordClass;
        return this;
    }

    public Avro avroSchema(String avroSchema) {
        Preconditions.checkNotNull(avroSchema);
        this.avroSchema = avroSchema;
        return this;
    }

    @Override
    protected Map<String, String> toFormatProperties() {
        final DescriptorProperties properties = new DescriptorProperties();

        if (null != recordClass) {
            properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass);
        }
        if (null != avroSchema) {
            properties.putString(AvroValidator.FORMAT_AVRO_SCHEMA, avroSchema);
        }

        return properties.asMap();
    }
}
```
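- Avro offers two ways to define the Avro format: recordClass and avroSchema. Unlike Json, neither setter clears the other; each value that is set is written as its own property.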
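A hypothetical Java sketch of the `Avro` descriptor's property handling follows. It uses `Class<?>` instead of `Class<? extends SpecificRecord>` so it runs without Avro on the classpath, records the class by its name (assumed to mirror `putClass`, which stores a class name string), and uses assumed key names.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Hypothetical sketch: two independent, non-clearing schema options. */
final class AvroFormatSketch {
    private Class<?> recordClass;
    private String avroSchema;

    AvroFormatSketch recordClass(Class<?> recordClass) {
        this.recordClass = Objects.requireNonNull(recordClass);
        return this;
    }

    AvroFormatSketch avroSchema(String avroSchema) {
        this.avroSchema = Objects.requireNonNull(avroSchema);
        return this;
    }

    Map<String, String> toFormatProperties() {
        Map<String, String> props = new TreeMap<>();
        if (recordClass != null) {
            props.put("format.record-class", recordClass.getName()); // assumed key
        }
        if (avroSchema != null) {
            props.put("format.avro-schema", avroSchema); // assumed key
        }
        return props;
    }
}
```

Whether a descriptor with both options set is accepted is decided later, during validation, which this article does not cover.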
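Summary
- StreamTableEnvironment's connect method creates a StreamTableDescriptor, which extends ConnectTableDescriptor.
- ConnectTableDescriptor provides the withFormat method, which takes a FormatDescriptor and returns the descriptor itself for chaining; FormatDescriptor is an abstract class, and Csv, Json, and Avro are all its subclasses.
- Csv provides field, fieldDelimiter, lineDelimiter, quoteCharacter, commentPrefix, ignoreFirstLine, ignoreParseErrors and other methods; Json offers three ways to define the JSON format (schema, jsonSchema, deriveSchema); Avro offers two ways to define the Avro format (recordClass, avroSchema).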