聊聊storm的OpaquePartitionedTridentSpoutExecutor 原 荐

栏目: 编程工具 · 发布时间: 7年前

内容简介:本文主要研究一下storm的OpaquePartitionedTridentSpoutExecutorstorm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.javastorm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

本文主要研究一下storm的OpaquePartitionedTridentSpoutExecutor

TridentTopology.newStream

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
        return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
    }
  • TridentTopology.newStream方法,对于IOpaquePartitionedTridentSpout类型的spout会使用OpaquePartitionedTridentSpoutExecutor来包装;而KafkaTridentSpoutOpaque则实现了IOpaquePartitionedTridentSpout接口

TridentTopologyBuilder.buildTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
        TopologyBuilder builder = new TopologyBuilder();
        Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
        Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);

        Map<String, List<String>> batchesToCommitIds = new HashMap<>();
        Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
        
        for(String id: _spouts.keySet()) {
            TransactionalSpoutComponent c = _spouts.get(id);
            if(c.spout instanceof IRichSpout) {
                
                //TODO: wrap this to set the stream name
                builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
            } else {
                String batchGroup = c.batchGroupId;
                if(!batchesToCommitIds.containsKey(batchGroup)) {
                    batchesToCommitIds.put(batchGroup, new ArrayList<String>());
                }
                batchesToCommitIds.get(batchGroup).add(c.commitStateId);

                if(!batchesToSpouts.containsKey(batchGroup)) {
                    batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
                }
                batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
                
                
                BoltDeclarer scd =
                      builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                
                for(Map<String, Object> m: c.componentConfs) {
                    scd.addConfigurations(m);
                }
                
                Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
                specs.put(c.batchGroupId, new CoordSpec());
                BoltDeclarer bd = builder.setBolt(id,
                        new TridentBoltExecutor(
                          new TridentSpoutExecutor(
                            c.commitStateId,
                            c.streamName,
                            ((ITridentSpout) c.spout)),
                            batchIdsForSpouts,
                            specs),
                        c.parallelism);
                bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                if(c.spout instanceof ICommitterTridentSpout) {
                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
                }
                for(Map<String, Object> m: c.componentConfs) {
                    bd.addConfigurations(m);
                }
            }
        }

        //......

        return builder.createTopology();
    }
  • TridentTopologyBuilder.buildTopology会将IOpaquePartitionedTridentSpout( OpaquePartitionedTridentSpoutExecutor )使用TridentSpoutExecutor包装,然后再使用TridentBoltExecutor包装为bolt

OpaquePartitionedTridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout<Object> {
    protected final Logger LOG = LoggerFactory.getLogger(OpaquePartitionedTridentSpoutExecutor.class);

    IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;
    
    //......

    public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> spout) {
        _spout = spout;
    }
    
    @Override
    public ITridentSpout.BatchCoordinator<Object> getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new Coordinator(conf, context);
    }

    @Override
    public ICommitterTridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new Emitter(txStateId, conf, context);
    }

    @Override
    public Fields getOutputFields() {
        return _spout.getOutputFields();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return _spout.getComponentConfiguration();
    }
    
}
  • OpaquePartitionedTridentSpoutExecutor实现了ICommitterTridentSpout,这里getCoordinator返回的是ITridentSpout.BatchCoordinator,getEmitter返回的是ICommitterTridentSpout.Emitter

ITridentSpout.BatchCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

public class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
        IOpaquePartitionedTridentSpout.Coordinator _coordinator;

        public Coordinator(Map conf, TopologyContext context) {
            _coordinator = _spout.getCoordinator(conf, context);
        }
        
        @Override
        public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
            LOG.debug("Initialize Transaction. [txid = {}], [prevMetadata = {}], [currMetadata = {}]", txid, prevMetadata, currMetadata);
            return _coordinator.getPartitionsForBatch();
        }


        @Override
        public void close() {
            LOG.debug("Closing");
            _coordinator.close();
            LOG.debug("Closed");
        }

        @Override
        public void success(long txid) {
            LOG.debug("Success [txid = {}]", txid);
        }

        @Override
        public boolean isReady(long txid) {
            boolean ready = _coordinator.isReady(txid);
            LOG.debug("[isReady = {}], [txid = {}]", ready, txid);
            return ready;
        }
    }
  • 包装了spout的_coordinator,它的类型IOpaquePartitionedTridentSpout.Coordinator,这里仅仅是多了debug日志

