聊聊storm TridentBoltExecutor的finishBatch方法

栏目: 编程工具 · 发布时间: 6年前

内容简介:storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.javastorm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.javastorm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBolt

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java

public void nextTuple() {
        sync();
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }
        
        if(_active) {
            if(_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for(int i=0; i<_maxTransactionActive; i++) {
                    if(!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if(attemptId==null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for(TransactionalState state: _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }
                        
                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }
复制代码
  • MasterBatchCoordinator是整个trident的真正的spout,它的nextTuple方法会向TridentSpoutCoordinator向MasterBatchCoordinator.BATCH_STREAM_ID( $batch )发射tuple

TridentSpoutCoordinator.execute

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java

public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);

        if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            _state.cleanupBefore(attempt.getTransactionId());
            _coord.success(attempt.getTransactionId());
        } else {
            long txid = attempt.getTransactionId();
            Object prevMeta = _state.getPreviousState(txid);
            Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
            _state.overrideState(txid, meta);
            collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
        }
                
    }
复制代码
  • TridentSpoutCoordinator接收MasterBatchCoordinator在MasterBatchCoordinator.BATCH_STREAM_ID( $batch )发过来的tuple,然后向包装用户spout的TridentBoltExecutor发送batch指令

TridentBoltExecutor( TridentSpoutExecutor )

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there
        
        
        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                    + " (" + _batches.size() + ")" +
//                    "\ntuple: " + tuple +
//                    "\nwith tracked " + tracked +
//                    "\nwith id " + id + 
//                    "\nwith group " + batchGroup
//                    + "\n");
//            
//        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
        
        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }
        
        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);
        
        //System.out.println("TRACKED: " + tracked + " " + tuple);
        
        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);                   
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }

    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
复制代码
  • TridentBoltExecutor.execute方法,首先会创建并初始化TrackedBatch( 如果TrackedBatch不存在的话 ),之后接收到batch指令的时候,对tracked.receivedTuple累加,然后调用_bolt.execute(tracked.info, tuple)
  • 对于spout来说,这里的_bolt是TridentSpoutExecutor,它的execute方法会往下游的TridentBoltExecutor发射一个batch的tuples;由于spout的expectedTaskReports==0,所以这里在调用完TridentSpoutExecutor发射batch的tuples时,它就立马调用finishBatch
  • finishBatch操作,这里会通过COORD_STREAM往下游的TridentBoltExecutor发射[id,count]数据,告知下游TridentBoltExecutor说它一共发射了多少tuples

TridentBoltExecutor( SubtopologyBolt )

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

@Override
    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there
        
        
        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                    + " (" + _batches.size() + ")" +
//                    "\ntuple: " + tuple +
//                    "\nwith tracked " + tracked +
//                    "\nwith id " + id + 
//                    "\nwith group " + batchGroup
//                    + "\n");
//            
//        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
        
        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }
        
        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);
        
        //System.out.println("TRACKED: " + tracked + " " + tuple);
        
        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);                   
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }

    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                              (cond.commitStream!=null && type==TupleType.COMMIT
                               || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);                
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }
        
        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
        
    }

    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
复制代码
  • TridentBoltExecutor( SubtopologyBolt )是spout下游的bolt,它的_bolt是SubtopologyBolt,而且它的tracked.condition.expectedTaskReports不为0,因而它是在接收到TupleType.COORD的tuple的时候,才进行checkFinish操作( 这里先忽略TupleType.COMMIT类型 )
  • 由于BoltExecutor是使用Utils.asyncLoop来挨个消费receiveQueue的数据的,而且emitBatch的时候也是挨个接收batch的tuples,最后再接收到TridentBoltExecutor( TridentSpoutExecutor )在finishBatch的时候通过COORD_STREAM发过来的[id,count]的tuple( 注意这里的COORD_STREAM是分发给每个task的,如果TridentBoltExecutor有多个parallel,则他们是按各自的task来接收的 )
  • 所以TridentBoltExecutor( SubtopologyBolt )先挨个处理每个tuple,处理完之后才轮到TupleType.COORD这个tuple,然后触发checkFinish操作;在没有commitStream的情况下,tracked.receivedCommit默认为true,因而这里只要检测收到的tuples与应收的tuples数一致,就执行_bolt.finishBatch操作完成一个batch,然后再往它的下游TridentBoltExecutor发射它应收的[id,count]的tuple

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

用户力:需求驱动的产品、运营和商业模式

用户力:需求驱动的产品、运营和商业模式

郝志中 / 机械工业出版社 / 2015-11-1 / 59.00

《用户力:需求驱动的产品、运营和商业模式》从用户需求角度深刻阐释了互联网产品设计、网络运营、商业模式构建的本质与方法论! 本书以“用户需求”为主线,先用逆向思维进行倒推,从本质的角度分析了用户的需求是如何驱动企业的产品设计、网络运营和商业模式构建的,将这三个重要部分进行了系统性和结构化的串联,然后用顺向思维进行铺陈,从实践和方法论的角度总结了企业究竟应该如围绕用户的真实需求来进行产品设计、网......一起来看看 《用户力:需求驱动的产品、运营和商业模式》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换