Flink - FlinkKafkaProducer010


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html

Usage:

DataStream<String> stream = ...;

FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
        stream,                     // input stream
        "my-topic",                 // target topic
        new SimpleStringSchema(),   // serialization schema
        properties);                // custom configuration for KafkaProducer (including broker list)

// the following is necessary for at-least-once delivery guarantee
myProducerConfig.setLogFailuresOnly(false);   // "false" by default
myProducerConfig.setFlushOnCheckpoint(true);  // "false" by default

Besides enabling Flink’s checkpointing, you should also configure the setter methods setLogFailuresOnly(boolean) and setFlushOnCheckpoint(boolean) appropriately, as shown in the example above:

  • setLogFailuresOnly(boolean) : enabling this will let the producer log failures only instead of catching and rethrowing them. This essentially accounts the record to have succeeded, even if it was never written to the target Kafka topic. This must be disabled for at-least-once.
  • setFlushOnCheckpoint(boolean) : with this enabled, Flink’s checkpoints will wait for any on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before succeeding the checkpoint. This ensures that all records before the checkpoint have been written to Kafka. This must be enabled for at-least-once.

Note: By default, the number of retries is set to “0”. This means that when setLogFailuresOnly is set to false , the producer fails immediately on errors, including leader changes. The value is set to “0” by default to avoid duplicate messages in the target topic that are caused by retries. For most production environments with frequent broker changes, we recommend setting the number of retries to a higher value.

setLogFailuresOnly: if this is true, a failed send to Kafka is only logged and does not interrupt execution, so data may be lost.

If it is false, a failed send to Kafka throws an exception, so the job restarts and no data is lost, but execution is interrupted. In that case it is best to set the producer's retries to something like 3, so that a transient Kafka hiccup such as a leader switch does not interrupt the job.

setFlushOnCheckpoint: if this is true, a checkpoint waits until all pending records have been sent successfully, which guarantees that no data is lost.
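
Putting the two setters together with the retry advice above, a minimal at-least-once setup might look like the sketch below. The broker address, checkpoint interval, and source are placeholder values for illustration, not something prescribed by the Flink docs.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.FlinkKafkaProducer010Configuration;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class AtLeastOnceKafkaSink {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000);            // checkpointing is required for at-least-once

            DataStream<String> stream = env.socketTextStream("localhost", 9999);   // placeholder source

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092");   // placeholder broker list
            props.setProperty("retries", "3");        // tolerate transient failures such as a leader switch

            FlinkKafkaProducer010Configuration<String> config =
                    FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                            stream, "my-topic", new SimpleStringSchema(), props);

            config.setLogFailuresOnly(false);         // fail (and restart) on send errors instead of only logging
            config.setFlushOnCheckpoint(true);        // wait for in-flight records before a checkpoint completes

            env.execute("at-least-once kafka sink");
        }
    }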

First of all, FlinkKafkaProducer010 is a kind of sink.

The usual way to attach a sink is stream.addSink(RichSinkFunction); the addSink implementation is shown below, followed by a minimal usage example.

    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
        // read the output type of the input transformation to trigger type validation
        this.transformation.getOutputType();
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(this.getType(), this.getExecutionConfig());
        }

        // wrap the SinkFunction in a StreamSink operator and register it with the environment
        StreamSink sinkOperator = new StreamSink((SinkFunction) this.clean(sinkFunction));
        DataStreamSink sink = new DataStreamSink(this, sinkOperator);
        this.getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }
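
For comparison, here is what that normal addSink path looks like from the user side, with a trivial hypothetical PrintSink (just for illustration, not part of the Kafka connector):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class AddSinkExample {

        // hypothetical sink that just prints every record it receives
        public static class PrintSink extends RichSinkFunction<String> {
            @Override
            public void invoke(String value) throws Exception {
                System.out.println("sink got: " + value);
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("a", "b", "c");
            stream.addSink(new PrintSink());   // goes through the addSink code shown above
            env.execute("addSink example");
        }
    }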

Here FlinkKafkaProducer010.writeToKafkaWithTimestamps re-implements that wiring itself, which is rather tricky.

     /**
     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
     * the topic.
     *
     * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above)
     *
     *  @param inStream The stream to write to Kafka
     *  @param topicId The name of the target topic
     *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
     *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
     *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
     */
    public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
                                                                                    String topicId,
                                                                                    KeyedSerializationSchema<T> serializationSchema,
                                                                                    Properties producerConfig,
                                                                                    KafkaPartitioner<T> customPartitioner) {

        GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
        FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
        SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
        return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
    }

As you can see, this re-implements the addSink logic and returns a FlinkKafkaProducer010Configuration, which is really just a DataStreamSink.

      public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {

        private final FlinkKafkaProducerBase wrappedProducerBase;
        private final FlinkKafkaProducer010 producer;

        private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) {
            //noinspection unchecked
            super(stream, producer);
            this.producer = producer;
            this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction;
        }

Setting aside this odd wrapping, for a sink the key piece is still the SinkFunction implementation.

  public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {

    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
        // We create a Kafka 09 producer instance here and only "override" (by intercepting) the
        // invoke call.
        super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
    }

The SinkFunction is implemented directly on FlinkKafkaProducer010.

By the way, FlinkKafkaProducer010 extends StreamSink, which means it is also an operator. The reason is that writeToKafkaWithTimestamps needs an operator object when it builds the DataStreamSink, and it passes the FlinkKafkaProducer010 in directly, so FlinkKafkaProducer010 has to be a StreamSink. This code is really not well designed.

FlinkKafkaProducer010 actually just delegates to FlinkKafkaProducer09 internally.
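
Since FlinkKafkaProducer010 also implements SinkFunction, it can, as far as I can tell, be attached through the plain addSink path as well; used that way, invoke(value) is called without an element timestamp, so no event timestamps are written to Kafka. A sketch (broker address is a placeholder):

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class PlainAddSinkKafka010 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("a", "b", "c");

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092");   // placeholder broker list

            // invoke(value) ends up calling invokeInternal(value, Long.MAX_VALUE), see below
            stream.addSink(new FlinkKafkaProducer010<String>(
                    "my-topic", new SimpleStringSchema(), props));

            env.execute("plain addSink with FlinkKafkaProducer010");
        }
    }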

For a SinkFunction, the key interface method is invoke.

      public void invoke(T value) throws Exception {
        invokeInternal(value, Long.MAX_VALUE);
    }

invokeInternal

      private void invokeInternal(T next, long elementTimestamp) throws Exception {

        final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;

        internalProducer.checkErroneous();

        byte[] serializedKey = internalProducer.schema.serializeKey(next);
        byte[] serializedValue = internalProducer.schema.serializeValue(next);
        String targetTopic = internalProducer.schema.getTargetTopic(next);
        if (targetTopic == null) {
            targetTopic = internalProducer.defaultTopicId;
        }

        Long timestamp = null;
        if(this.writeTimestampToKafka) {
            timestamp = elementTimestamp;
        }

        ProducerRecord<byte[], byte[]> record;
        if (internalProducer.partitioner == null) {
            record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
        } else {
            record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
        }
        if (internalProducer.flushOnCheckpoint) {
            synchronized (internalProducer.pendingRecordsLock) {
                internalProducer.pendingRecords++;  // if flushOnCheckpoint is enabled, count the in-flight record
            }
        }
        internalProducer.producer.send(record, internalProducer.callback);
    }
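
The serializeKey / serializeValue / getTargetTopic calls above come from the KeyedSerializationSchema interface; a minimal hypothetical implementation for String records could look like this (returning null from getTargetTopic falls back to defaultTopicId, as the code above shows):

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

    // hypothetical schema: no message key, UTF-8 encoded value, always the default topic
    public class StringKeyedSchema implements KeyedSerializationSchema<String> {

        @Override
        public byte[] serializeKey(String element) {
            return null;                         // no key for this record
        }

        @Override
        public byte[] serializeValue(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(String element) {
            return null;                         // null -> use defaultTopicId
        }
    }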

The code is easy to follow; it is the normal producer send flow,

except for two things:

internalProducer.checkErroneous();

internalProducer.callback

internalProducer.callback is used to handle the acks returned by Kafka.

FlinkKafkaProducerBase

    @Override
    public void open(Configuration configuration) {
        if (logFailuresOnly) {
            callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
                    }
                    acknowledgeMessage();
                }
            };
        }
        else {
            callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null && asyncException == null) {
                        asyncException = exception;
                    }
                    acknowledgeMessage();
                }
            };
        }
    }

As you can see, when logFailuresOnly is true, an Exception is only logged.

If it is false, the Exception is recorded into asyncException.

acknowledgeMessage: the message is acknowledged whether or not there was an error.

      private void acknowledgeMessage() {
        if (flushOnCheckpoint) {
            synchronized (pendingRecordsLock) {
                pendingRecords--;
                if (pendingRecords == 0) {
                    pendingRecordsLock.notifyAll();
                }
            }
        }
    }

The logic is simply to decrement the counter; when pendingRecords reaches 0, meaning there are no more in-flight records, it notifies everyone waiting on the lock.
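
The code that waits on this lock sits on the checkpoint side of FlinkKafkaProducerBase and is not quoted in this post. Conceptually it does something like the following simplified sketch, which is a paraphrase rather than the actual Flink source:

    // simplified sketch of the checkpoint-side counterpart (not the real Flink code):
    // block the checkpoint until every in-flight record has been acknowledged
    private void waitForPendingRecords() throws Exception {
        if (flushOnCheckpoint) {
            synchronized (pendingRecordsLock) {
                while (pendingRecords > 0) {
                    pendingRecordsLock.wait();   // woken up by acknowledgeMessage() via notifyAll()
                }
            }
            checkErroneous();                    // surface any async send failure before the checkpoint succeeds
        }
    }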

checkErroneous()

      protected void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            // prevent double throwing
            asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

It simply rethrows the exception stored in asyncException.