ICommitterTridentSpout.Emitter

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

public class Emitter implements ICommitterTridentSpout.Emitter {        
        IOpaquePartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
        TransactionalState _state;
        TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<>();
        Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
        int _index;
        int _numTasks;

        public Emitter(String txStateId, Map conf, TopologyContext context) {
            _emitter = _spout.getEmitter(conf, context);
            _index = context.getThisTaskIndex();
            _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
            _state = TransactionalState.newUserState(conf, txStateId);
            LOG.debug("Created {}", this);
        }

        Object _savedCoordinatorMeta = null;
        boolean _changedMeta = false;

        @Override
        public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
            LOG.debug("Emitting Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]",
                    tx, coordinatorMeta, collector, this);

            if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
                _partitionStates.clear();
                final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
                for (ISpoutPartition partition : taskPartitions) {
                    _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
                }

                // refresh all partitions for backwards compatibility with old spout
                _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
                _savedCoordinatorMeta = coordinatorMeta;
                _changedMeta = true;
            }
            Map<String, Object> metas = new HashMap<>();
            _cachedMetas.put(tx.getTransactionId(), metas);

            Entry<Long, Map<String, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
            Map<String, Object> prevCached;
            if(entry!=null) {
                prevCached = entry.getValue();
            } else {
                prevCached = new HashMap<>();
            }
            
            for(Entry<String, EmitterPartitionState> e: _partitionStates.entrySet()) {
                String id = e.getKey();
                EmitterPartitionState s = e.getValue();
                s.rotatingState.removeState(tx.getTransactionId());
                Object lastMeta = prevCached.get(id);
                if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
                Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
                metas.put(id, meta);
            }
            LOG.debug("Emitted Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]",
                    tx, coordinatorMeta, collector, this);
        }

        @Override
        public void success(TransactionAttempt tx) {
            for(EmitterPartitionState state: _partitionStates.values()) {
                state.rotatingState.cleanupBefore(tx.getTransactionId());
            }
            LOG.debug("Success transaction {}. [{}]", tx, this);
        }

        @Override
        public void commit(TransactionAttempt attempt) {
            LOG.debug("Committing transaction {}. [{}]", attempt, this);
            // this code here handles a case where a previous commit failed, and the partitions
            // changed since the last commit. This clears out any state for the removed partitions
            // for this txid.
            // we make sure only a single task ever does this. we're also guaranteed that
            // it's impossible for there to be another writer to the directory for that partition
            // because only a single commit can be happening at once. this is because in order for 
            // another attempt of the batch to commit, the batch phase must have succeeded in between.
            // hence, all tasks for the prior commit must have finished committing (whether successfully or not)
            if(_changedMeta && _index==0) {
                Set<String> validIds = new HashSet<>();
                for(ISpoutPartition p: _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
                    validIds.add(p.getId());
                }
                for(String existingPartition: _state.list("")) {
                    if(!validIds.contains(existingPartition)) {
                        RotatingTransactionalState s = new RotatingTransactionalState(_state, existingPartition);
                        s.removeState(attempt.getTransactionId());
                    }
                }
                _changedMeta = false;
            }
            
            Long txid = attempt.getTransactionId();
            Map<String, Object> metas = _cachedMetas.remove(txid);
            for(Entry<String, Object> entry: metas.entrySet()) {
                _partitionStates.get(entry.getKey()).rotatingState.overrideState(txid, entry.getValue());
            }
            LOG.debug("Exiting commit method for transaction {}. [{}]", attempt, this);
        }

        @Override
        public void close() {
            LOG.debug("Closing");
            _emitter.close();
            LOG.debug("Closed");
        }

        @Override
        public String toString() {
            return "Emitter{" +
                    ", _state=" + _state +
                    ", _cachedMetas=" + _cachedMetas +
                    ", _partitionStates=" + _partitionStates +
                    ", _index=" + _index +
                    ", _numTasks=" + _numTasks +
                    ", _savedCoordinatorMeta=" + _savedCoordinatorMeta +
                    ", _changedMeta=" + _changedMeta +
                    '}';
        }
    }

    static class EmitterPartitionState {
        public RotatingTransactionalState rotatingState;
        public ISpoutPartition partition;
        
        public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
            rotatingState = s;
            partition = p;
        }
    }
  • 这里对spout的IOpaquePartitionedTridentSpout.Emitter进行了封装,_partitionStates使用了EmitterPartitionState
  • emitBatch方法首先计算_partitionStates,然后计算prevCached,最后调用_emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta)
  • success方法调用state.rotatingState.cleanupBefore(tx.getTransactionId()),清空该txid之前的状态信息;commit方法主要是更新_partitionStates

