Talking about Storm's ICommitterTridentSpout

Category: Programming Tools · Published: 7 years ago


storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/ICommitterTridentSpout.java

public interface ICommitterTridentSpout<X> extends ITridentSpout<X> {
    public interface Emitter extends ITridentSpout.Emitter {
        void commit(TransactionAttempt attempt);
    } 
    
    @Override
    public Emitter getEmitter(String txStateId, Map conf, TopologyContext context);    
}
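  • ICommitterTridentSpout extends ITridentSpout. Its main change is overriding getEmitter to return an extended Emitter, which extends ITridentSpout.Emitter and adds one more method, commit(TransactionAttempt attempt).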
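To make the extra hook concrete, here is a minimal sketch of what a committer emitter might do. It is self-contained and does not use storm-core: Attempt, Emitter and CommitterEmitter are hypothetical simplified stand-ins for TransactionAttempt, ITridentSpout.Emitter and ICommitterTridentSpout.Emitter (the real emitBatch also receives the coordinator metadata and a TridentCollector), and StagingEmitter is an invented example that stages a batch in emitBatch and only publishes it in commit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CommitterEmitterSketch {
    // Hypothetical stand-in for TransactionAttempt: (transaction id, attempt id).
    record Attempt(long txid, int attemptId) {}

    // Hypothetical, simplified stand-in for ITridentSpout.Emitter.
    interface Emitter {
        void emitBatch(Attempt attempt, List<String> out);
        void success(Attempt attempt);
    }

    // Same shape as ICommitterTridentSpout.Emitter: the base emitter plus commit.
    interface CommitterEmitter extends Emitter {
        void commit(Attempt attempt);
    }

    // Invented example: stage each batch in emitBatch, publish it only in commit.
    static class StagingEmitter implements CommitterEmitter {
        final Map<Long, List<String>> staged = new HashMap<>();
        final List<String> published = new ArrayList<>();

        @Override
        public void emitBatch(Attempt attempt, List<String> out) {
            List<String> batch = List.of("tx" + attempt.txid() + "-0", "tx" + attempt.txid() + "-1");
            staged.put(attempt.txid(), batch); // a replayed attempt simply re-stages the batch
            out.addAll(batch);
        }

        @Override
        public void commit(Attempt attempt) {
            List<String> batch = staged.remove(attempt.txid());
            if (batch != null) {
                published.addAll(batch);
            }
        }

        @Override
        public void success(Attempt attempt) {
            // nothing left to clean up in this sketch
        }
    }

    public static void main(String[] args) {
        StagingEmitter emitter = new StagingEmitter();
        Attempt attempt = new Attempt(1L, 0);
        List<String> out = new ArrayList<>();
        emitter.emitBatch(attempt, out);
        System.out.println("emitted " + out + ", published " + emitter.published);
        emitter.commit(attempt);
        System.out.println("published " + emitter.published);
    }
}
```

The point of the split is that emitBatch may run several times for the same transaction (replays), while commit runs once per successfully processed attempt.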

TridentTopologyBuilder.buildTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
        TopologyBuilder builder = new TopologyBuilder();
        Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
        Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);

        Map<String, List<String>> batchesToCommitIds = new HashMap<>();
        Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
        
        for(String id: _spouts.keySet()) {
            TransactionalSpoutComponent c = _spouts.get(id);
            if(c.spout instanceof IRichSpout) {
                
                //TODO: wrap this to set the stream name
                builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
            } else {
                String batchGroup = c.batchGroupId;
                if(!batchesToCommitIds.containsKey(batchGroup)) {
                    batchesToCommitIds.put(batchGroup, new ArrayList<String>());
                }
                batchesToCommitIds.get(batchGroup).add(c.commitStateId);

                if(!batchesToSpouts.containsKey(batchGroup)) {
                    batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
                }
                batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
                
                
                BoltDeclarer scd =
                      builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                
                for(Map<String, Object> m: c.componentConfs) {
                    scd.addConfigurations(m);
                }
                
                Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
                specs.put(c.batchGroupId, new CoordSpec());
                BoltDeclarer bd = builder.setBolt(id,
                        new TridentBoltExecutor(
                          new TridentSpoutExecutor(
                            c.commitStateId,
                            c.streamName,
                            ((ITridentSpout) c.spout)),
                            batchIdsForSpouts,
                            specs),
                        c.parallelism);
                bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                if(c.spout instanceof ICommitterTridentSpout) {
                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
                }
                for(Map<String, Object> m: c.componentConfs) {
                    bd.addConfigurations(m);
                }
            }
        }
        
        //......

        return builder.createTopology();
    }
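  • In TridentTopologyBuilder.buildTopology, each user spout is checked: if it is an ICommitterTridentSpout, its bolt is additionally subscribed with allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID). Spouts of any other type do not subscribe to the commit stream.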
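The wiring rule can be summarised with a small helper. This is a hypothetical function, not part of Storm; it only lists, as strings, the subscriptions that the code above declares for a spout's TridentBoltExecutor, so the single difference between committer and non-committer spouts is easy to see.

```java
import java.util.ArrayList;
import java.util.List;

public class SpoutWiringSketch {
    // Hypothetical helper, not Storm API: returns the subscriptions that
    // buildTopology declares for a Trident spout's TridentBoltExecutor,
    // written out as strings for readability.
    static List<String> spoutExecutorInputs(String spoutId, String batchGroup, boolean isCommitter) {
        List<String> inputs = new ArrayList<>();
        inputs.add("allGrouping(spoutCoordinator(" + spoutId + "), BATCH_STREAM_ID)");
        inputs.add("allGrouping(masterCoordinator(" + batchGroup + "), SUCCESS_STREAM_ID)");
        if (isCommitter) {
            // Only ICommitterTridentSpout implementations consume the commit stream.
            inputs.add("allGrouping(masterCoordinator(" + batchGroup + "), COMMIT_STREAM_ID)");
        }
        return inputs;
    }

    public static void main(String[] args) {
        System.out.println("plain spout:     " + spoutExecutorInputs("spout0", "bg0", false));
        System.out.println("committer spout: " + spoutExecutorInputs("spout0", "bg0", true));
    }
}
```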

MasterBatchCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java

@Override
    public void nextTuple() {
        sync();
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }
        
        if(_active) {
            if(_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for(int i=0; i<_maxTransactionActive; i++) {
                    if(!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if(attemptId==null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for(TransactionalState state: _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }
                        
                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }

    @Override
    public void ack(Object msgId) {
        TransactionAttempt tx = (TransactionAttempt) msgId;
        TransactionStatus status = _activeTx.get(tx.getTransactionId());
        LOG.debug("Ack. [tx_attempt = {}], [tx_status = {}], [{}]", tx, status, this);
        if(status!=null && tx.equals(status.attempt)) {
            if(status.status==AttemptStatus.PROCESSING) {
                status.status = AttemptStatus.PROCESSED;
                LOG.debug("Changed status. [tx_attempt = {}] [tx_status = {}]", tx, status);
            } else if(status.status==AttemptStatus.COMMITTING) {
                _activeTx.remove(tx.getTransactionId());
                _attemptIds.remove(tx.getTransactionId());
                _collector.emit(SUCCESS_STREAM_ID, new Values(tx));
                _currTransaction = nextTransactionId(tx.getTransactionId());
                for(TransactionalState state: _states) {
                    state.setData(CURRENT_TX, _currTransaction);                    
                }
                LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", SUCCESS_STREAM_ID, tx, status, this);
            }
            sync();
        }
    }
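  • When MasterBatchCoordinator receives an ack: if the status is AttemptStatus.PROCESSING, it is changed to AttemptStatus.PROCESSED; if the status is AttemptStatus.COMMITTING, the transaction is removed from _activeTx, a tuple is emitted on SUCCESS_STREAM_ID, and _currTransaction advances to the next transaction id. In both cases it then calls sync().
  • nextTuple also just calls sync(). sync() checks whether the current transaction (_currTransaction) is in AttemptStatus.PROCESSED; if so, it changes the status to AttemptStatus.COMMITTING and emits a tuple on COMMIT_STREAM_ID, using the attempt as the messageId. Then, if the spout is active and fewer than _maxTransactionActive transactions are in flight, it starts new batches on BATCH_STREAM_ID.
  • So the status goes from AttemptStatus.PROCESSING to AttemptStatus.PROCESSED (on the first ack, once the batch has been fully processed), then to AttemptStatus.COMMITTING (in sync(), which also emits on COMMIT_STREAM_ID). When that COMMIT_STREAM_ID tuple is acked in turn, the status is AttemptStatus.COMMITTING, so the coordinator emits on SUCCESS_STREAM_ID.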
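The transitions above can be replayed with a toy model. This is a simplified sketch under stated assumptions, not the real MasterBatchCoordinator: it keeps one attempt per transaction, ignores throttling, isReady and the persisted TransactionalState, and records each emit as a "STREAM:txid" string. It reproduces the situation described in the sync() comment, where tx 2 is acked first but cannot commit until tx 1 has.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class MasterCoordinatorSketch {
    enum AttemptStatus { PROCESSING, PROCESSED, COMMITTING }

    // Simplified model of MasterBatchCoordinator's bookkeeping: one attempt per
    // transaction, no throttling, no persisted state. Emits are logged as "STREAM:txid".
    static class Coordinator {
        final TreeMap<Long, AttemptStatus> activeTx = new TreeMap<>();
        final List<String> emitted = new ArrayList<>();
        final int maxActive;
        long currTx = 1;

        Coordinator(int maxActive) {
            this.maxActive = maxActive;
        }

        void sync() {
            // Only the current (oldest uncommitted) transaction may start committing.
            if (activeTx.get(currTx) == AttemptStatus.PROCESSED) {
                activeTx.put(currTx, AttemptStatus.COMMITTING);
                emitted.add("COMMIT:" + currTx);
            }
            // Start batches for any free slot in the window [currTx, currTx + maxActive).
            for (long tx = currTx; tx < currTx + maxActive; tx++) {
                if (!activeTx.containsKey(tx)) {
                    activeTx.put(tx, AttemptStatus.PROCESSING);
                    emitted.add("BATCH:" + tx);
                }
            }
        }

        void ack(long tx) {
            AttemptStatus status = activeTx.get(tx);
            if (status == null) {
                return;
            }
            if (status == AttemptStatus.PROCESSING) {
                activeTx.put(tx, AttemptStatus.PROCESSED);   // first ack: batch fully processed
            } else if (status == AttemptStatus.COMMITTING) {
                activeTx.remove(tx);                         // second ack: commit done
                emitted.add("SUCCESS:" + tx);
                currTx = tx + 1;
            }
            sync();
        }
    }

    public static void main(String[] args) {
        Coordinator c = new Coordinator(2);
        c.sync();   // nextTuple(): starts tx 1 and tx 2
        c.ack(2);   // tx 2 processed, but it cannot commit before tx 1
        c.ack(1);   // tx 1 processed -> COMMIT:1
        c.ack(1);   // commit of tx 1 acked -> SUCCESS:1, then COMMIT:2 and BATCH:3
        System.out.println(c.emitted);
    }
}
```

The final log is BATCH:1, BATCH:2, COMMIT:1, SUCCESS:1, COMMIT:2, BATCH:3: tx 2 only moves to COMMITTING after tx 1's commit has been acked and the current transaction has advanced.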

TridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java

public void execute(BatchInfo info, Tuple input) {
        // there won't be a BatchInfo for the success stream
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                _activeBatches.remove(attempt.getTransactionId());
            } else {
                 throw new FailedException("Received commit for different transaction attempt");
            }
        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // valid to delete before what's been committed since 
            // those batches will never be accessed again
            _activeBatches.headMap(attempt.getTransactionId()).clear();
            _emitter.success(attempt);
        } else {            
            _collector.setBatch(info.batchId);
            _emitter.emitBatch(attempt, input.getValue(1), _collector);
            _activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }
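  • In execute, TridentSpoutExecutor checks whether the tuple came from MasterBatchCoordinator.COMMIT_STREAM_ID. If so, and the TransactionAttempt equals the one recorded in _activeBatches for that transaction id (TransactionAttempt equality compares both the transaction id and the attempt id), it calls ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt); otherwise it throws a FailedException. A tuple from SUCCESS_STREAM_ID clears the older entries in _activeBatches and calls _emitter.success(attempt); any other tuple triggers _emitter.emitBatch.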
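A simplified model of this routing, assuming plain string stream names and logging emitter calls instead of running them, shows two details: a commit for a stale attempt id is rejected because the comparison uses the full attempt, and a SUCCESS tuple drops every older batch through TreeMap.headMap. IllegalStateException stands in for Storm's FailedException; Executor and Attempt are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class SpoutExecutorRoutingSketch {
    // Hypothetical stand-in for TransactionAttempt; record equality compares
    // both fields, like TransactionAttempt.equals.
    record Attempt(long txid, int attemptId) {}

    // Simplified model of TridentSpoutExecutor.execute. Streams are plain strings,
    // emitter calls are logged, and IllegalStateException stands in for FailedException.
    static class Executor {
        final TreeMap<Long, Attempt> activeBatches = new TreeMap<>();
        final List<String> calls = new ArrayList<>();

        void execute(String stream, Attempt attempt) {
            switch (stream) {
                case "COMMIT" -> {
                    if (attempt.equals(activeBatches.get(attempt.txid()))) {
                        calls.add("commit " + attempt.txid() + "/" + attempt.attemptId());
                        activeBatches.remove(attempt.txid());
                    } else {
                        throw new IllegalStateException("Received commit for different transaction attempt");
                    }
                }
                case "SUCCESS" -> {
                    // Batches older than this transaction will never be accessed again.
                    activeBatches.headMap(attempt.txid()).clear();
                    calls.add("success " + attempt.txid());
                }
                default -> {
                    calls.add("emitBatch " + attempt.txid() + "/" + attempt.attemptId());
                    activeBatches.put(attempt.txid(), attempt);
                }
            }
        }
    }

    public static void main(String[] args) {
        Executor ex = new Executor();
        ex.execute("BATCH", new Attempt(1, 0));
        ex.execute("BATCH", new Attempt(2, 0));
        ex.execute("BATCH", new Attempt(2, 1));      // replay of tx 2 replaces attempt 0
        try {
            ex.execute("COMMIT", new Attempt(2, 0)); // stale attempt id is rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        ex.execute("COMMIT", new Attempt(1, 0));
        ex.execute("SUCCESS", new Attempt(3, 0));    // clears everything below tx 3
        System.out.println(ex.calls + " remaining=" + ex.activeBatches.keySet());
    }
}
```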

TridentBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java

public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if (now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if (batchGroup == null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there


        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
        //        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
        //            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
        //                    + " (" + _batches.size() + ")" +
        //                    "\ntuple: " + tuple +
        //                    "\nwith tracked " + tracked +
        //                    "\nwith id " + id +
        //                    "\nwith group " + batchGroup
        //                    + "\n");
        //
        //        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if (tracked != null) {
            if (id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if (id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }

        if (tracked == null) {
            tracked =
                new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup),
                                 id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);

        //System.out.println("TRACKED: " + tracked + " " + tuple);

        TupleType t = getTupleType(tuple, tracked);
        if (t == TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if (t == TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount += count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if (tracked.condition.expectedTaskReports == 0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch (FailedException e) {
                failBatch(tracked, e);
            }
            if (success) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }
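  • Here, after calling _bolt.execute(tracked.info, tuple), the executor calls _collector.ack(tuple) to ack the tuple (or _collector.fail(tuple) if finishBatch returned false).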
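The attempt bookkeeping a few lines above _bolt.execute ("only one attempt is ever tracked for a batch") can be isolated in a small sketch. Assumptions: a plain HashMap replaces the RotatingMap, and Tracked is a hypothetical reduced TrackedBatch holding just the attempt id and a tuple counter.

```java
import java.util.HashMap;
import java.util.Map;

public class AttemptTrackingSketch {
    // Hypothetical stand-in for TrackedBatch: just the attempt id and a tuple count.
    static class Tracked {
        final int attemptId;
        int receivedTuples;

        Tracked(int attemptId) {
            this.attemptId = attemptId;
        }
    }

    // A plain HashMap replaces the RotatingMap that TridentBoltExecutor uses.
    final Map<Long, Tracked> batches = new HashMap<>();

    // Returns false when the tuple belongs to an older attempt and is skipped.
    boolean onTuple(long txid, int attemptId) {
        Tracked tracked = batches.get(txid);
        if (tracked != null) {
            if (attemptId > tracked.attemptId) {
                batches.remove(txid);   // newer attempt: drop the old attempt's state
                tracked = null;
            } else if (attemptId < tracked.attemptId) {
                return false;           // older attempt: nothing to do
            }
        }
        if (tracked == null) {
            tracked = new Tracked(attemptId);
            batches.put(txid, tracked);
        }
        tracked.receivedTuples++;
        return true;
    }

    public static void main(String[] args) {
        AttemptTrackingSketch s = new AttemptTrackingSketch();
        s.onTuple(5L, 0);
        s.onTuple(5L, 0);
        s.onTuple(5L, 1);                       // replay: state for attempt 0 is discarded
        boolean accepted = s.onTuple(5L, 0);    // late tuple from attempt 0
        Tracked t = s.batches.get(5L);
        System.out.println("late accepted=" + accepted + ", attempt=" + t.attemptId + ", tuples=" + t.receivedTuples);
    }
}
```

Keeping a single attempt per transaction is what bounds memory when batches are replayed after failures.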

SpoutOutputCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/spout/SpoutOutputCollector.java

/**
     * Emits a new tuple to the specified output stream with the given message ID.
     * When Storm detects that this tuple has been fully processed, or has failed
     * to be fully processed, the spout will receive an ack or fail callback respectively
     * with the messageId as long as the messageId was not null. If the messageId was null,
     * Storm will not track the tuple and no callback will be received. 
     * Note that Storm's event logging functionality will only work if the messageId
     * is serializable via Kryo or the Serializable interface. The emitted values must be immutable.
     *
     * @return the list of task ids that this tuple was sent to
     */
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return _delegate.emit(streamId, tuple, messageId);
    }
  • This simply delegates to _delegate.emit; here _delegate is a SpoutOutputCollectorImpl.

SpoutOutputCollectorImpl

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        try {
            return sendSpoutMsg(streamId, tuple, messageId, null);
        } catch (InterruptedException e) {
            LOG.warn("Spout thread interrupted during emit().");
            throw new RuntimeException(e);
        }
    }

    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws
        InterruptedException {
        emittedCount.increment();

        List<Integer> outTasks;
        if (outTaskId != null) {
            outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
        } else {
            outTasks = taskData.getOutgoingTasks(stream, values);
        }

        final boolean needAck = (messageId != null) && hasAckers;

        final List<Long> ackSeq = needAck ? new ArrayList<>() : null;

        final long rootId = needAck ? MessageId.generateId(random) : 0;

        for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
            Integer t = outTasks.get(i);
            MessageId msgId;
            if (needAck) {
                long as = MessageId.generateId(random);
                msgId = MessageId.makeRootId(rootId, as);
                ackSeq.add(as);
            } else {
                msgId = MessageId.makeUnanchored();
            }

            final TupleImpl tuple =
                new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
            AddressedTuple adrTuple = new AddressedTuple(t, tuple);
            executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
        }
        if (isEventLoggers) {
            taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
        }

        if (needAck) {
            boolean sample = executor.samplerCheck();
            TupleInfo info = new TupleInfo();
            info.setTaskId(this.taskId);
            info.setStream(stream);
            info.setMessageId(messageId);
            if (isDebug) {
                info.setValues(values);
            }
            if (sample) {
                info.setTimestamp(System.currentTimeMillis());
            }

            pending.put(rootId, info);
            List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
            taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
        } else if (messageId != null) {
            // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
            if (isDebug) {
                if (spoutExecutorThdId != Thread.currentThread().getId()) {
                    throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
                                               "Spout Output Collector should only emit from the main spout executor thread.");
                }
            }
            globalTupleInfo.clear();
            globalTupleInfo.setStream(stream);
            globalTupleInfo.setValues(values);
            globalTupleInfo.setMessageId(messageId);
            globalTupleInfo.setTimestamp(0);
            globalTupleInfo.setId("0:");
            Long timeDelta = 0L;
            executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
        }
        return outTasks;
    }
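  • When needAck is true (messageId is non-null and the topology has ackers), it calls taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
  • Note that ackInitTuple is Values(rootId, Utils.bitXorVals(ackSeq), this.taskId): its second value is Utils.bitXorVals applied to the List ackSeq, which holds one random id per target task.
  • When there are no outTasks (no component consumes the stream), ackSeq is an empty list, and Utils.bitXorVals of an empty list is 0.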
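The ackSeq computation can be sketched on its own to show why zero consumers yields 0. Assumptions: random.nextLong() stands in for MessageId.generateId, and only the ackSeq/XOR part of sendSpoutMsg is modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class AckInitSketch {
    // Same loop as Utils.bitXorVals.
    static long bitXorVals(List<Long> coll) {
        long result = 0;
        for (Long val : coll) {
            result ^= val;
        }
        return result;
    }

    // Simplified model of the ackSeq part of sendSpoutMsg: one random id per
    // target task (random.nextLong() stands in for MessageId.generateId), and the
    // XOR of all ids is the second field of the ACKER_INIT tuple.
    static long ackInitValue(int outTaskCount, Random random) {
        List<Long> ackSeq = new ArrayList<>();
        for (int i = 0; i < outTaskCount; i++) {
            ackSeq.add(random.nextLong());
        }
        return bitXorVals(ackSeq);
    }

    public static void main(String[] args) {
        System.out.println("no consumers: " + ackInitValue(0, new Random(42)));
        System.out.println("two consumers: " + ackInitValue(2, new Random(42)));
    }
}
```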

Utils

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/Utils.java

public static long bitXorVals(List<Long> coll) {
        long result = 0;
        for (Long val : coll) {
            result ^= val;
        }
        return result;
    }

    public static long bitXor(Long a, Long b) {
        return a ^ b;
    }
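  • XOR is the core operation of Storm's ack mechanism: every edge id is XORed into the acker's value once when the tuple is created (through the spout's ACKER_INIT tuple or the emitting bolt's ack) and once when that tuple itself is acked, so the value returns to 0 exactly when the whole tuple tree has been acked (barring a vanishingly unlikely collision of random ids).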
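Why XOR works: each edge id enters the acker's value twice, and x ^ x = 0. The hypothetical simulation below assumes a spout tuple goes to bolt A, which emits two anchored tuples to bolts B and C; when A acks, it reports its input's id XORed with the ids of its new edges. AckVal is an invented name for a single acker entry.

```java
import java.util.Random;

public class XorTreeSketch {
    // Hypothetical acker entry for one spout tuple tree.
    static class AckVal {
        long val = 0L;

        void update(long v) {
            val ^= v;   // same operation as Utils.bitXor
        }
    }

    // Spout -> bolt A; bolt A emits two anchored tuples to bolts B and C.
    // Returns the acker's value at the end; 0 means the tree is complete.
    static long simulate(long seed, boolean boltCAcks) {
        Random random = new Random(seed);
        AckVal acker = new AckVal();

        long e1 = random.nextLong();   // edge spout -> A
        acker.update(e1);              // init: XOR of the spout's edge ids

        long e2 = random.nextLong();   // edge A -> B
        long e3 = random.nextLong();   // edge A -> C
        acker.update(e1 ^ e2 ^ e3);    // A acks its input and reports its new edges

        acker.update(e2);              // B acks
        if (boltCAcks) {
            acker.update(e3);          // C acks
        }
        return acker.val;
    }

    public static void main(String[] args) {
        System.out.println("all acked -> " + simulate(1L, true));
        System.out.println("C pending -> " + simulate(1L, false));
    }
}
```

While C is still pending the value equals e3, the id of the one unacked edge; the acker only needs 8 bytes per tree regardless of how many tuples it contains.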

Acker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Acker.java

public void execute(Tuple input) {
        if (TupleUtils.isTick(input)) {
            Map<Object, AckObject> tmp = pending.rotate();
            LOG.debug("Number of timeout tuples:{}", tmp.size());
            return;
        }

        boolean resetTimeout = false;
        String streamId = input.getSourceStreamId();
        Object id = input.getValue(0);
        AckObject curr = pending.get(id);
        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
            curr.spoutTask = input.getInteger(2);
        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
            // For the case that ack_fail message arrives before ack_init
            if (curr == null) {
                curr = new AckObject();
            }
            curr.failed = true;
            pending.put(id, curr);
        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
            resetTimeout = true;
            if (curr != null) {
                pending.put(id, curr);
            } //else if it has not been added yet, there is no reason time it out later on
        } else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            collector.flush();
            return;
        } else {
            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
            return;
        }

        int task = curr.spoutTask;
        if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) {
            Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
            if (curr.val == 0) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple);
            } else if (curr.failed) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
            } else if (resetTimeout) {
                collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple);
            } else {
                throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code.");
            }
        }

        collector.ack(input);
    }

    private static class AckObject {
        public long val = 0L;
        public long startTime = Time.currentTimeMillis();
        public int spoutTask = -1;
        public boolean failed = false;

        // val xor value
        public void updateAck(Long value) {
            val = Utils.bitXor(val, value);
        }
    }
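  • When the Acker receives a tuple on ACKER_INIT_STREAM_ID and there is no AckObject for that id yet, it creates one whose val defaults to 0. It then calls curr.updateAck(input.getLong(1)), i.e. XORs the tuple's second value into val, and records the spout task from the third value.
  • The tuple sent by SpoutOutputCollectorImpl is Values(rootId, Utils.bitXorVals(ackSeq), this.taskId), so its second value is Utils.bitXorVals(ackSeq). ackSeq is a List<Long>; when there are no output tasks it is empty and Utils.bitXorVals returns 0, so after curr.updateAck(0) curr.val is still 0.
  • At the end of execute, the Acker checks the AckObject: if the spout task is known (task >= 0) and curr.val == 0, it removes the entry and calls collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple).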
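The Acker's bookkeeping can be reduced to a small model. This sketch keeps only the fields that matter here (startTime, ticks, timeouts and ACKER_RESET_TIMEOUT_STREAM_ID are left out), uses plain strings for stream names, and returns the reply it would send to the spout task. It covers the zero-consumer case and shows that an ack arriving before the init is still handled.

```java
import java.util.HashMap;
import java.util.Map;

public class AckerSketch {
    // Mirrors the fields of Acker.AckObject that matter here (startTime omitted).
    static class AckObject {
        long val = 0L;
        int spoutTask = -1;
        boolean failed = false;
    }

    final Map<Long, AckObject> pending = new HashMap<>();

    // Simplified Acker.execute: returns "ACK" or "FAIL" when a reply would be sent
    // to the spout task, or null while the tree is still pending. Stream names are
    // plain strings; ticks, timeouts and resets are left out.
    String execute(String stream, long rootId, long value, int spoutTask) {
        AckObject curr = pending.computeIfAbsent(rootId, k -> new AckObject());
        switch (stream) {
            case "INIT" -> {
                curr.val ^= value;
                curr.spoutTask = spoutTask;
            }
            case "ACK" -> curr.val ^= value;
            case "FAIL" -> curr.failed = true;
            default -> throw new IllegalArgumentException("unknown stream " + stream);
        }
        if (curr.spoutTask >= 0 && (curr.val == 0 || curr.failed)) {
            pending.remove(rootId);
            return curr.val == 0 ? "ACK" : "FAIL";
        }
        return null;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        // COMMIT tuple with no consumers: the init value is 0, so it is acked at once.
        System.out.println(acker.execute("INIT", 10L, 0L, 3));
        // One consumer with edge id 5: pending until that consumer acks.
        System.out.println(acker.execute("INIT", 11L, 5L, 3));
        System.out.println(acker.execute("ACK", 11L, 5L, -1));
    }
}
```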

SpoutExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        String streamId = tuple.getSourceStreamId();
        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            spoutOutputCollector.flush();
        } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            pending.rotate();
        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
            Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
            if (spoutObj instanceof ICredentialsListener) {
                ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
            }
        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
            Long id = (Long) tuple.getValue(0);
            TupleInfo pendingForId = pending.get(id);
            if (pendingForId != null) {
                pending.put(id, pendingForId);
            }
        } else {
            Long id = (Long) tuple.getValue(0);
            Long timeDeltaMs = (Long) tuple.getValue(1);
            TupleInfo tupleInfo = pending.remove(id);
            if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                if (taskId != tupleInfo.getTaskId()) {
                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                }
                Long timeDelta = null;
                if (hasAckers) {
                    long startTimeMs = tupleInfo.getTimestamp();
                    if (startTimeMs != 0) {
                        timeDelta = timeDeltaMs;
                    }
                }
                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                    ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                    failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                }
            }
        }
    }

    public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
            }
            spout.ack(tupleInfo.getMessageId());
            if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
                new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            }
            if (hasAckers && timeDelta != null) {
                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
                                                    taskData.getTaskMetrics().getAcked(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
  • When SpoutExecutor receives a tuple on Acker.ACKER_ACK_STREAM_ID, it calls ackSpoutMsg, which invokes the original spout's ack method, i.e. spout.ack(tupleInfo.getMessageId()).
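The spout side can be sketched the same way. Assumptions: TupleInfo is a hypothetical reduced record holding only the messageId, and spout callbacks are logged rather than invoked on a real ISpout. Because the entry is removed from pending before the callback, each root id produces at most one ack or fail.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SpoutAckDispatchSketch {
    // Hypothetical stand-in for TupleInfo, keeping only the user-level message id.
    record TupleInfo(Object messageId) {}

    // rootId -> info, filled when the spout emits with a non-null messageId.
    final Map<Long, TupleInfo> pending = new HashMap<>();
    // Records the spout callbacks instead of invoking a real ISpout.
    final List<String> spoutCallbacks = new ArrayList<>();

    // Simplified acker-reply branch of tupleActionFn. The entry is removed from
    // pending first, so a second reply for the same root id does nothing.
    void onAckerReply(String stream, long rootId) {
        TupleInfo info = pending.remove(rootId);
        if (info == null || info.messageId() == null) {
            return;
        }
        if (stream.equals("ACK")) {
            spoutCallbacks.add("ack(" + info.messageId() + ")");
        } else if (stream.equals("FAIL")) {
            spoutCallbacks.add("fail(" + info.messageId() + ")");
        }
    }

    public static void main(String[] args) {
        SpoutAckDispatchSketch executor = new SpoutAckDispatchSketch();
        executor.pending.put(100L, new TupleInfo("tx1-attempt0"));
        executor.onAckerReply("ACK", 100L);
        executor.onAckerReply("ACK", 100L);   // duplicate reply is ignored
        System.out.println(executor.spoutCallbacks);
    }
}
```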

Summary

  • When MasterBatchCoordinator receives the first ack for a given msgId (a TransactionAttempt), i.e. the first time its ack method is called for that attempt, the status changes from the initial AttemptStatus.PROCESSING to AttemptStatus.PROCESSED. In the following sync() call, AttemptStatus.PROCESSED becomes AttemptStatus.COMMITTING and a tuple is emitted on MasterBatchCoordinator.COMMIT_STREAM_ID.
  • When the user's spout is an ICommitterTridentSpout, TridentTopologyBuilder.buildTopology configures allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID). TridentSpoutExecutor therefore receives the tuples from MasterBatchCoordinator.COMMIT_STREAM_ID and calls ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt). Once TridentSpoutExecutor.execute returns, TridentBoltExecutor automatically acks the tuple, which leads to MasterBatchCoordinator's ack method being called (the second call), which in turn triggers _collector.emit(SUCCESS_STREAM_ID, new Values(tx)).
  • When the user's spout is not an ICommitterTridentSpout, no component in the topology consumes the tuples MasterBatchCoordinator emits on COMMIT_STREAM_ID, so outTasks is empty. With needAck true, SpoutOutputCollectorImpl still sends a tuple on Acker.ACKER_INIT_STREAM_ID whose second value is Utils.bitXorVals(ackSeq); since ackSeq is built from outTasks and is empty, that value is 0. After the Acker applies curr.updateAck(input.getLong(1)), curr.val is 0, so at the end of execute it emits on Acker.ACKER_ACK_STREAM_ID. On receiving that, SpoutExecutor calls ackSpoutMsg, which calls the original spout's spout.ack(tupleInfo.getMessageId()). In other words, a tuple emitted with a messageId on a stream that no component consumes is acked automatically. So for a non-committer spout, emitting on MasterBatchCoordinator.COMMIT_STREAM_ID leads straight to MasterBatchCoordinator's ack method (the second call), which then triggers _collector.emit(SUCCESS_STREAM_ID, new Values(tx)).

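The difference between an ICommitterTridentSpout and other spouts is this: for other spouts, once a tuple is emitted on MasterBatchCoordinator.COMMIT_STREAM_ID, the Acker acks it automatically and MasterBatchCoordinator's ack method is called (the second call). For an ICommitterTridentSpout, ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt) runs first, TridentBoltExecutor then acks the tuple, and only then is MasterBatchCoordinator's ack method called (the second call). In the success case, both paths end with a tuple being emitted on SUCCESS_STREAM_ID.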
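Putting both paths together, here is a toy end-to-end model of the commit phase for one transaction. It is a sketch under stated assumptions, not Storm code: each subscriber is a Consumer<Long> standing in for a committer spout's TridentSpoutExecutor, and edge ids are distinct powers of two so the XOR bookkeeping is easy to follow. With an empty subscriber list (a non-committer spout) the acker value is 0 from the start and SUCCESS follows immediately; with subscribers, SUCCESS follows only after every commit has run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CommitPhaseSketch {
    // Simplified end-to-end model of the commit phase for one transaction that is
    // already COMMITTING in the coordinator. Each subscriber stands in for a
    // TridentSpoutExecutor whose emitter implements ICommitterTridentSpout.Emitter.
    static List<String> commitPhase(long txid, List<Consumer<Long>> commitSubscribers) {
        List<String> log = new ArrayList<>();
        log.add("COMMIT " + txid);

        // ACKER_INIT value: XOR of one edge id per subscriber. Distinct powers of two
        // (fine for fewer than 64 subscribers) keep any non-empty XOR non-zero.
        long val = 0L;
        for (int i = 0; i < commitSubscribers.size(); i++) {
            val ^= 1L << i;
        }

        // Each subscriber runs commit(...), then TridentBoltExecutor acks its tuple.
        for (int i = 0; i < commitSubscribers.size(); i++) {
            commitSubscribers.get(i).accept(txid);
            log.add("subscriber " + i + " committed " + txid);
            val ^= 1L << i;
        }

        // With no subscribers val was 0 from the start, so the acker acks immediately.
        if (val == 0) {
            log.add("SUCCESS " + txid);   // second MasterBatchCoordinator.ack -> SUCCESS_STREAM_ID
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(commitPhase(7L, List.of()));
        List<Consumer<Long>> committers = List.of(tx -> System.out.println("committing tx " + tx));
        System.out.println(commitPhase(7L, committers));
    }
}
```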


That is all for this introduction to Storm's ICommitterTridentSpout. I hope it helps; if you have any questions, please leave a comment and I will reply promptly. Many thanks for your support of 码农网!

