Flink - FlinkKafkaConsumer010

Category: Backend · Published: 8 years ago

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181");
    properties.setProperty("group.id", "test");
    DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

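Usage is shown above (the snippet uses FlinkKafkaConsumer08; FlinkKafkaConsumer010 takes the same constructor arguments). At its core, the consumer is an implementation of SourceFunction.

Apart from overriding createFetcher, FlinkKafkaConsumer010 inherits most of its behavior from FlinkKafkaConsumerBase.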

FlinkKafkaConsumerBase

    public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction,
        CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>>

FlinkKafkaConsumerBase extends RichParallelSourceFunction and implements four interfaces.

RichFunction.open

Let's first look at how FlinkKafkaConsumerBase is initialized:

    @Override
    public void open(Configuration configuration) {
        // determine the offset commit mode
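        // There are three offset commit modes: ON_CHECKPOINTS, KAFKA_PERIODIC, and DISABLED.
        // With checkpointing enabled, offsets are recorded in the snapshot (and committed back to Kafka
        // when a checkpoint completes, if commit-on-checkpoint is on); otherwise the offsets are
        // periodically written back to Kafka by the client's auto-commit, if that is enabled.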
        offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

        // initialize subscribed partitions
        List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics); // fetch the partition metadata for the topics

        subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size()); // Map<KafkaTopicPartition, Long> that records the start offset of each partition

        if (restoredState != null) { // there is state to restore from
            for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
                if (restoredState.containsKey(kafkaTopicPartition)) { // the state contains this partition
                    subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition)); // restore the offset from the snapshot
                }
            }

        } else { // no state: initialize subscribedPartitionsToStartOffsets from scratch
            initializeSubscribedPartitionsToStartOffsets(
                subscribedPartitionsToStartOffsets,
                kafkaTopicPartitions,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks(),
                startupMode,
                specificStartupOffsets);
            // startupMode has the values below; the default is GROUP_OFFSETS
            if (subscribedPartitionsToStartOffsets.size() != 0) {
                switch (startupMode) {
                    case EARLIEST: // start reading from the earliest offset
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case LATEST: // start reading from the latest offset
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case SPECIFIC_OFFSETS: // start reading from the specified offsets
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            specificStartupOffsets,
                            subscribedPartitionsToStartOffsets.keySet());

                        List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                        for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                            if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                                partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                            }
                        }

                        if (partitionsDefaultedToGroupOffsets.size() > 0) { // some partitions had no offset specified, so they fall back to GROUP_OFFSET
                            LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
                                    "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                partitionsDefaultedToGroupOffsets.size(),
                                partitionsDefaultedToGroupOffsets);
                        }
                        break;
                    default:
                    case GROUP_OFFSETS: // start reading from the offsets committed for the Kafka consumer group
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                }
            }
        }
    }
    
    protected static void initializeSubscribedPartitionsToStartOffsets(
            Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
            List<KafkaTopicPartition> kafkaTopicPartitions,
            int indexOfThisSubtask,
            int numParallelSubtasks,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets) {

        for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
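            if (i % numParallelSubtasks == indexOfThisSubtask) { // this partition belongs to this subtask; only assigned partitions get an offset entry, so this is where partition assignment actually happens
                if (startupMode != StartupMode.SPECIFIC_OFFSETS) { // not SPECIFIC_OFFSETS: set the offset to the sentinel constant of the startup mode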
                    subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
                } else {

                    KafkaTopicPartition partition = kafkaTopicPartitions.get(i);

                    Long specificOffset = specificStartupOffsets.get(partition);
                    if (specificOffset != null) {
                        // since the specified offsets represent the next record to read, we subtract
                        // it by one so that the initial state of the consumer will be correct
                        subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1); // use the offset configured for this partition; note the minus one
                    } else { // no offset configured for this partition: fall back to GROUP_OFFSET
                        subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                    }
                }
            }
        }
    }
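
The index-modulo assignment and the minus-one convention above can be tried out in isolation. The following is a minimal sketch, not Flink code: the class name, the method names, and the use of plain strings in place of KafkaTopicPartition are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone sketch; partitions are plain strings here, not KafkaTopicPartition.
class PartitionAssignmentSketch {

    /** Returns the partitions that a subtask owns when partition i goes to subtask (i % parallelism). */
    static List<String> assign(List<String> partitions, int subtaskIndex, int parallelism) {
        List<String> owned = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                owned.add(partitions.get(i));
            }
        }
        return owned;
    }

    /** Converts a "next record to read" offset into the "last record already read" form kept in state. */
    static long toStateOffset(long nextOffsetToRead) {
        return nextOffsetToRead - 1;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("topic-0", "topic-1", "topic-2", "topic-3", "topic-4");
        for (int subtask = 0; subtask < 2; subtask++) {
            System.out.println("subtask " + subtask + " owns " + assign(partitions, subtask, 2));
        }
        System.out.println("state offset for a start offset of 100: " + toStateOffset(100L));
    }
}
```

Because every subtask evaluates the same condition over the same partition list, no coordination between subtasks is needed, provided that all of them see the partitions in the same order.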

Initialization mainly consists of restoring or initializing the offsets of the topic partitions.
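
open() also picks the offset commit mode from three booleans. The sketch below shows how such a decision could be written, assuming that checkpointing takes precedence over Kafka's auto-commit; the class, the enum, and the method body are illustrative assumptions and are not copied from OffsetCommitModes.

```java
// Hypothetical sketch of an offset-commit-mode decision; not the Flink implementation.
class OffsetCommitModeSketch {

    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static Mode fromConfiguration(
            boolean autoCommitEnabled,
            boolean commitOnCheckpoints,
            boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            // assumption: with checkpointing on, only the commit-on-checkpoint flag matters
            return commitOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // assumption: without checkpointing, only Kafka's auto-commit setting matters
        return autoCommitEnabled ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        System.out.println(fromConfiguration(true, true, true));
        System.out.println(fromConfiguration(true, true, false));
        System.out.println(fromConfiguration(false, true, false));
    }
}
```

Under these assumptions the auto-commit setting is ignored whenever checkpointing is enabled, so committed offsets never run ahead of the last completed checkpoint.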

RichParallelSourceFunction

The core run function:

    @Override
    public void run(SourceContext<T> sourceContext) throws Exception {

        // we need only do work, if we actually have partitions assigned
        if (!subscribedPartitionsToStartOffsets.isEmpty()) {

            // create the fetcher that will communicate with the Kafka brokers
            final AbstractFetcher<T, ?> fetcher = createFetcher(
                    sourceContext, // used to emit records and watermarks
                    subscribedPartitionsToStartOffsets, // partition-to-offset mapping
                    periodicWatermarkAssigner,
                    punctuatedWatermarkAssigner,
                    (StreamingRuntimeContext) getRuntimeContext(),
                    offsetCommitMode); // one of ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED

            // publish the reference, for snapshot-, commit-, and cancel calls
            // IMPORTANT: We can only do that now, because only now will calls to
            //            the fetchers 'snapshotCurrentState()' method return at least
            //            the restored offsets
            this.kafkaFetcher = fetcher;
            if (!running) {
                return;
            }

            // (3) run the fetcher' main work method
            fetcher.runFetchLoop();
        }
    }

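run() mainly creates the Fetcher and starts it; the Fetcher does the actual work.

Most of the arguments used to create the Fetcher are easy to understand, except for

periodicWatermarkAssigner

punctuatedWatermarkAssigner

These are used to generate watermarks; see Flink - watermark.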

The CheckpointedFunction interface

It mainly implements the initializeState and snapshotState functions.

initializeState restores the offset state from the state backend into restoredState; this data is then used in open().

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

        OperatorStateStore stateStore = context.getOperatorStateStore();
        offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); // read the state from the state backend

        if (context.isRestored()) {
            if (restoredState == null) {
                restoredState = new HashMap<>();
                for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
                    restoredState.put(kafkaOffset.f0, kafkaOffset.f1); // copy the data from offsetsStateForCheckpoint into restoredState
                }
            }
        }
    }

snapshotState contains the snapshot logic:

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
    
        offsetsStateForCheckpoint.clear(); // declared as: transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            //...
        } else {
            HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); // snapshot the current offsets from the fetcher

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); // remember these offsets as pending for this checkpoint
            }

            for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                offsetsStateForCheckpoint.add(
                        Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); // store the offset in the state backend
            }
        }

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // truncate the map of pending offsets to commit, to prevent infinite growth
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) { // too many pending entries: drop the oldest
                pendingOffsetsToCommit.remove(0);
            }
        }
    }

The CheckpointListener interface

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // only one commit operation must be in progress
            try {
                final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId); // look up this checkpoint in pendingOffsetsToCommit
                if (posInMap == -1) {
                    LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                    return;
                }

                @SuppressWarnings("unchecked")
                HashMap<KafkaTopicPartition, Long> offsets =
                    (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap); // remove this checkpoint's entry

                // remove older checkpoints in map
                for (int i = 0; i < posInMap; i++) {
                    pendingOffsetsToCommit.remove(0); // also drop every entry older than this checkpoint
                }

                if (offsets == null || offsets.size() == 0) {
                    LOG.debug("Checkpoint state was empty.");
                    return;
                }
                fetcher.commitInternalOffsetsToKafka(offsets); // send the offsets to Kafka for the consumer group
            } // the catch block is omitted in this excerpt
        }
    }
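
The interplay between snapshotState and notifyCheckpointComplete boils down to an ordered map from checkpoint id to offsets. The following is a minimal sketch of that bookkeeping, assuming a LinkedHashMap in place of the ordered map Flink uses and strings in place of KafkaTopicPartition; all names in it are hypothetical.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the pendingOffsetsToCommit bookkeeping; not Flink code.
class PendingOffsetsSketch {
    private final int maxPending; // assumed cap, plays the role of MAX_NUM_PENDING_CHECKPOINTS
    // insertion-ordered, so the oldest checkpoint comes first
    private final LinkedHashMap<Long, Map<String, Long>> pending = new LinkedHashMap<>();

    PendingOffsetsSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    /** On snapshot: remember the offsets for this checkpoint, then drop the oldest entries above the cap. */
    void onSnapshot(long checkpointId, Map<String, Long> offsets) {
        pending.put(checkpointId, offsets);
        Iterator<Long> oldestFirst = pending.keySet().iterator();
        while (pending.size() > maxPending && oldestFirst.hasNext()) {
            oldestFirst.next();
            oldestFirst.remove();
        }
    }

    /** On completion: returns the offsets to commit, or null if the checkpoint id is unknown. */
    Map<String, Long> onCheckpointComplete(long checkpointId) {
        if (!pending.containsKey(checkpointId)) {
            return null;
        }
        Iterator<Map.Entry<Long, Map<String, Long>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> entry = it.next();
            long id = entry.getKey();
            Map<String, Long> offsets = entry.getValue();
            it.remove(); // removes the completed checkpoint and every older one before it
            if (id == checkpointId) {
                return offsets;
            }
        }
        return null; // not reached, because the id was found above
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingOffsetsSketch sketch = new PendingOffsetsSketch(100);
        sketch.onSnapshot(1L, Collections.singletonMap("topic-0", 5L));
        sketch.onSnapshot(2L, Collections.singletonMap("topic-0", 9L));
        sketch.onSnapshot(3L, Collections.singletonMap("topic-0", 12L));
        System.out.println("commit for checkpoint 2: " + sketch.onCheckpointComplete(2L));
        System.out.println("entries still pending: " + sketch.size());
    }
}
```

Offsets are only handed over for committing once the checkpoint that contains them has completed, which is why they are parked in the map rather than committed at snapshot time.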

Kafka010Fetcher

FlinkKafkaConsumer010 only overrides createFetcher.

Each Kafka version has its own Fetcher:

    public class Kafka010Fetcher<T> extends Kafka09Fetcher<T>

What Kafka010Fetcher does differently:

    @Override
    protected void emitRecord(
            T record,
            KafkaTopicPartitionState<TopicPartition> partition,
            long offset,
            ConsumerRecord<?, ?> consumerRecord) throws Exception {

        // we attach the Kafka 0.10 timestamp here
        emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp()); // Kafka 0.10 supports a timestamp in each record
    }

    /**
     * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
     * changing binary signatures.
     */
    @Override
    protected KafkaConsumerCallBridge010 createCallBridge() {
        return new KafkaConsumerCallBridge010(); // the call bridge encapsulates the API differences between Kafka consumer versions
    }

KafkaConsumerCallBridge010 encapsulates how the assignPartitions and seek calls of the 0.10 API differ from other versions:

    public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {

        @Override
        public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
            consumer.assign(topicPartitions);
        }

        @Override
        public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
            consumer.seekToBeginning(Collections.singletonList(partition));
        }

        @Override
        public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
            consumer.seekToEnd(Collections.singletonList(partition));
        }
    }

Kafka09Fetcher

The key method is runFetchLoop: it starts the KafkaConsumerThread, then takes records out of the handover, deserializes them, and emits them.

    @Override
    public void runFetchLoop() throws Exception {
        try {
            final Handover handover = this.handover; // the handover passes data between the fetcher thread and the consumer thread

            // kick off the actual Kafka consumer
            consumerThread.start(); // a KafkaConsumerThread, the thread that actually runs the Kafka consumer

            while (running) {
                // this blocks until we get the next records
                // it automatically re-throws exceptions encountered in the fetcher thread
                final ConsumerRecords<byte[], byte[]> records = handover.pollNext(); // take the next batch from the handover

                // get the records for each topic partition
                for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

                    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle()); // internally, ConsumerRecords holds a Map<TopicPartition, List<ConsumerRecord<K, V>>>

                    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                        final T value = deserializer.deserialize(
                                record.key(), record.value(),
                                record.topic(), record.partition(), record.offset());

                        if (deserializer.isEndOfStream(value)) {
                            // end of stream signaled
                            running = false;
                            break;
                        }

                        // emit the actual record. this also updates offset state atomically
                        // and deals with timestamps and watermark generation
                        emitRecord(value, partition, record.offset(), record);
                    }
                }
            }
        } // the rest of the method is omitted in this excerpt

An important structure here is subscribedPartitionStates.

AbstractFetcher

        // create our partition state according to the timestamp/watermark mode
        this.subscribedPartitionStates = initializeSubscribedPartitionStates(
                assignedPartitionsWithInitialOffsets,
                timestampWatermarkMode,
                watermarksPeriodic, watermarksPunctuated,
                userCodeClassLoader);

As you can see, all of this information is merged into subscribedPartitionStates, in particular assignedPartitionsWithInitialOffsets.

    /**
     * Utility method that takes the topic partitions and creates the topic partition state
     * holders. If a watermark generator per partition exists, this will also initialize those.
     */
    private KafkaTopicPartitionState<KPH>[] initializeSubscribedPartitionStates(
            Map<KafkaTopicPartition, Long> assignedPartitionsToInitialOffsets,
            int timestampWatermarkMode,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
        switch (timestampWatermarkMode) {

            case NO_TIMESTAMPS_WATERMARKS: {
                 //.......

            case PERIODIC_WATERMARKS: {
                @SuppressWarnings("unchecked")
                KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions = // KafkaTopicPartitionStateWithPeriodicWatermarks is a subclass of KafkaTopicPartitionState
                        (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
                                new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()]; // same size as assignedPartitionsToInitialOffsets

                int pos = 0;
                for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
                    KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey()); // create the kafkaHandle, an abstraction over the TopicPartition information that hides structural differences between versions

                    AssignerWithPeriodicWatermarks<T> assignerInstance = // one AssignerWithPeriodicWatermarks instance per partition
                            watermarksPeriodic.deserializeValue(userCodeClassLoader);

                    partitions[pos] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>( // for PUNCTUATED_WATERMARKS this is KafkaTopicPartitionStateWithPunctuatedWatermarks
                            partition.getKey(), kafkaHandle, assignerInstance); // for NO_TIMESTAMPS_WATERMARKS there is no assignerInstance argument
                    partitions[pos].setOffset(partition.getValue()); // set the initial offset
 
                    pos++;
                }

                return partitions;
            }

            case PUNCTUATED_WATERMARKS: {
                //......
            }

subscribedPartitionStates holds, for each TopicPartition, its offset and its watermark-extraction logic.
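
To make the idea of per-partition state more concrete, here is a minimal sketch in which each partition tracks its own offset and timestamp progress, and an overall watermark is taken as the minimum across partitions. Both the "highest timestamp seen" rule and the minimum-based merge are assumptions chosen for illustration, and every name in the sketch is hypothetical rather than part of Flink.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of per-partition state; not the KafkaTopicPartitionState classes.
class PartitionWatermarkSketch {

    static final class PartitionState {
        final String partition;
        long offset;                        // offset of the last record emitted for this partition
        long maxTimestamp = Long.MIN_VALUE; // assumed per-partition watermark: highest timestamp seen

        PartitionState(String partition, long initialOffset) {
            this.partition = partition;
            this.offset = initialOffset;
        }

        /** Updates offset and timestamp progress together when a record is emitted. */
        void onRecord(long recordOffset, long timestamp) {
            offset = recordOffset;
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }
    }

    /** Overall watermark: the slowest partition bounds the progress of the whole source. */
    static long combinedWatermark(List<PartitionState> states) {
        long min = Long.MAX_VALUE; // returned unchanged when there are no partitions
        for (PartitionState state : states) {
            min = Math.min(min, state.maxTimestamp);
        }
        return min;
    }

    public static void main(String[] args) {
        PartitionState p0 = new PartitionState("topic-0", -1L);
        PartitionState p1 = new PartitionState("topic-1", -1L);
        p0.onRecord(0L, 1000L);
        p0.onRecord(1L, 1500L);
        p1.onRecord(0L, 900L);
        System.out.println("combined watermark: " + combinedWatermark(Arrays.asList(p0, p1)));
    }
}
```

Keeping the watermark logic next to the offset in one state object means that a single update per record keeps both in step, which is convenient when the offsets are later snapshotted.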

KafkaConsumerThread

    @Override
    public void run() {

        // this is the means to talk to FlinkKafkaConsumer's main thread
        final Handover handover = this.handover; // structure for exchanging data between threads

        // This method initializes the KafkaConsumer and guarantees it is torn down properly.
        // This is important, because the consumer has multi-threading issues,
        // including concurrent 'close()' calls.
        final KafkaConsumer<byte[], byte[]> consumer;
        try {
            consumer = new KafkaConsumer<>(kafkaProperties); // create the Kafka consumer
        }
        catch (Throwable t) {
            handover.reportError(t);
            return;
        }

        // from here on, the consumer is guaranteed to be closed properly
        try {
            // The callback invoked by Kafka once an offset commit is complete
            final OffsetCommitCallback offsetCommitCallback = new CommitCallback(); // this callback only sets commitInProgress = false to mark the commit as finished

            // offsets in the state may still be placeholder sentinel values if we are starting fresh, or the
            // checkpoint / savepoint state we were restored with had not completely been replaced with actual offset
            // values yet; replace those with actual offsets, according to what the sentinel value represent.
            for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates) {
                if (partition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) { // for EARLIEST/LATEST: seek the consumer to the beginning or end of the partition, then store the resulting position (minus one) as the current offset
                    consumerCallBridge.seekPartitionToBeginning(consumer, partition.getKafkaPartitionHandle());
                    partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
                } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
                    consumerCallBridge.seekPartitionToEnd(consumer, partition.getKafkaPartitionHandle());
                    partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
                } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                    // the KafkaConsumer by default will automatically seek the consumer position
                    // to the committed group offset, so we do not need to do it.

                    partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1); // for GROUP_OFFSET: read the position that the consumer derived from the committed group offset
                } else {
                    consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); // otherwise use the offset the partition state already carries, e.g. one restored from state
                }
            }

            // from now on, external operations may call the consumer
            this.consumer = consumer;

            // the latest bulk of records. may carry across the loop if the thread is woken up
            // from blocking on the handover
            ConsumerRecords<byte[], byte[]> records = null;

            // main fetch loop
            while (running) {

                // check if there is something to commit
                if (!commitInProgress) { // only one commit may be in flight at a time
                    // get and reset the work-to-be committed, so we don't repeatedly commit the same
                    final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null); // set via fetcher.commitInternalOffsetsToKafka with the offsets snapshotted from the fetcher at checkpoint time
                    if (toCommit != null) {

                        // also record that a commit is already in progress
                        // the order here matters! first set the flag, then send the commit command.
                        commitInProgress = true;
                        consumer.commitAsync(toCommit, offsetCommitCallback); // commit the offsets asynchronously
                    }
                }

                // get the next batch of records, unless we did not manage to hand the old batch over
                if (records == null) {
                    try {
                        records = consumer.poll(pollTimeout); // read data from Kafka
                    }
                    catch (WakeupException we) {
                        continue;
                    }
                }

                try {
                    handover.produce(records); // put the batch into the handover
                    records = null;
                }
                catch (Handover.WakeupException e) {
                    // fall through the loop
                }
            }
            // end main fetch loop
        } // the rest of the method is omitted in this excerpt
    }
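
The handover used by the two threads can be pictured as a single-slot exchange: the consumer thread blocks in produce until the previous batch has been taken, and the fetcher thread blocks in pollNext until a batch or an error arrives. The sketch below is a simplified stand-in written for illustration, assuming a plain monitor with wait/notifyAll; it is not Flink's Handover class, and it omits features such as waking up a blocked producer.

```java
// Hypothetical single-slot handover between two threads; not Flink's Handover class.
class HandoverSketch<T> {
    private final Object lock = new Object();
    private T next;          // the one batch waiting to be picked up, or null (batches must be non-null)
    private Throwable error; // failure reported by the producing thread, or null

    /** Producer side: blocks while the previous batch has not been taken yet. */
    void produce(T element) {
        synchronized (lock) {
            while (next != null) {
                await();
            }
            next = element;
            lock.notifyAll();
        }
    }

    /** Consumer side: blocks until a batch is available, or rethrows a reported error. */
    T pollNext() {
        synchronized (lock) {
            while (next == null && error == null) {
                await();
            }
            if (next != null) {
                T result = next;
                next = null;
                lock.notifyAll();
                return result;
            }
            throw new IllegalStateException("producer thread failed", error);
        }
    }

    /** Producer side: publishes a failure so that the consumer side sees it on its next poll. */
    void reportError(Throwable t) {
        synchronized (lock) {
            if (error == null) {
                error = t;
            }
            next = null;
            lock.notifyAll();
        }
    }

    // Waits on the monitor; the caller must already hold the lock.
    private void await() {
        try {
            lock.wait();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting on the handover", e);
        }
    }

    public static void main(String[] args) {
        HandoverSketch<String> handover = new HandoverSketch<>();
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                handover.produce("batch-" + i);
            }
        });
        producer.start();
        for (int i = 0; i < 3; i++) {
            System.out.println("received " + handover.pollNext());
        }
    }
}
```

A single slot gives natural back-pressure: the thread that polls Kafka cannot run more than one batch ahead of the thread that emits records.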