KafkaTridentSpoutOpaque

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java

public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
        KafkaTridentSpoutTopicPartition, Map<String, Object>> {
    private static final long serialVersionUID = -8003272486566259640L;

    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);

    private final KafkaTridentSpoutManager<K, V> kafkaManager;

    public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
        this(new KafkaTridentSpoutManager<>(conf));
    }
    
    public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
        this.kafkaManager = kafkaManager;
        LOG.debug("Created {}", this.toString());
    }

    @Override
    public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
            Map conf, TopologyContext context) {
        return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
    }

    @Override
    public Coordinator<List<Map<String, Object>>> getCoordinator(Map conf, TopologyContext context) {
        return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        final Fields outputFields = kafkaManager.getFields();
        LOG.debug("OutputFields = {}", outputFields);
        return outputFields;
    }

    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager + '}';
    }
}
  • KafkaTridentSpoutOpaque的getCoordinator返回的是KafkaTridentSpoutOpaqueCoordinator;getEmitter返回的是KafkaTridentSpoutEmitter

KafkaTridentSpoutOpaqueCoordinator

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java

public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
        Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);

    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
    private final KafkaTridentSpoutManager<K,V> kafkaManager;

    public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
        this.kafkaManager = kafkaManager;
        LOG.debug("Created {}", this.toString());
    }

    @Override
    public boolean isReady(long txid) {
        LOG.debug("isReady = true");
        return true;    // the "old" trident kafka spout always returns true, like this
    }

    @Override
    public List<Map<String, Object>> getPartitionsForBatch() {
        final ArrayList<TopicPartition> topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions());
        LOG.debug("TopicPartitions for batch {}", topicPartitions);
        List<Map<String, Object>> tps = new ArrayList<>();
        for(TopicPartition tp : topicPartitions) {
            tps.add(tpSerializer.toMap(tp));
        }
        return tps;
    }

    @Override
    public void close() {
        LOG.debug("Closed"); // the "old" trident kafka spout is no op like this
    }

    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager +
                '}';
    }
}
  • 这里的isReady始终返回true,getPartitionsForBatch方法主要是将kafkaManager.getTopicPartitions()信息转换为map结构

KafkaTridentSpoutEmitter

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java

