聊聊flink的JDBCOutputFormat

栏目: 数据库 · 发布时间: 7年前

内容简介:flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java、flink-core-1.7.0-sources.jar!/org/apache/flink/types/Row.java、flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

/**
 * OutputFormat to write Rows into a JDBC database.
 * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
 *
 * <p>Every record is added to a {@link PreparedStatement} batch; the batch is executed once
 * {@code batchInterval} records have accumulated, and any remaining records are executed on
 * {@link #close()}.
 *
 * @see Row
 * @see DriverManager
 */
public class JDBCOutputFormat extends RichOutputFormat<Row> {
	private static final long serialVersionUID = 1L;

	/** Number of buffered records after which the batch is executed, unless changed via the builder. */
	static final int DEFAULT_BATCH_INTERVAL = 5000;

	private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);

	private String username;
	private String password;
	private String drivername;
	private String dbURL;
	private String query;
	private int batchInterval = DEFAULT_BATCH_INTERVAL;

	// Runtime-only JDBC resources: created in open(), released in close(). They are transient
	// because JDBC connections/statements are not serializable, while this format is.
	private transient Connection dbConn;
	private transient PreparedStatement upload;

	/** Number of records added to the current batch that have not been executed yet. */
	private int batchCount = 0;

	/** Optional java.sql.Types codes, one per Row field; null or empty means "types unknown". */
	private int[] typesArray;

	public JDBCOutputFormat() {
	}

	/**
	 * No-op: all configuration is supplied through {@link JDBCOutputFormatBuilder}.
	 */
	@Override
	public void configure(Configuration parameters) {
	}

	/**
	 * Connects to the target database and initializes the prepared statement.
	 *
	 * @param taskNumber The number of the parallel instance.
	 * @param numTasks The number of parallel instances.
	 * @throws IOException Thrown, if the output could not be opened due to an
	 * I/O problem.
	 * @throws IllegalArgumentException if the driver class cannot be loaded or the connection /
	 * statement cannot be created.
	 */
	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		try {
			establishConnection();
			upload = dbConn.prepareStatement(query);
		} catch (SQLException sqe) {
			throw new IllegalArgumentException("open() failed.", sqe);
		} catch (ClassNotFoundException cnfe) {
			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
		}
	}

	/**
	 * Loads the configured driver class and opens the connection, with credentials only when a
	 * username was supplied.
	 */
	private void establishConnection() throws SQLException, ClassNotFoundException {
		Class.forName(drivername);
		if (username == null) {
			dbConn = DriverManager.getConnection(dbURL);
		} else {
			dbConn = DriverManager.getConnection(dbURL, username, password);
		}
	}

	/**
	 * Adds a record to the prepared statement.
	 *
	 * <p>When this method is called, the output format is guaranteed to be opened.
	 *
	 * <p>Columns without a configured SQL type (no types array, an empty types array, or a
	 * position beyond the end of the types array) are set with {@code setObject} as a best effort.
	 *
	 * <p>WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
	 * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
	 *
	 * @param row The records to add to the output.
	 * @see PreparedStatement
	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
	 * @throws RuntimeException wrapping the {@link SQLException} if a parameter cannot be set or the
	 * batch cannot be extended.
	 */
	@Override
	public void writeRecord(Row row) throws IOException {
		final int arity = row.getArity();
		// An empty array carries no type information; treat it the same as a missing one
		// instead of indexing into it.
		final boolean typesProvided = typesArray != null && typesArray.length > 0;

		if (typesProvided && typesArray.length != arity) {
			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
		}
		try {
			for (int index = 0; index < arity; index++) {
				final Object value = row.getField(index);
				if (typesProvided && index < typesArray.length) {
					setTypedField(index + 1, typesArray[index], value);
				} else {
					LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, value);
					upload.setObject(index + 1, value);
				}
			}
			upload.addBatch();
			batchCount++;
		} catch (SQLException e) {
			throw new RuntimeException("Preparation of JDBC statement failed.", e);
		}

		if (batchCount >= batchInterval) {
			// execute batch
			flush();
		}
	}

	/**
	 * Binds one non-positional value to the statement using the setter matching its SQL type.
	 *
	 * @param pos 1-based JDBC parameter position.
	 * @param sqlType the java.sql.Types code configured for this column.
	 * @param value the field value; null values are bound with setNull.
	 */
	private void setTypedField(int pos, int sqlType, Object value) throws SQLException {
		if (value == null) {
			upload.setNull(pos, sqlType);
			return;
		}
		// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
		switch (sqlType) {
			case java.sql.Types.NULL:
				upload.setNull(pos, sqlType);
				break;
			case java.sql.Types.BOOLEAN:
			case java.sql.Types.BIT:
				upload.setBoolean(pos, (boolean) value);
				break;
			case java.sql.Types.CHAR:
			case java.sql.Types.NCHAR:
			case java.sql.Types.VARCHAR:
			case java.sql.Types.LONGVARCHAR:
			case java.sql.Types.LONGNVARCHAR:
				upload.setString(pos, (String) value);
				break;
			case java.sql.Types.TINYINT:
				upload.setByte(pos, (byte) value);
				break;
			case java.sql.Types.SMALLINT:
				upload.setShort(pos, (short) value);
				break;
			case java.sql.Types.INTEGER:
				upload.setInt(pos, (int) value);
				break;
			case java.sql.Types.BIGINT:
				upload.setLong(pos, (long) value);
				break;
			case java.sql.Types.REAL:
				upload.setFloat(pos, (float) value);
				break;
			case java.sql.Types.FLOAT:
			case java.sql.Types.DOUBLE:
				upload.setDouble(pos, (double) value);
				break;
			case java.sql.Types.DECIMAL:
			case java.sql.Types.NUMERIC:
				upload.setBigDecimal(pos, (java.math.BigDecimal) value);
				break;
			case java.sql.Types.DATE:
				upload.setDate(pos, (java.sql.Date) value);
				break;
			case java.sql.Types.TIME:
				upload.setTime(pos, (java.sql.Time) value);
				break;
			case java.sql.Types.TIMESTAMP:
				upload.setTimestamp(pos, (java.sql.Timestamp) value);
				break;
			case java.sql.Types.BINARY:
			case java.sql.Types.VARBINARY:
			case java.sql.Types.LONGVARBINARY:
				upload.setBytes(pos, (byte[]) value);
				break;
			default:
				// Not mapped explicitly (e.g. SQLXML, ARRAY, JAVA_OBJECT, BLOB, CLOB, NCLOB,
				// DATALINK, DISTINCT, OTHER, REF, ROWID, STRUCT): leave conversion to the driver.
				upload.setObject(pos, value);
				LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
					sqlType, pos, value);
		}
	}

	/**
	 * Executes the pending batch and resets the batch counter on success.
	 *
	 * @throws RuntimeException wrapping the {@link SQLException} if the batch fails; the counter
	 * is then left unchanged.
	 */
	void flush() {
		try {
			upload.executeBatch();
			batchCount = 0;
		} catch (SQLException e) {
			throw new RuntimeException("Execution of JDBC statement failed.", e);
		}
	}

	/** Returns the configured SQL types array itself (not a copy); may be null. */
	int[] getTypesArray() {
		return typesArray;
	}

	/**
	 * Executes prepared statement and closes all resources of this instance.
	 *
	 * <p>The statement and connection are released even if executing the final batch fails;
	 * that failure is still propagated to the caller.
	 *
	 * @throws IOException Thrown, if the input could not be closed properly.
	 */
	@Override
	public void close() throws IOException {
		try {
			if (upload != null && batchCount > 0) {
				flush();
			}
		} finally {
			closeStatement();
			closeConnection();
		}
	}

	/** Closes the prepared statement, logging (not throwing) close failures. */
	private void closeStatement() {
		if (upload != null) {
			try {
				upload.close();
			} catch (SQLException e) {
				LOG.info("JDBC statement could not be closed: " + e.getMessage());
			} finally {
				upload = null;
			}
		}
	}

	/** Closes the database connection, logging (not throwing) close failures. */
	private void closeConnection() {
		if (dbConn != null) {
			try {
				dbConn.close();
			} catch (SQLException se) {
				LOG.info("JDBC connection could not be closed: " + se.getMessage());
			} finally {
				dbConn = null;
			}
		}
	}

	public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
		return new JDBCOutputFormatBuilder();
	}

	//......
}
复制代码
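  • JDBCOutputFormat继承了RichOutputFormat,使用JDBC的PreparedStatement进行批量写入:writeRecord每处理一条记录就调用addBatch,当累计的记录数达到batchInterval时调用flush执行executeBatch;batchInterval默认为5000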

Row

flink-core-1.7.0-sources.jar!/org/apache/flink/types/Row.java

/**
 * A Row can have an arbitrary number of fields, which may all be of different types. The
 * fields in a Row can be null. Because Row is not strongly typed, Flink's type extraction
 * mechanism cannot extract the correct field types, so users should provide the type
 * information manually by creating a {@link RowTypeInfo}.
 *
 * <p>The fields in the Row can be accessed by position (zero-based) via {@link #getField(int)}
 * and set via {@link #setField(int, Object)}.
 *
 * <p>Row is in principle serializable. However, it may contain non-serializable fields,
 * in which case serialization will fail.
 */
@PublicEvolving
public class Row implements Serializable {

	private static final long serialVersionUID = 1L;

	/** The array to store actual values. */
	private final Object[] fields;

	/**
	 * Create a new Row instance with all fields set to null.
	 *
	 * @param arity The number of fields in the Row
	 */
	public Row(int arity) {
		this.fields = new Object[arity];
	}

	/**
	 * Get the number of fields in the Row.
	 *
	 * @return The number of fields in the Row.
	 */
	public int getArity() {
		return fields.length;
	}

	/**
	 * Gets the field at the specified position.
	 *
	 * @param pos The position of the field, 0-based.
	 * @return The field at the specified position.
	 * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
	 */
	public Object getField(int pos) {
		return fields[pos];
	}

	/**
	 * Sets the field at the specified position.
	 *
	 * @param pos The position of the field, 0-based.
	 * @param value The value to be assigned to the field at the specified position.
	 * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
	 */
	public void setField(int pos, Object value) {
		fields[pos] = value;
	}

	/**
	 * Returns the fields joined by "," using an array-aware string conversion for each field.
	 */
	@Override
	public String toString() {
		StringBuilder sb = new StringBuilder();
		for (int i = 0; i < fields.length; i++) {
			if (i > 0) {
				sb.append(",");
			}
			sb.append(StringUtils.arrayAwareToString(fields[i]));
		}
		return sb.toString();
	}

	/**
	 * Two rows are equal when they have the same class and deeply equal field arrays.
	 */
	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}

		Row row = (Row) o;

		return Arrays.deepEquals(fields, row.fields);
	}

	/** Deep hash of the field array, consistent with {@link #equals(Object)}. */
	@Override
	public int hashCode() {
		return Arrays.deepHashCode(fields);
	}

	/**
	 * Creates a new Row and assigns the given values to the Row's fields.
	 * This is more convenient than using the constructor.
	 *
	 * <p>For example:
	 *
	 * <pre>
	 *     Row.of("hello", true, 1L);
	 * </pre>
	 * instead of
	 * <pre>
	 *     Row row = new Row(3);
	 *     row.setField(0, "hello");
	 *     row.setField(1, true);
	 *     row.setField(2, 1L);
	 * </pre>
	 *
	 * @param values the field values, in positional order
	 * @return a new Row whose arity equals {@code values.length}
	 */
	public static Row of(Object... values) {
		Row row = new Row(values.length);
		for (int i = 0; i < values.length; i++) {
			row.setField(i, values[i]);
		}
		return row;
	}

	/**
	 * Creates a new Row that is a copy of another row.
	 * This method does not perform a deep copy: field objects are shared.
	 *
	 * @param row The row being copied.
	 * @return The cloned new Row
	 */
	public static Row copy(Row row) {
		final Row newRow = new Row(row.fields.length);
		System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);
		return newRow;
	}

	/**
	 * Creates a new Row with projected fields from another row.
	 * This method does not perform a deep copy: field objects are shared.
	 *
	 * @param row The row to project fields from.
	 * @param fields 0-based positions in {@code row} of the fields to be projected, in output order
	 * @return the new projected Row
	 */
	public static Row project(Row row, int[] fields) {
		final Row newRow = new Row(fields.length);
		for (int i = 0; i < fields.length; i++) {
			newRow.fields[i] = row.fields[fields[i]];
		}
		return newRow;
	}
}
复制代码
  • Row是JDBCOutputFormat的writeRecord的参数类型,它内部使用Object数组来存取字段值,同时也提供了诸如of、copy、project等静态方法

JDBCOutputFormatBuilder

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

/**
	 * Builder for a {@link JDBCOutputFormat}.
	 *
	 * <p>Every setter writes straight into the format instance under construction and returns
	 * this builder for chaining. {@link #finish()} validates the mandatory settings and hands out
	 * that same instance.
	 */
	public static class JDBCOutputFormatBuilder {
		/** The format being configured; returned as-is by {@link #finish()}. */
		private final JDBCOutputFormat format;

		protected JDBCOutputFormatBuilder() {
			format = new JDBCOutputFormat();
		}

		public JDBCOutputFormatBuilder setUsername(String username) {
			this.format.username = username;
			return this;
		}

		public JDBCOutputFormatBuilder setPassword(String password) {
			this.format.password = password;
			return this;
		}

		public JDBCOutputFormatBuilder setDrivername(String drivername) {
			this.format.drivername = drivername;
			return this;
		}

		public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
			this.format.dbURL = dbURL;
			return this;
		}

		public JDBCOutputFormatBuilder setQuery(String query) {
			this.format.query = query;
			return this;
		}

		public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
			this.format.batchInterval = batchInterval;
			return this;
		}

		public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
			this.format.typesArray = typesArray;
			return this;
		}

		/**
		 * Finalizes the configuration and checks validity.
		 *
		 * <p>Missing credentials are only logged; a missing database URL, query or driver name
		 * is rejected (checked in that order).
		 *
		 * @return Configured JDBCOutputFormat
		 * @throws IllegalArgumentException if the database URL, query or driver name is missing
		 */
		public JDBCOutputFormat finish() {
			infoIfAbsent(format.username, "Username was not supplied.");
			infoIfAbsent(format.password, "Password was not supplied.");
			requireSupplied(format.dbURL, "No database URL supplied.");
			requireSupplied(format.query, "No query supplied.");
			requireSupplied(format.drivername, "No driver supplied.");
			return format;
		}

		/** Logs {@code message} at INFO level when {@code setting} is null. */
		private static void infoIfAbsent(Object setting, String message) {
			if (setting == null) {
				LOG.info(message);
			}
		}

		/** Throws an IllegalArgumentException carrying {@code message} when {@code setting} is null. */
		private static void requireSupplied(Object setting, String message) {
			if (setting == null) {
				throw new IllegalArgumentException(message);
			}
		}
	}
复制代码
  • JDBCOutputFormatBuilder提供了对username、password、dbURL、query、drivername、batchInterval、typesArray这几个属性的builder方法

JDBCAppendTableSink

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java

/**
 * An at-least-once Table sink for JDBC.
 *
 * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if
 * checkpointing is enabled). However, one common use case is to run idempotent queries
 * (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and
 * achieve exactly-once semantic.</p>
 */
public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

	private final JDBCOutputFormat outputFormat;

	// Set only on the copy returned by configure().
	private String[] fieldNames;
	private TypeInformation<?>[] fieldTypes;

	JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
		this.outputFormat = outputFormat;
	}

	public static JDBCAppendTableSinkBuilder builder() {
		return new JDBCAppendTableSinkBuilder();
	}

	/** Streaming path: wraps the output format in a checkpoint-aware sink function. */
	@Override
	public void emitDataStream(DataStream<Row> dataStream) {
		dataStream
				.addSink(new JDBCSinkFunction(outputFormat))
				.name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
	}

	/** Batch path: writes the data set directly through the output format. */
	@Override
	public void emitDataSet(DataSet<Row> dataSet) {
		dataSet.output(outputFormat);
	}

	@Override
	public TypeInformation<Row> getOutputType() {
		return new RowTypeInfo(fieldTypes, fieldNames);
	}

	@Override
	public String[] getFieldNames() {
		return fieldNames;
	}

	@Override
	public TypeInformation<?>[] getFieldTypes() {
		return fieldTypes;
	}

	/**
	 * Validates the table schema against the output format's SQL types and returns a configured
	 * copy of this sink backed by a clone of the output format.
	 *
	 * @throws IllegalStateException if the output format has no SQL types configured
	 * @throws IllegalArgumentException if the number or the types of the fields do not match
	 */
	@Override
	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
		int[] types = outputFormat.getTypesArray();
		if (types == null) {
			// Without SQL types there is nothing to validate against; fail with a clear message
			// instead of a NullPointerException from IntStream.of(null).
			throw new IllegalStateException(
				"SQL types of the JDBCOutputFormat are not specified; cannot validate the table schema.");
		}

		String sinkSchema =
			IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.joining(", "));
		String tableSchema =
			Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.joining(", "));
		String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " +
			"Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);

		Preconditions.checkArgument(fieldTypes.length == types.length, msg);
		for (int i = 0; i < types.length; ++i) {
			Preconditions.checkArgument(
				JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
				msg);
		}

		JDBCAppendTableSink copy;
		try {
			// Each configured sink gets its own output format instance.
			copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat));
		} catch (IOException | ClassNotFoundException e) {
			throw new RuntimeException(e);
		}

		copy.fieldNames = fieldNames;
		copy.fieldTypes = fieldTypes;
		return copy;
	}

	@VisibleForTesting
	JDBCOutputFormat getOutputFormat() {
		return outputFormat;
	}
}
复制代码
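  • JDBCAppendTableSink同时实现了AppendStreamTableSink及BatchTableSink接口(BatchTableSink声明实现TableSink):emitDataStream使用JDBCSinkFunction包装outputFormat并通过addSink写出,emitDataSet则直接调用dataSet.output(outputFormat);configure方法会校验表的字段类型与outputFormat的typesArray是否一致,然后基于克隆出来的outputFormat返回一个新的sink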

JDBCSinkFunction

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java

/**
 * Streaming sink that forwards every {@link Row} to a wrapped {@link JDBCOutputFormat} and
 * executes the format's pending batch whenever a checkpoint snapshot is taken.
 */
class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
	/** Performs the actual JDBC writes; opened in open() and closed in close(). */
	final JDBCOutputFormat outputFormat;

	JDBCSinkFunction(JDBCOutputFormat outputFormat) {
		this.outputFormat = outputFormat;
	}

	/**
	 * Hands this function's runtime context to the output format and opens it for the current
	 * subtask.
	 */
	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		final RuntimeContext runtimeContext = getRuntimeContext();
		outputFormat.setRuntimeContext(runtimeContext);
		final int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
		final int parallelism = runtimeContext.getNumberOfParallelSubtasks();
		outputFormat.open(subtaskIndex, parallelism);
	}

	/** Adds the incoming row to the output format's current batch. */
	@Override
	public void invoke(Row value) throws Exception {
		this.outputFormat.writeRecord(value);
	}

	/** Executes the buffered batch so records added before the checkpoint are submitted. */
	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		this.outputFormat.flush();
	}

	/** Nothing to restore: this function registers no state. */
	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
	}

	/** Closes the output format first, then the rich function itself. */
	@Override
	public void close() throws Exception {
		this.outputFormat.close();
		super.close();
	}
}
复制代码
  • JDBCSinkFunction继承了RichSinkFunction,同时也实现了CheckpointedFunction接口;invoke方法使用的是JDBCOutputFormat.writeRecord方法,而snapshotState则是调用了JDBCOutputFormat.flush来及时提交记录

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Uberland

Uberland

Alex Rosenblat / University of California Press / 2018-11-19 / GBP 21.00

Silicon Valley technology is transforming the way we work, and Uber is leading the charge. An American startup that promised to deliver entrepreneurship for the masses through its technology, Uber ins......一起来看看 《Uberland》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

SHA 加密
SHA 加密

SHA 加密工具