A Look at Flink's CsvReader

Summary: this article traces how Flink reads CSV input, following these 1.6.2 sources: flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java, flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvReader.java, and flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java. The running example below reads a CSV file into POJOs and maps over each record:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
		.pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");

csvInput.map(new MapFunction<RecordDto, RecordDto>() {
	@Override
	public RecordDto map(RecordDto value) throws Exception {
		LOGGER.info("execute map:{}", value);
		TimeUnit.SECONDS.sleep(5);
		return value;
	}
}).print();

ExecutionEnvironment.readCsvFile

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

/**
	 * Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
	 * define parameters and field types and will eventually produce the DataSet that corresponds to
	 * the read and parsed CSV input.
	 *
	 * @param filePath The path of the CSV file.
	 * @return A CsvReader that can be used to configure the CSV input.
	 */
	public CsvReader readCsvFile(String filePath) {
		return new CsvReader(filePath, this);
	}
  • readCsvFile simply creates a CsvReader for the given filePath; a configuration sketch follows below
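
Besides pojoType, CsvReader exposes a fluent configuration API whose values (lineDelimiter, fieldDelimiter, includedMask, ...) later flow into the input format. A hedged sketch of a few common options, reusing the example's env, csvFilePath and RecordDto:

DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
		.lineDelimiter("\n")     // record delimiter (the default)
		.fieldDelimiter(",")     // field delimiter (the default)
		.ignoreFirstLine()       // skip a header row
		.pojoType(RecordDto.class, "playerName", "country", "year",
				"game", "gold", "silver", "bronze", "total");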

CsvReader

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvReader.java

public CsvReader(String filePath, ExecutionEnvironment executionContext) {
		this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
	}

	public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
		Preconditions.checkNotNull(filePath, "The file path may not be null.");
		Preconditions.checkNotNull(executionContext, "The execution context may not be null.");

		this.path = filePath;
		this.executionContext = executionContext;
	}

	/**
	 * Configures the reader to read the CSV data and parse it to the given type. The all fields of the type
	 * must be public or able to set value. The type information for the fields is obtained from the type class.
	 *
	 * @param pojoType The class of the target POJO.
	 * @param pojoFields The fields of the POJO which are mapped to CSV fields.
	 * @return The DataSet representing the parsed CSV data.
	 */
	public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) {
		Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
		Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");

		final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType);
		if (!(ti instanceof PojoTypeInfo)) {
			throw new IllegalArgumentException(
				"The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti);
		}
		final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;

		CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask);

		configureInputFormat(inputFormat);

		return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName());
	}
  • CsvReader provides the pojoType method, which maps the CSV data onto a Java type and turns it into a Flink DataSource; when the DataSource is created, a PojoCsvInputFormat and a PojoTypeInfo are passed in (a hypothetical RecordDto is sketched below)
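
The article does not show RecordDto itself; a hypothetical version that satisfies Flink's POJO requirements (public class, public no-argument constructor, public fields matching the names passed to pojoType) might look like this:

public class RecordDto {

	// public fields so that PojoTypeInfo can map the CSV columns onto them by name
	public String playerName;
	public String country;
	public Integer year;
	public String game;
	public Integer gold;
	public Integer silver;
	public Integer bronze;
	public Integer total;

	// Flink POJOs require a public no-argument constructor
	public RecordDto() {
	}

	@Override
	public String toString() {
		return playerName + "," + country + "," + year + "," + game
				+ "," + gold + "," + silver + "," + bronze + "," + total;
	}
}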

Task

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • Task's run method calls invokable.invoke(); for a DataSet source the invokable here is a DataSourceTask
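
loadAndInstantiateInvokable resolves nameOfInvokableClass against the user-code classloader and instantiates it reflectively. Roughly, as a simplified sketch of the 1.6 code with error handling omitted:

private static AbstractInvokable loadAndInstantiateInvokable(
		ClassLoader classLoader, String className, Environment environment) throws Exception {
	// resolve the class named in the task deployment descriptor; for a DataSet
	// source this is org.apache.flink.runtime.operators.DataSourceTask
	Class<? extends AbstractInvokable> invokableClass =
			Class.forName(className, true, classLoader).asSubclass(AbstractInvokable.class);
	// since 1.6 the invokable receives its Environment through the constructor
	return invokableClass.getConstructor(Environment.class).newInstance(environment);
}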

DataSourceTask.invoke

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/DataSourceTask.java

@Override
	public void invoke() throws Exception {
		// --------------------------------------------------------------------
		// Initialize
		// --------------------------------------------------------------------
		initInputFormat();

		LOG.debug(getLogString("Start registering input and output"));

		try {
			initOutputs(getUserCodeClassLoader());
		} catch (Exception ex) {
			throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
					ex.getMessage(), ex);
		}

		LOG.debug(getLogString("Finished registering input and output"));

		// --------------------------------------------------------------------
		// Invoke
		// --------------------------------------------------------------------
		LOG.debug(getLogString("Starting data source operator"));

		RuntimeContext ctx = createRuntimeContext();

		final Counter numRecordsOut;
		{
			Counter tmpNumRecordsOut;
			try {
				OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
				ioMetricGroup.reuseInputMetricsForTask();
				if (this.config.getNumberOfChainedStubs() == 0) {
					ioMetricGroup.reuseOutputMetricsForTask();
				}
				tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
			} catch (Exception e) {
				LOG.warn("An exception occurred during the metrics setup.", e);
				tmpNumRecordsOut = new SimpleCounter();
			}
			numRecordsOut = tmpNumRecordsOut;
		}
		
		Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");

		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
			((RichInputFormat) this.format).setRuntimeContext(ctx);
			LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
			((RichInputFormat) this.format).openInputFormat();
			LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
		}

		ExecutionConfig executionConfig = getExecutionConfig();

		boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();

		LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
		
		final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
		
		try {
			// start all chained tasks
			BatchTask.openChainedTasks(this.chainedTasks, this);
			
			// get input splits to read
			final Iterator<InputSplit> splitIterator = getInputSplits();
			
			// for each assigned input split
			while (!this.taskCanceled && splitIterator.hasNext())
			{
				// get start and end
				final InputSplit split = splitIterator.next();

				LOG.debug(getLogString("Opening input split " + split.toString()));
				
				final InputFormat<OT, InputSplit> format = this.format;
			
				// open input format
				format.open(split);
	
				LOG.debug(getLogString("Starting to read input from split " + split.toString()));
				
				try {
					final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);

					if (objectReuseEnabled) {
						OT reuse = serializer.createInstance();

						// as long as there is data to read
						while (!this.taskCanceled && !format.reachedEnd()) {

							OT returned;
							if ((returned = format.nextRecord(reuse)) != null) {
								output.collect(returned);
							}
						}
					} else {
						// as long as there is data to read
						while (!this.taskCanceled && !format.reachedEnd()) {
							OT returned;
							if ((returned = format.nextRecord(serializer.createInstance())) != null) {
								output.collect(returned);
							}
						}
					}

					if (LOG.isDebugEnabled() && !this.taskCanceled) {
						LOG.debug(getLogString("Closing input split " + split.toString()));
					}
				} finally {
					// close. We close here such that a regular close throwing an exception marks a task as failed.
					format.close();
				}
				completedSplitsCounter.inc();
			} // end for all input splits

			// close the collector. if it is a chaining task collector, it will close its chained tasks
			this.output.close();

			// close all chained tasks letting them report failure
			BatchTask.closeChainedTasks(this.chainedTasks, this);

		}
		catch (Exception ex) {
			// close the input, but do not report any exceptions, since we already have another root cause
			try {
				this.format.close();
			} catch (Throwable ignored) {}

			BatchTask.cancelChainedTasks(this.chainedTasks);

			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);

			if (ex instanceof CancelTaskException) {
				// forward canceling exception
				throw ex;
			}
			else if (!this.taskCanceled) {
				// drop exception, if the task was canceled
				BatchTask.logAndThrowException(ex, this);
			}
		} finally {
			BatchTask.clearWriters(eventualOutputs);
			// --------------------------------------------------------------------
			// Closing
			// --------------------------------------------------------------------
			if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
				((RichInputFormat) this.format).closeInputFormat();
				LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
			}
		}

		if (!this.taskCanceled) {
			LOG.debug(getLogString("Finished data source operator"));
		}
		else {
			LOG.debug(getLogString("Data source operator cancelled"));
		}
	}
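  • DataSourceTask.invoke first initializes the input format and its outputs, then iterates over the assigned input splits: for each split it opens the format, keeps calling format.nextRecord until reachedEnd(), collects every non-null record into a CountingCollector that wraps the chained output, and closes the format in a finally block; for a RichInputFormat, openInputFormat/closeInputFormat bracket the whole loop

Stripped of metrics, cancellation and error handling, the core of that loop is just the generic InputFormat contract. A minimal sketch (class and method names are mine, not Flink's):

import java.io.IOException;
import java.util.Iterator;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.util.Collector;

public class ReadLoopSketch {

	static <OT> void readAllSplits(
			InputFormat<OT, InputSplit> format,
			Iterator<InputSplit> splits,
			Collector<OT> output,
			TypeSerializer<OT> serializer) throws IOException {
		while (splits.hasNext()) {
			format.open(splits.next());       // position the format on this split
			try {
				while (!format.reachedEnd()) {
					OT record = format.nextRecord(serializer.createInstance());
					if (record != null) {     // null means "skip this line" (comment, invalid record)
						output.collect(record);
					}
				}
			} finally {
				format.close();               // close even if reading failed
			}
		}
	}
}
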
PojoCsvInputFormat

PojoCsvInputFormat extends CsvInputFormat, which in turn builds on DelimitedInputFormat; the base classes below do the actual reading and parsing.

DelimitedInputFormat

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DelimitedInputFormat.java

/**
	 * The default read buffer size = 1MB.
	 */
	private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;

	private transient byte[] readBuffer;

	private int bufferSize = -1;

	private void initBuffers() {
		this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;

		if (this.bufferSize <= this.delimiter.length) {
			throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
		}

		if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
			this.readBuffer = new byte[this.bufferSize];
		}
		if (this.wrapBuffer == null || this.wrapBuffer.length < 256) {
			this.wrapBuffer = new byte[256];
		}

		this.readPos = 0;
		this.limit = 0;
		this.overLimit = false;
		this.end = false;
	}

	/**
	 * Checks whether the current split is at its end.
	 * 
	 * @return True, if the split is at its end, false otherwise.
	 */
	@Override
	public boolean reachedEnd() {
		return this.end;
	}
	
	@Override
	public OT nextRecord(OT record) throws IOException {
		if (readLine()) {
			return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
		} else {
			this.end = true;
			return null;
		}
	}

	/**
	 * Fills the read buffer with bytes read from the file starting from an offset.
	 */
	private boolean fillBuffer(int offset) throws IOException {
		int maxReadLength = this.readBuffer.length - offset;
		// special case for reading the whole split.
		if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
			int read = this.stream.read(this.readBuffer, offset, maxReadLength);
			if (read == -1) {
				this.stream.close();
				this.stream = null;
				return false;
			} else {
				this.readPos = offset;
				this.limit = read;
				return true;
			}
		}
		
		// else ..
		int toRead;
		if (this.splitLength > 0) {
			// if we have more data, read that
			toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
		}
		else {
			// if we have exhausted our split, we need to complete the current record, or read one
			// more across the next split.
			// the reason is that the next split will skip over the beginning until it finds the first
			// delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
			// previous split.
			toRead = maxReadLength;
			this.overLimit = true;
		}

		int read = this.stream.read(this.readBuffer, offset, toRead);

		if (read == -1) {
			this.stream.close();
			this.stream = null;
			return false;
		} else {
			this.splitLength -= read;
			this.readPos = offset; // position from where to start reading
			this.limit = read + offset; // number of valid bytes in the read buffer
			return true;
		}
	}
  • nextRecord delegates to readLine() to pull one delimited line out of the read buffer, then hands the bytes to readRecord; once readLine returns false, end is set and reachedEnd() returns true
  • fillBuffer refills the read buffer (1MB by default, see initBuffers) from the stream; splitLength is the length of the current FileInputSplit (the same file lengths that DelimitedInputFormat.getStatistics works from) — when the split is exhausted, overLimit is set and reading continues just far enough into the next split to complete the last record, whose leading fragment that next split will in turn skip
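
The buffer is tunable: per initBuffers above, bufferSize must be larger than the delimiter, otherwise an IllegalArgumentException is thrown. A hedged sketch using DelimitedInputFormat's setters, assuming an already-created PojoCsvInputFormat instance named inputFormat:

// enlarge the default 1MB read buffer, e.g. for unusually long lines
inputFormat.setBufferSize(4 * 1024 * 1024);
// the delimiter is stored as bytes; initBuffers() rejects buffers not larger than it
inputFormat.setDelimiter("\n");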

CsvInputFormat.readRecord

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvInputFormat.java

@Override
	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
		/*
		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
		 */
		// Found window's end line, so find carriage return before the newline
		if (this.lineDelimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
			//reduce the number of bytes so that the Carriage return is not taken as data
			numBytes--;
		}

		if (commentPrefix != null && commentPrefix.length <= numBytes) {
			//check record for comments
			boolean isComment = true;
			for (int i = 0; i < commentPrefix.length; i++) {
				if (commentPrefix[i] != bytes[offset + i]) {
					isComment = false;
					break;
				}
			}
			if (isComment) {
				this.commentCount++;
				return null;
			}
		}

		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
			return fillRecord(reuse, parsedValues);
		} else {
			this.invalidLineCount++;
			return null;
		}
	}
  • CsvInputFormat's readRecord method is responsible for reading the raw bytes of one line; it parses them via parseRecord into parsedValues (an Object[]) and then calls the subclass's fillRecord method (here PojoCsvInputFormat's) to fill parsedValues into the reuse object (the instance that DataSourceTask passed in as serializer.createInstance() when it called format.nextRecord)
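
The two null returns above are exactly the nulls that DataSourceTask's read loop skips. Both behaviors come from reader configuration; a hedged sketch reusing the article's example (ignoreComments sets commentPrefix, ignoreInvalidLines enables the lenient parsing that makes parseRecord return false instead of throwing):

DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
		.ignoreComments("#")      // matching lines bump commentCount and yield null
		.ignoreInvalidLines()     // parse failures bump invalidLineCount instead of failing the job
		.pojoType(RecordDto.class, "playerName", "country", "year",
				"game", "gold", "silver", "bronze", "total");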

PojoCsvInputFormat.fillRecord

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/PojoCsvInputFormat.java

/**
 * Input format that reads csv into POJOs.
 * @param <OUT> resulting POJO type
 */
@Internal
public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {

	//......

	@Override
	public void open(FileInputSplit split) throws IOException {
		super.open(split);

		pojoFields = new Field[pojoFieldNames.length];

		Map<String, Field> allFields = new HashMap<String, Field>();

		findAllFields(pojoTypeClass, allFields);

		for (int i = 0; i < pojoFieldNames.length; i++) {
			pojoFields[i] = allFields.get(pojoFieldNames[i]);

			if (pojoFields[i] != null) {
				pojoFields[i].setAccessible(true);
			} else {
				throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
			}
		}
	}

	@Override
	public OUT fillRecord(OUT reuse, Object[] parsedValues) {
		for (int i = 0; i < parsedValues.length; i++) {
			try {
				pojoFields[i].set(reuse, parsedValues[i]);
			} catch (IllegalAccessException e) {
				throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
			}
		}
		return reuse;
	}

	//......
}
  • PojoCsvInputFormat's open method is invoked when the executor executes the plan; it uses reflection up front to resolve the required Field objects and makes them accessible
  • fillRecord then merely uses reflection to set the parsedValues into the POJO
  • if a reflective set fails, the resulting IllegalAccessException is caught and rethrown wrapped in a RuntimeException; the same pattern is shown in isolation below
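
The resolve-once-then-set pattern can be demonstrated standalone; a self-contained, hypothetical mini-example (names are mine) mirroring open() and fillRecord():

import java.lang.reflect.Field;

public class ReflectionFillDemo {

	public static class Dto {
		public String name;
		public int total;
	}

	public static void main(String[] args) throws Exception {
		// resolve and cache the fields once, as PojoCsvInputFormat.open() does
		String[] fieldNames = {"name", "total"};
		Field[] fields = new Field[fieldNames.length];
		for (int i = 0; i < fields.length; i++) {
			fields[i] = Dto.class.getDeclaredField(fieldNames[i]);
			fields[i].setAccessible(true);
		}

		// set the parsed values into the reuse object, as fillRecord() does
		Object[] parsedValues = {"alice", 42};
		Dto reuse = new Dto();
		for (int i = 0; i < parsedValues.length; i++) {
			fields[i].set(reuse, parsedValues[i]); // auto-unboxes 42 into the int field
		}

		System.out.println(reuse.name + " " + reuse.total); // prints: alice 42
	}
}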

CountingCollector.collect

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java

public class CountingCollector<OUT> implements Collector<OUT> {
	private final Collector<OUT> collector;
	private final Counter numRecordsOut;

	public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
		this.collector = collector;
		this.numRecordsOut = numRecordsOut;
	}

	@Override
	public void collect(OUT record) {
		this.numRecordsOut.inc();
		this.collector.collect(record);
	}

	@Override
	public void close() {
		this.collector.close();
	}
}
  • the collector wrapped here is an org.apache.flink.runtime.operators.chaining.ChainedMapDriver

ChainedMapDriver

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java

@Override
	public void collect(IT record) {
		try {
			this.numRecordsIn.inc();
			this.outputCollector.collect(this.mapper.map(record));
		} catch (Exception ex) {
			throw new ExceptionInChainedStubException(this.taskName, ex);
		}
	}
  • this first calls the mapper's map method to run the user's map logic, then calls outputCollector.collect to send the result downstream
  • the outputCollector here is again a CountingCollector, and the collector it wraps is an org.apache.flink.runtime.operators.shipping.OutputCollector
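
The chaining idea, reduced to its essence: a Collector that runs the user function inline and forwards the result, instead of shipping the record over the network first. A hedged, simplified sketch (not Flink's actual class):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.util.Collector;

public class MapChainLink<IN, OUT> implements Collector<IN> {

	private final MapFunction<IN, OUT> mapper;
	private final Collector<OUT> next;

	public MapChainLink(MapFunction<IN, OUT> mapper, Collector<OUT> next) {
		this.mapper = mapper;
		this.next = next;
	}

	@Override
	public void collect(IN record) {
		try {
			// run the chained map UDF inline, then hand the result to the next link
			next.collect(mapper.map(record));
		} catch (Exception ex) {
			// ChainedMapDriver wraps this in ExceptionInChainedStubException instead
			throw new RuntimeException(ex);
		}
	}

	@Override
	public void close() {
		next.close();
	}
}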

OutputCollector

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputCollector.java

/**
	 * Collects a record and emits it to all writers.
	 */
	@Override
	public void collect(T record)  {
		if (record != null) {
			this.delegate.setInstance(record);
			try {
				for (RecordWriter<SerializationDelegate<T>> writer : writers) {
					writer.emit(this.delegate);
				}
			}
			catch (IOException e) {
				throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
			}
			catch (InterruptedException e) {
				throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
			}
		}
		else {
			throw new NullPointerException("The system does not support records that are null."
								+ "Null values are only supported as fields inside other objects.");
		}
	}
  • this calls RecordWriter's emit method to actually emit the record

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

public void emit(T record) throws IOException, InterruptedException {
		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
			sendToTarget(record, targetChannel);
		}
	}
  • channelSelector.selectChannels returns the target channels to send the record to; the channelSelector here is an OutputEmitter
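
As of 1.6 the ChannelSelector contract is: given a record and the channel count, return the indices of the channels the record should go to. OutputEmitter is one implementation; a hedged sketch of a custom round-robin selector (roughly what OutputEmitter's robin() does for PARTITION_RANDOM / PARTITION_FORCED_REBALANCE):

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;

public class RoundRobinSelector<T extends IOReadableWritable> implements ChannelSelector<T> {

	private final int[] target = new int[1];
	private int next = -1;

	@Override
	public int[] selectChannels(T record, int numberOfChannels) {
		// cycle through all output channels, one record at a time
		next = (next + 1) % numberOfChannels;
		target[0] = next;
		return target;
	}
}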

OutputEmitter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputEmitter.java

@Override
	public final int[] selectChannels(SerializationDelegate<T> record, int numberOfChannels) {
		switch (strategy) {
		case FORWARD:
			return forward();
		case PARTITION_RANDOM:
		case PARTITION_FORCED_REBALANCE:
			return robin(numberOfChannels);
		case PARTITION_HASH:
			return hashPartitionDefault(record.getInstance(), numberOfChannels);
		case BROADCAST:
			return broadcast(numberOfChannels);
		case PARTITION_CUSTOM:
			return customPartition(record.getInstance(), numberOfChannels);
		case PARTITION_RANGE:
			return rangePartition(record.getInstance(), numberOfChannels);
		default:
			throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
		}
	}

	private int[] forward() {
		return this.channels;
	}
  • the strategy here is FORWARD; forward() just returns the precomputed this.channels array, so each record goes to the single local downstream channel

To sum up: readCsvFile creates a CsvReader, and its pojoType method builds a DataSource backed by a PojoCsvInputFormat. At runtime, Task.run invokes DataSourceTask.invoke, which opens each input split and pulls records out of the format (DelimitedInputFormat reads the delimited lines, CsvInputFormat.readRecord parses them, PojoCsvInputFormat.fillRecord fills the POJO), then pushes every record through CountingCollector, ChainedMapDriver (running the user's map function), OutputCollector and RecordWriter, where the OutputEmitter selects the target channels.