public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
        List<Map<String, Object>>,
        KafkaTridentSpoutTopicPartition,
        Map<String, Object>>,
        Serializable {

    private static final long serialVersionUID = -7343927794834130435L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);

    // Kafka
    private final KafkaConsumer<K, V> kafkaConsumer;

    // Bookkeeping
    private final KafkaTridentSpoutManager<K, V> kafkaManager;
    // set of topic-partitions for which first poll has already occurred, and the first polled txid
    private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>(); 

    // Declare some KafkaTridentSpoutManager references for convenience
    private final long pollTimeoutMs;
    private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
    private final RecordTranslator<K, V> translator;
    private final Timer refreshSubscriptionTimer;
    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();

    private TopologyContext topologyContext;

    /**
     * Create a new Kafka spout emitter.
     * @param kafkaManager The Kafka consumer manager to use
     * @param topologyContext The topology context
     * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription
     */
    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
        this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
        this.kafkaManager = kafkaManager;
        this.topologyContext = topologyContext;
        this.refreshSubscriptionTimer = refreshSubscriptionTimer;
        this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();

        final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
        this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
        LOG.debug("Created {}", this.toString());
    }

    /**
     * Creates instance of this class with default 500 millisecond refresh subscription timer
     */
    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) {
        this(kafkaManager, topologyContext, new Timer(500,
                kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
    }

    //......

    @Override
    public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
            KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {

        LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
                tx, currBatchPartition, lastBatch, collector);

        final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
        final Set<TopicPartition> assignments = kafkaConsumer.assignment();
        KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
        KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
        Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();

        if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) {
            LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
                            "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " +
                            "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments,
                    kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
        } else {
            try {
                // pause other topic-partitions to only poll from current topic-partition
                pausedTopicPartitions = pauseTopicPartitions(currBatchTp);

                seek(currBatchTp, lastBatchMeta, tx.getTransactionId());

                // poll
                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                    kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
                }

                final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
                LOG.debug("Polled [{}] records from Kafka.", records.count());

                if (!records.isEmpty()) {
                    emitTuples(collector, records);
                    // build new metadata
                    currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records);
                }
            } finally {
                kafkaConsumer.resume(pausedTopicPartitions);
                LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
            }
            LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
                    "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
        }

        return currentBatch == null ? null : currentBatch.toMap();
    }

    private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
        for (ConsumerRecord<K, V> record : records) {
            final List<Object> tuple = translator.apply(record);
            collector.emit(tuple);
            LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
        }
    }

    @Override
    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
        LOG.trace("Refreshing of topic-partitions handled by Kafka. " +
                "No action taken by this method for topic partitions {}", partitionResponsibilities);
    }

    /**
     * Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions
     * for this task must be assigned to the Kafka consumer running on this task.
     *
     * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
     * @return ordered list of topic partitions for this task
     */
    @Override
    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
        List<TopicPartition> allTopicPartitions = new ArrayList<>();
        for(Map<String, Object> map : allPartitionInfo) {
            allTopicPartitions.add(tpSerializer.fromMap(map));
        }
        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
        LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
                allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
        return allPartitions;
    }

    @Override
    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
        List<Map<String, Object>> allPartitionInfo) {
        final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
        LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
        final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
        LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId);
        return taskTps;
    }

    @Override
    public void close() {
        kafkaConsumer.close();
        LOG.debug("Closed");
    }

    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager +
                '}';
    }
}
  • 这里的refreshSubscriptionTimer的interval取的是kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(),默认是2000
  • emitPartitionBatch方法没调用一次都会判断refreshSubscriptionTimer.isExpiredResetOnTrue(),如果时间到了,就会调用kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment()刷新assignment
  • emitPartitionBatch方法主要是找到与该batch关联的partition,停止从其他parition拉取消息,然后根据firstPollOffsetStrategy以及lastBatchMeta信息,调用kafkaConsumer的seek相关方法seek到指定位置
  • 之后就是用kafkaConsumer.poll(pollTimeoutMs)拉取数据,然后emitTuples;emitTuples方法会是用translator转换数据,然后调用collector.emit发射出去
  • refreshPartitions方法目前仅仅是trace下日志;getOrderedPartitions方法先将allPartitionInfo的数据从map结构反序列化回来,然后转换为KafkaTridentSpoutTopicPartition返回;getPartitionsForTask方法主要是通过kafkaConsumer.assignment()的信息转换为KafkaTridentSpoutTopicPartition返回

小结

旧版的为OpaqueTridentKafkaSpout,在storm-kafka类库中
OpaquePartitionedTridentSpoutExecutor

doc

聊聊storm的OpaquePartitionedTridentSpoutExecutor 原 荐

以上所述就是小编给大家介绍的《聊聊storm的OpaquePartitionedTridentSpoutExecutor 原 荐》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Transcending CSS

Transcending CSS

Andy Clarke、Molly E. Holzschlag / New Riders / November 15, 2006 / $49.99

As the Web evolves to incorporate new standards and the latest browsers offer new possibilities for creative design, the art of creating Web sites is also changing. Few Web designers are experienced p......一起来看看 《Transcending CSS》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

在线进制转换器
在线进制转换器

各进制数互转换器

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具