Flink - FlinkKafkaConsumer010
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
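That is the basic usage. The snippet uses FlinkKafkaConsumer08; FlinkKafkaConsumer010 takes the same topic, deserialization schema, and properties arguments. At its core, the consumer is an implementation of SourceFunction.
Apart from overriding createFetcher, FlinkKafkaConsumer010 inherits almost all of its behavior from FlinkKafkaConsumerBase (by way of FlinkKafkaConsumer09).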
FlinkKafkaConsumerBase
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
CheckpointListener,
ResultTypeQueryable<T>,
CheckpointedFunction,
CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>>
FlinkKafkaConsumerBase extends RichParallelSourceFunction and implements four interfaces.
RichFunction.open
Let's start with how FlinkKafkaConsumerBase is initialized:
@Override
public void open(Configuration configuration) {
// determine the offset commit mode
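// offsetCommitMode is one of ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED. With checkpointing enabled, offsets are stored in the checkpoint snapshot (and, in ON_CHECKPOINTS mode, committed to Kafka when the checkpoint completes); otherwise Kafka's periodic auto-commit writes them back to Kafka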
offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
// initialize subscribed partitions
List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics); //fetch partition metadata for the subscribed topics
subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size()); //Map<KafkaTopicPartition, Long> recording the start offset of each partition
if (restoredState != null) { //there is restored state to start from
for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
if (restoredState.containsKey(kafkaTopicPartition)) { //the restored state contains this partition
subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition)); //restore the offset from the snapshot
}
}
} else { //no restored state: initialize subscribedPartitionsToStartOffsets from scratch
initializeSubscribedPartitionsToStartOffsets(
subscribedPartitionsToStartOffsets,
kafkaTopicPartitions,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
startupMode,
specificStartupOffsets);
//startupMode is one of the following; the default is GROUP_OFFSETS
if (subscribedPartitionsToStartOffsets.size() != 0) {
switch (startupMode) {
case EARLIEST: //start reading from the earliest offset
LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
break;
case LATEST: //start reading from the latest offset
LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
break;
case SPECIFIC_OFFSETS: //start reading from user-specified offsets
LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
specificStartupOffsets,
subscribedPartitionsToStartOffsets.keySet());
List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
}
}
if (partitionsDefaultedToGroupOffsets.size() > 0) { //some partitions had no specified offset, so they fall back to GROUP_OFFSET
LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
getRuntimeContext().getIndexOfThisSubtask(),
partitionsDefaultedToGroupOffsets.size(),
partitionsDefaultedToGroupOffsets);
}
break;
default:
case GROUP_OFFSETS: //start reading from the offsets committed by the Kafka consumer group
LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
}
}
}
}
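The mode selection at the top of open() can be summarized as a small decision function. What follows is a hedged, standalone sketch of that rule rather than Flink's own code: the enum `CommitMode` and the class `CommitModeSketch` are hypothetical names, and the rule assumed is that, with checkpointing on, only enableCommitOnCheckpoints matters, while without checkpointing Kafka's auto-commit setting chooses between KAFKA_PERIODIC and DISABLED.

```java
// Hypothetical names; a sketch of the offset-commit-mode decision, not Flink's class.
enum CommitMode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED }

final class CommitModeSketch {
    private CommitModeSketch() {}

    static CommitMode resolve(boolean autoCommitEnabled,
                              boolean commitOnCheckpoints,
                              boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            // With checkpointing on, only the commit-on-checkpoints flag matters.
            return commitOnCheckpoints ? CommitMode.ON_CHECKPOINTS : CommitMode.DISABLED;
        }
        // Without checkpointing, Kafka's own periodic auto-commit decides.
        return autoCommitEnabled ? CommitMode.KAFKA_PERIODIC : CommitMode.DISABLED;
    }
}
```

In this sketch, once checkpointing is enabled Kafka's auto-commit setting no longer has any effect, because the offsets travel with the checkpoint instead.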
protected static void initializeSubscribedPartitionsToStartOffsets(
Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
List<KafkaTopicPartition> kafkaTopicPartitions,
int indexOfThisSubtask,
int numParallelSubtasks,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets) {
for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
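if (i % numParallelSubtasks == indexOfThisSubtask) { //this partition is assigned to this subtask; only assigned partitions get an entry, so this loop effectively performs the partition assignment
if (startupMode != StartupMode.SPECIFIC_OFFSETS) { //not SPECIFIC_OFFSETS: store the startup mode's sentinel constant as a placeholder offset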
subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
} else {
KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
Long specificOffset = specificStartupOffsets.get(partition);
if (specificOffset != null) {
// since the specified offsets represent the next record to read, we subtract
// it by one so that the initial state of the consumer will be correct
subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1); //store the configured offset for this partition; note the minus one
} else { //no offset configured for this partition: fall back to GROUP_OFFSET
subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
}
}
}
}
}
Initialization therefore mainly consists of restoring, or freshly initializing, the start offset of every assigned topic partition.
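The assignment rule in initializeSubscribedPartitionsToStartOffsets (partition i goes to subtask i % parallelism) and the off-by-one convention for specific offsets can be reproduced without Flink. This minimal sketch covers only the SPECIFIC_OFFSETS branch, assumes partitions are plain strings, and uses a hypothetical `GROUP_OFFSET` placeholder value; the `seekPosition` helper reflects the later `consumer.seek(..., offset + 1)` call in KafkaConsumerThread, i.e. state stores the last consumed offset and reading resumes one past it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink code: partitions are plain strings and
// GROUP_OFFSET is an arbitrary placeholder value.
final class AssignmentSketch {
    private AssignmentSketch() {}

    // Placeholder meaning "start from the group's committed offset" (value chosen arbitrarily).
    static final long GROUP_OFFSET = Long.MIN_VALUE;

    // Returns the start offsets of the partitions this subtask owns.
    static Map<String, Long> startOffsetsFor(List<String> partitions,
                                             int subtaskIndex,
                                             int parallelism,
                                             Map<String, Long> specificOffsets) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % parallelism != subtaskIndex) {
                continue; // partition i belongs to another subtask
            }
            String partition = partitions.get(i);
            Long nextToRead = specificOffsets.get(partition);
            // The user specifies the NEXT offset to read; state stores the LAST offset read.
            result.put(partition, nextToRead != null ? nextToRead - 1 : GROUP_OFFSET);
        }
        return result;
    }

    // On startup the consumer seeks one past the last consumed offset.
    static long seekPosition(long lastConsumedOffset) {
        return lastConsumedOffset + 1;
    }
}
```

With five partitions and parallelism 2, subtask 0 gets t-0, t-2, t-4 and subtask 1 gets t-1, t-3; a configured start offset of 10 is stored as 9 and turned back into a seek to 10.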
RichParallelSourceFunction
The core is the run function:
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
// we need only do work, if we actually have partitions assigned
if (!subscribedPartitionsToStartOffsets.isEmpty()) {
// create the fetcher that will communicate with the Kafka brokers
final AbstractFetcher<T, ?> fetcher = createFetcher(
sourceContext, //SourceContext, used to emit records and watermarks
subscribedPartitionsToStartOffsets, //mapping from partition to start offset
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode); //one of ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED
// publish the reference, for snapshot-, commit-, and cancel calls
// IMPORTANT: We can only do that now, because only now will calls to
// the fetchers 'snapshotCurrentState()' method return at least
// the restored offsets
this.kafkaFetcher = fetcher;
if (!running) {
return;
}
// (3) run the fetcher's main work method
fetcher.runFetchLoop();
}
}
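In short, run creates the Fetcher and starts it; the Fetcher does the actual work.
Most of the parameters passed to createFetcher are easy to understand, except:
periodicWatermarkAssigner
punctuatedWatermarkAssigner
These are used to generate watermarks; see Flink - watermark.
CheckpointedFunction interface
The main methods implemented are initializeState and snapshotState.
initializeState restores the offset state from the state backend into restoredState; open() uses this data later.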
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
OperatorStateStore stateStore = context.getOperatorStateStore();
offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); //obtain the offset list state from the state backend
if (context.isRestored()) {
if (restoredState == null) {
restoredState = new HashMap<>();
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
restoredState.put(kafkaOffset.f0, kafkaOffset.f1); //copy the offsets from offsetsStateForCheckpoint into restoredState
}
}
}
}
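Taken together, initializeState and open() turn the checkpointed list of (partition, offset) pairs into a map and then keep only the partitions in the current subscription. Here is a hedged sketch using JDK types only: `Map.Entry` stands in for Flink's Tuple2, strings stand in for KafkaTopicPartition, and `RestoreSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink code: Map.Entry stands in for Tuple2 and
// strings stand in for KafkaTopicPartition.
final class RestoreSketch {
    private RestoreSketch() {}

    // initializeState(): copy the checkpointed (partition, offset) pairs into a map.
    static Map<String, Long> toMap(List<Map.Entry<String, Long>> listState) {
        Map<String, Long> restored = new HashMap<>();
        for (Map.Entry<String, Long> entry : listState) {
            restored.put(entry.getKey(), entry.getValue());
        }
        return restored;
    }

    // open(): keep only the currently subscribed partitions that have a restored offset.
    static Map<String, Long> startOffsets(Map<String, Long> restored, List<String> currentPartitions) {
        Map<String, Long> start = new HashMap<>();
        for (String partition : currentPartitions) {
            if (restored.containsKey(partition)) {
                start.put(partition, restored.get(partition));
            }
        }
        return start;
    }
}
```

As in the open() excerpt, a partition that is missing from the restored state gets no start offset at all.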
snapshotState implements the snapshot logic:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
offsetsStateForCheckpoint.clear(); //transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
//...
} else {
HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); //snapshot the latest offsets from the fetcher
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); //remember these offsets until this checkpoint completes
}
for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
offsetsStateForCheckpoint.add(
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); //write the offset into the state backend
}
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// truncate the map of pending offsets to commit, to prevent infinite growth
while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) { //too many pending checkpoints: drop the oldest
pendingOffsetsToCommit.remove(0);
}
}
}
CheckpointListener interface
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// only one commit operation must be in progress
try {
final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId); //look up this checkpoint in pendingOffsetsToCommit
if (posInMap == -1) {
LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
return;
}
@SuppressWarnings("unchecked")
HashMap<KafkaTopicPartition, Long> offsets =
(HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap); //remove this checkpoint's entry
// remove older checkpoints in map
for (int i = 0; i < posInMap; i++) {
pendingOffsetsToCommit.remove(0); //also drop every checkpoint older than this one
}
if (offsets == null || offsets.size() == 0) {
LOG.debug("Checkpoint state was empty.");
return;
}
fetcher.commitInternalOffsetsToKafka(offsets); //commit the offsets to the Kafka consumer group
} catch (Exception e) {
// rethrow unless the source has already been cancelled
if (running) {
throw e;
}
}
}
}
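The pending-offset bookkeeping spread over snapshotState and notifyCheckpointComplete boils down to an insertion-ordered map keyed by checkpoint ID. This sketch assumes checkpoint IDs arrive in increasing order and uses a JDK LinkedHashMap in place of the index-addressable map in the excerpt; `PendingCommits` and `maxPending` are hypothetical names, the latter standing in for MAX_NUM_PENDING_CHECKPOINTS.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Flink code: assumes checkpoint IDs arrive in increasing order.
final class PendingCommits {
    private final int maxPending; // stands in for MAX_NUM_PENDING_CHECKPOINTS
    private final LinkedHashMap<Long, Map<String, Long>> pending = new LinkedHashMap<>();

    PendingCommits(int maxPending) {
        this.maxPending = maxPending;
    }

    // snapshotState(): remember this checkpoint's offsets, dropping the oldest entries if too many.
    void onSnapshot(long checkpointId, Map<String, Long> offsets) {
        pending.put(checkpointId, offsets);
        Iterator<Long> oldestFirst = pending.keySet().iterator();
        while (pending.size() > maxPending) {
            oldestFirst.next();
            oldestFirst.remove();
        }
    }

    // notifyCheckpointComplete(): returns the offsets to commit, or null for an unknown ID.
    // Removes this checkpoint and every older one.
    Map<String, Long> onComplete(long checkpointId) {
        if (!pending.containsKey(checkpointId)) {
            return null;
        }
        Iterator<Map.Entry<Long, Map<String, Long>>> oldestFirst = pending.entrySet().iterator();
        while (true) {
            Map.Entry<Long, Map<String, Long>> entry = oldestFirst.next();
            long id = entry.getKey();
            Map<String, Long> offsets = entry.getValue();
            oldestFirst.remove();
            if (id == checkpointId) {
                return offsets;
            }
        }
    }

    int size() {
        return pending.size();
    }
}
```

Completing checkpoint 3 also discards checkpoint 2: the later checkpoint's offsets supersede the earlier ones, so nothing is lost by committing only the newest.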
Kafka010Fetcher
FlinkKafkaConsumer010 essentially only overrides createFetcher, because the Fetcher is the part that differs between Kafka versions.
public class Kafka010Fetcher<T> extends Kafka09Fetcher<T>
What Kafka010Fetcher changes:
@Override
protected void emitRecord(
T record,
KafkaTopicPartitionState<TopicPartition> partition,
long offset,
ConsumerRecord<?, ?> consumerRecord) throws Exception {
// we attach the Kafka 0.10 timestamp here
emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp()); //Kafka 0.10 records carry a timestamp
}
/**
* This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
* changing binary signatures.
*/
@Override
protected KafkaConsumerCallBridge010 createCallBridge() {
return new KafkaConsumerCallBridge010(); //the CallBridge encapsulates API differences between Kafka consumer versions
}
KafkaConsumerCallBridge010 wraps the 0.10 versions of assignPartitions and the seek calls, whose API differs from other Kafka versions:
public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
@Override
public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
consumer.assign(topicPartitions);
}
@Override
public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
consumer.seekToBeginning(Collections.singletonList(partition));
}
@Override
public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
consumer.seekToEnd(Collections.singletonList(partition));
}
}
Kafka09Fetcher
The key method is runFetchLoop: it starts the KafkaConsumerThread, then takes records out of the handover, deserializes them, and emits them.
@Override
public void runFetchLoop() throws Exception {
try {
final Handover handover = this.handover; //the handover passes data between the fetcher thread and the consumer thread
// kick off the actual Kafka consumer
consumerThread.start(); //a KafkaConsumerThread, which runs the actual Kafka consumer
while (running) {
// this blocks until we get the next records
// it automatically re-throws exceptions encountered in the fetcher thread
final ConsumerRecords<byte[], byte[]> records = handover.pollNext(); //take the next batch out of the handover
// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
List<ConsumerRecord<byte[], byte[]>> partitionRecords =
records.records(partition.getKafkaPartitionHandle());//internally, ConsumerRecords is a Map<TopicPartition, List<ConsumerRecord<K, V>>>
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
final T value = deserializer.deserialize(
record.key(), record.value(),
record.topic(), record.partition(), record.offset());
if (deserializer.isEndOfStream(value)) {
// end of stream signaled
running = false;
break;
}
// emit the actual record. this also updates offset state atomically
// and deals with timestamps and watermark generation
emitRecord(value, partition, record.offset(), record);
}
}
}
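} finally {
// signal the consumer thread that no more work is to be done
consumerThread.shutdown();
}
}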
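The handover between the two threads can be pictured as a single-slot exchanger: the Kafka consumer thread puts one batch in, and the fetcher thread takes it out. Below is a simplified, hypothetical `SimpleHandover`. It assumes pollNext blocks until a batch arrives and that produce waits until the previous batch has been taken (my understanding of Flink's Handover), and it leaves out the real class's error propagation, wakeup, and close handling.

```java
import java.util.Objects;

// Hypothetical, simplified single-slot handover; not Flink's Handover class.
// Omits error reporting, wakeup and close handling.
final class SimpleHandover<T> {
    private final Object lock = new Object();
    private T next; // the batch waiting to be taken, or null if the slot is empty

    // Consumer-thread side: waits until the previous batch has been taken.
    void produce(T element) throws InterruptedException {
        Objects.requireNonNull(element);
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = element;
            lock.notifyAll();
        }
    }

    // Fetcher-thread side: waits until a batch is available, then empties the slot.
    T pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait();
            }
            T element = next;
            next = null;
            lock.notifyAll();
            return element;
        }
    }
}
```

Because the slot holds at most one batch, the consumer thread cannot poll Kafka far ahead of what the fetcher thread has emitted.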
An important structure here is subscribedPartitionStates.
AbstractFetcher
// create our partition state according to the timestamp/watermark mode
this.subscribedPartitionStates = initializeSubscribedPartitionStates(
assignedPartitionsWithInitialOffsets,
timestampWatermarkMode,
watermarksPeriodic, watermarksPunctuated,
userCodeClassLoader);
All of this information is combined into subscribedPartitionStates, most importantly assignedPartitionsWithInitialOffsets:
/**
* Utility method that takes the topic partitions and creates the topic partition state
* holders. If a watermark generator per partition exists, this will also initialize those.
*/
private KafkaTopicPartitionState<KPH>[] initializeSubscribedPartitionStates(
Map<KafkaTopicPartition, Long> assignedPartitionsToInitialOffsets,
int timestampWatermarkMode,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
switch (timestampWatermarkMode) {
case NO_TIMESTAMPS_WATERMARKS: {
//.......
}
case PERIODIC_WATERMARKS: {
@SuppressWarnings("unchecked")
KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions = //KafkaTopicPartitionStateWithPeriodicWatermarks is a subclass of KafkaTopicPartitionState
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()]; //same size as assignedPartitionsToInitialOffsets
int pos = 0;
for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey()); //create the kafkaHandle, the version-specific form of the topic partition; this abstraction hides structural differences between Kafka versions
AssignerWithPeriodicWatermarks<T> assignerInstance = //each partition gets its own deserialized copy of the user's AssignerWithPeriodicWatermarks
watermarksPeriodic.deserializeValue(userCodeClassLoader);
partitions[pos] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>( //for PUNCTUATED_WATERMARKS this is KafkaTopicPartitionStateWithPunctuatedWatermarks
partition.getKey(), kafkaHandle, assignerInstance); //for NO_TIMESTAMPS_WATERMARKS there is no assignerInstance argument
partitions[pos].setOffset(partition.getValue()); //set the starting offset
pos++;
}
return partitions;
}
case PUNCTUATED_WATERMARKS: {
//......
}
default:
// cannot happen; guard against future modes
throw new RuntimeException();
}
}
So subscribedPartitionStates holds, for each TopicPartition, its current offset and its watermark extraction logic.
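Keeping a watermark assigner per partition matters because partitions progress at different speeds. As far as I understand Flink's fetcher, the watermark a source subtask emits is the minimum over its per-partition watermarks. The sketch below illustrates that idea with a bounded out-of-orderness rule (max timestamp seen minus a fixed delay), which is an assumed example assigner; `PartitionWatermarks` is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical sketch: each partition tracks its own watermark, and the subtask's
// watermark is the minimum across partitions, so a slow partition holds it back.
final class PartitionWatermarks {
    private final long maxOutOfOrderness;
    private final long[] maxTimestamp; // highest timestamp seen per partition

    PartitionWatermarks(int numPartitions, long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.maxTimestamp = new long[numPartitions];
        Arrays.fill(maxTimestamp, Long.MIN_VALUE); // nothing seen yet
    }

    void onRecord(int partition, long timestamp) {
        maxTimestamp[partition] = Math.max(maxTimestamp[partition], timestamp);
    }

    // Example assigner rule (assumed): max timestamp seen minus a fixed delay.
    long partitionWatermark(int partition) {
        long ts = maxTimestamp[partition];
        return ts == Long.MIN_VALUE ? Long.MIN_VALUE : ts - maxOutOfOrderness;
    }

    // The subtask can only advance as far as its slowest partition.
    long subtaskWatermark() {
        long min = Long.MAX_VALUE;
        for (int p = 0; p < maxTimestamp.length; p++) {
            min = Math.min(min, partitionWatermark(p));
        }
        return min;
    }
}
```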
KafkaConsumerThread
@Override
public void run() {
// this is the means to talk to FlinkKafkaConsumer's main thread
final Handover handover = this.handover; //structure for exchanging data between the two threads
// This method initializes the KafkaConsumer and guarantees it is torn down properly.
// This is important, because the consumer has multi-threading issues,
// including concurrent 'close()' calls.
final KafkaConsumer<byte[], byte[]> consumer;
try {
consumer = new KafkaConsumer<>(kafkaProperties); //create the Kafka consumer
}
catch (Throwable t) {
handover.reportError(t);
return;
}
// from here on, the consumer is guaranteed to be closed properly
try {
// The callback invoked by Kafka once an offset commit is complete
final OffsetCommitCallback offsetCommitCallback = new CommitCallback(); //this callback essentially just sets commitInProgress = false, marking the commit as finished
// offsets in the state may still be placeholder sentinel values if we are starting fresh, or the
// checkpoint / savepoint state we were restored with had not completely been replaced with actual offset
// values yet; replace those with actual offsets, according to what the sentinel value represent.
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates) {
if (partition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) { //EARLIEST/LATEST: seek the consumer to the beginning/end of the partition, then record position - 1 as the last consumed offset
consumerCallBridge.seekPartitionToBeginning(consumer, partition.getKafkaPartitionHandle());
partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
} else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
consumerCallBridge.seekPartitionToEnd(consumer, partition.getKafkaPartitionHandle());
partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
} else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
// the KafkaConsumer by default will automatically seek the consumer position
// to the committed group offset, so we do not need to do it.
partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1); //GROUP_OFFSET: the position already starts at the group's committed offset
} else {
consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); //otherwise use the partition's own offset (e.g. one restored from state); it is the last consumed offset, so seek to offset + 1
}
}
// from now on, external operations may call the consumer
this.consumer = consumer;
// the latest bulk of records. may carry across the loop if the thread is woken up
// from blocking on the handover
ConsumerRecords<byte[], byte[]> records = null;
// main fetch loop
while (running) {
// check if there is something to commit
if (!commitInProgress) { //only one commit may be in flight at a time
// get and reset the work-to-be committed, so we don't repeatedly commit the same
final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null); //offsets snapshotted from the fetcher at checkpoint time, handed over via fetcher.commitInternalOffsetsToKafka once the checkpoint completes
if (toCommit != null) {
// also record that a commit is already in progress
// the order here matters! first set the flag, then send the commit command.
commitInProgress = true;
consumer.commitAsync(toCommit, offsetCommitCallback); //commit the offsets asynchronously
}
}
// get the next batch of records, unless we did not manage to hand the old batch over
if (records == null) {
try {
records = consumer.poll(pollTimeout); //fetch records from Kafka
}
catch (WakeupException we) {
continue;
}
}
try {
handover.produce(records); //hand the batch over to the fetcher thread
records = null;
}
catch (Handover.WakeupException e) {
// fall through the loop
}
}
// end main fetch loop
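}
catch (Throwable t) {
// let the fetcher thread know; handover.pollNext() re-throws the error there
handover.reportError(t);
}
finally {
// make sure the handover and the KafkaConsumer are closed
handover.close();
try {
consumer.close();
}
catch (Throwable t) {
log.warn("Error while closing Kafka consumer", t);
}
}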
}
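The commit path in the loop above, nextOffsetsToCommit.getAndSet(null) guarded by commitInProgress, can be modeled on its own. In this hedged sketch, `CommitHandoff` is a hypothetical class, `asyncCommit` stands in for consumer.commitAsync, and `onCommitComplete` plays the role of the CommitCallback that resets the flag.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch, not Flink code: the checkpoint side overwrites a slot with the
// newest offsets; the consumer loop starts an async commit only when none is in flight.
final class CommitHandoff {
    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();
    private volatile boolean commitInProgress;

    // Checkpoint-complete side: an older, not-yet-committed value is simply replaced.
    void setOffsetsToCommit(Map<String, Long> offsets) {
        nextOffsetsToCommit.set(offsets);
    }

    // One pass of the loop's commit check; asyncCommit stands in for consumer.commitAsync.
    void maybeCommit(Consumer<Map<String, Long>> asyncCommit) {
        if (!commitInProgress) {
            Map<String, Long> toCommit = nextOffsetsToCommit.getAndSet(null);
            if (toCommit != null) {
                commitInProgress = true; // set the flag before issuing the commit
                asyncCommit.accept(toCommit);
            }
        }
    }

    // What the commit callback does once Kafka acknowledges the commit.
    void onCommitComplete() {
        commitInProgress = false;
    }
}
```

Offsets set while a commit is in flight are not queued: only the newest value survives in the slot, which is acceptable because a newer checkpoint's offsets supersede older ones.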