```java
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
        .partitionBy(new Fields("user"))
        .partitionAggregate(new Fields("user", "score", "batchId"), new OriginUserCountAggregator(), new Fields("result", "aggBatchId"))
        .parallelismHint(3)
        .global()
        .aggregate(new Fields("result", "aggBatchId"), new AggAgg(), new Fields("agg"))
        .each(new Fields("agg"), new PrintEachFunc(), new Fields());
```
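- This topology ends up with three bolts: b-0, b-1, and b-2.
- b-0 mainly runs partitionAggregate; its parallelismHint is 3.
- b-1 mainly handles the CombinerAggregator's init step; its parallelismHint is 1. Because its upstream bolt has 3 tasks, its TridentBoltExecutor has tracked.condition.expectedTaskReports set to 3, so it must wait until the aggregated data from all three tasks has arrived before it can finishBatch.
- b-2 mainly handles the CombinerAggregator's combine step and the each operation.
- Overall, a batch starts at the spout, is split by partitionBy into 3 sub-batches at b-0, is merged at b-1 (which calls finishBatch only after all 3 sub-batches have arrived), and receives its final aggregation at b-2 on top of the result produced by b-1.
Log example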
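The split-then-merge flow described above can be modeled in plain Java without any Storm classes. This is only a sketch: `BatchFlowSketch`, `partitionFor`, and the hash-modulo routing are hypothetical stand-ins for what `partitionBy`, `partitionAggregate`, and the global `aggregate` do, not Storm's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of one batch flowing through the topology; no Storm classes are used.
class BatchFlowSketch {

    // Hypothetical stand-in for partitionBy(new Fields("user")): route by key hash.
    static int partitionFor(String user, int numTasks) {
        return Math.floorMod(user.hashCode(), numTasks);
    }

    // Stage b-0: each of the numTasks tasks counts the users in its own sub-batch.
    static List<Map<String, Integer>> partitionAggregate(List<String> batch, int numTasks) {
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            partials.add(new HashMap<>());
        }
        for (String user : batch) {
            partials.get(partitionFor(user, numTasks)).merge(user, 1, Integer::sum);
        }
        return partials;
    }

    // Stages b-1 and b-2: fold every partial result into a single map.
    static Map<String, Integer> combine(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((user, count) -> total.merge(user, count, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("nickt1", "nickt2", "nickt3");
        List<Map<String, Integer>> partials = partitionAggregate(batch, 3);
        System.out.println("sub-batch results: " + partials);
        System.out.println("merged result: " + combine(partials));
    }
}
```

The merge step can only produce a complete result once all partial maps are available, which is the same reason b-1 waits for reports from all three upstream tasks.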
```
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt1, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt2, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt3, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt2, 1, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt2=1}}
23:22:00.722 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt1, 1, 1]
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt3, 1, 1]
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt1=1}}
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt3=1}}
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - zero called
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt2=1}}, 1:0]
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt2=1}}
23:22:00.726 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt3=1}}, 1:0]
23:22:00.727 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt2=1}},val2:{1={nickt3=1}}
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt1=1}}, 1:0]
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt3=1, nickt2=1}},val2:{1={nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - zero called
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt3=1, nickt2=1, nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.PrintEachFunc - null each tuple:[{1={nickt3=1, nickt2=1, nickt1=1}}]
```
- Note that Storm's thread names already include the bolt names, such as b-0, b-1, and b-2.
TridentBoltExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
```java
public void execute(Tuple tuple) {
    if(TupleUtils.isTick(tuple)) {
        long now = System.currentTimeMillis();
        if(now - _lastRotate > _messageTimeoutMs) {
            _batches.rotate();
            _lastRotate = now;
        }
        return;
    }
    String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
    if(batchGroup==null) {
        // this is so we can do things like have simple DRPC that doesn't need to use batch processing
        _coordCollector.setCurrBatch(null);
        _bolt.execute(null, tuple);
        _collector.ack(tuple);
        return;
    }
    IBatchID id = (IBatchID) tuple.getValue(0);
    //get transaction id
    //if it already exists and attempt id is greater than the attempt there

    TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//    if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//        System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                + " (" + _batches.size() + ")" +
//                "\ntuple: " + tuple +
//                "\nwith tracked " + tracked +
//                "\nwith id " + id +
//                "\nwith group " + batchGroup
//                + "\n");
//
//    }
    //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

    // this code here ensures that only one attempt is ever tracked for a batch, so when
    // failures happen you don't get an explosion in memory usage in the tasks
    if(tracked!=null) {
        if(id.getAttemptId() > tracked.attemptId) {
            _batches.remove(id.getId());
            tracked = null;
        } else if(id.getAttemptId() < tracked.attemptId) {
            // no reason to try to execute a previous attempt than we've already seen
            return;
        }
    }

    if(tracked==null) {
        tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
        _batches.put(id.getId(), tracked);
    }
    _coordCollector.setCurrBatch(tracked);

    //System.out.println("TRACKED: " + tracked + " " + tuple);

    TupleType t = getTupleType(tuple, tracked);
    if(t==TupleType.COMMIT) {
        tracked.receivedCommit = true;
        checkFinish(tracked, tuple, t);
    } else if(t==TupleType.COORD) {
        int count = tuple.getInteger(1);
        tracked.reportedTasks++;
        tracked.expectedTupleCount+=count;
        checkFinish(tracked, tuple, t);
    } else {
        tracked.receivedTuples++;
        boolean success = true;
        try {
            _bolt.execute(tracked.info, tuple);
            if(tracked.condition.expectedTaskReports==0) {
                success = finishBatch(tracked, tuple);
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
        }
        if(success) {
            _collector.ack(tuple);
        } else {
            _collector.fail(tuple);
        }
    }
    _coordCollector.setCurrBatch(null);
}

private void failBatch(TrackedBatch tracked, FailedException e) {
    if(e!=null && e instanceof ReportedFailedException) {
        _collector.reportError(e);
    }
    tracked.failed = true;
    if(tracked.delayedAck!=null) {
        _collector.fail(tracked.delayedAck);
        tracked.delayedAck = null;
    }
}

private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
    if(tracked.failed) {
        failBatch(tracked);
        _collector.fail(tuple);
        return;
    }
    CoordCondition cond = tracked.condition;
    boolean delayed = tracked.delayedAck==null &&
                          (cond.commitStream!=null && type==TupleType.COMMIT
                           || cond.commitStream==null);
    if(delayed) {
        tracked.delayedAck = tuple;
    }
    boolean failed = false;
    if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
        if(tracked.receivedTuples == tracked.expectedTupleCount) {
            finishBatch(tracked, tuple);
        } else {
            //TODO: add logging that not all tuples were received
            failBatch(tracked);
            _collector.fail(tuple);
            failed = true;
        }
    }

    if(!delayed && !failed) {
        _collector.ack(tuple);
    }
}
```
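- When no TrackedBatch exists yet for the batch, execute creates one, and doing so calls _bolt.initBatchState.
- For an ordinary tuple, execute first calls _bolt.execute(tracked.info, tuple) and then acks it through _collector. If _bolt.execute throws a FailedException, failBatch is called directly, which sets tracked.failed to true. Later, once all tuples of the batch have been sent and received, checkFinish runs; if it finds tracked.failed set, it calls _collector.fail.
- There are two kinds of _bolt here: TridentSpoutExecutor and SubtopologyBolt. For TridentSpoutExecutor, tracked.condition.expectedTaskReports is 0, so every tuple received (which is really an instruction to emit a batch) triggers finishBatch immediately after _bolt.execute. For SubtopologyBolt, tracked.condition.expectedTaskReports is nonzero, so it has to wait for the final [id,count] instruction before checkFinish runs.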
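The completion rule can be reduced to a few counters. The following is a simplified sketch, not the Storm class: `TrackedBatchSketch` and its method names are hypothetical, and it assumes there is no commit stream, so the `receivedCommit` check from `checkFinish` is left out.

```java
// Simplified model of the counters TridentBoltExecutor keeps per batch.
// Assumption: no commit stream, so the receivedCommit condition is omitted.
class TrackedBatchSketch {
    final int expectedTaskReports; // number of upstream tasks that must report
    int reportedTasks = 0;
    int expectedTupleCount = 0;
    int receivedTuples = 0;
    boolean finished = false;
    boolean failed = false;

    TrackedBatchSketch(int expectedTaskReports) {
        this.expectedTaskReports = expectedTaskReports;
    }

    // An ordinary data tuple: count it; with no upstream reports expected
    // (the spout case), the batch finishes right away.
    void onDataTuple() {
        receivedTuples++;
        if (expectedTaskReports == 0) {
            finished = true;
        }
    }

    // An [id, count] coordination tuple from one upstream task.
    void onCoordTuple(int count) {
        reportedTasks++;
        expectedTupleCount += count;
        if (reportedTasks == expectedTaskReports) {
            if (receivedTuples == expectedTupleCount) {
                finished = true;
            } else {
                failed = true; // some tuples announced upstream never arrived
            }
        }
    }

    public static void main(String[] args) {
        TrackedBatchSketch b1 = new TrackedBatchSketch(3); // like b-1: 3 upstream tasks
        for (int task = 0; task < 3; task++) {
            b1.onDataTuple();
            b1.onCoordTuple(1);
            System.out.println("reports=" + b1.reportedTasks + " finished=" + b1.finished);
        }
    }
}
```

With `expectedTaskReports` set to 3, the batch stays open after the first two reports and only finishes on the third, matching the behavior described for b-1.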
TridentSpoutExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java
```java
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
    _emitter = _spout.getEmitter(_txStateId, conf, context);
    _collector = new AddIdCollector(_streamName, collector);
}

@Override
public void execute(BatchInfo info, Tuple input) {
    // there won't be a BatchInfo for the success stream
    TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
    if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
        if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
            ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
            _activeBatches.remove(attempt.getTransactionId());
        } else {
            throw new FailedException("Received commit for different transaction attempt");
        }
    } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
        // valid to delete before what's been committed since
        // those batches will never be accessed again
        _activeBatches.headMap(attempt.getTransactionId()).clear();
        _emitter.success(attempt);
    } else {
        _collector.setBatch(info.batchId);
        _emitter.emitBatch(attempt, input.getValue(1), _collector);
        _activeBatches.put(attempt.getTransactionId(), attempt);
    }
}

@Override
public void finishBatch(BatchInfo batchInfo) {
}

@Override
public Object initBatchState(String batchGroup, Object batchId) {
    return null;
}
```
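- TridentSpoutExecutor uses an AddIdCollector; its initBatchState and finishBatch methods are both no-ops.
- The execute method handles three cases: COMMIT_STREAM_ID, SUCCESS_STREAM_ID, and the ordinary stream.
- A tuple arriving on the ordinary stream is an instruction to emit a batch, so execute calls _emitter.emitBatch to emit that batch's tuples.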
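The bookkeeping around `_activeBatches` in the three branches can be tried out in isolation. This is a minimal sketch that assumes a sorted map such as `java.util.TreeMap` (which the `headMap` call suggests); `ActiveBatchesSketch` and its method names are hypothetical, and attempts are represented as plain strings.

```java
import java.util.TreeMap;

// Models how the emit, commit, and success branches touch the active-batch map.
class ActiveBatchesSketch {
    // transaction id -> attempt (a String stands in for TransactionAttempt)
    final TreeMap<Long, String> activeBatches = new TreeMap<>();

    // Ordinary stream: remember the attempt that was emitted for this txid.
    void onEmit(long txid, String attempt) {
        activeBatches.put(txid, attempt);
    }

    // Commit stream: only the attempt currently recorded for the txid may commit.
    boolean onCommit(long txid, String attempt) {
        if (attempt.equals(activeBatches.get(txid))) {
            activeBatches.remove(txid);
            return true;
        }
        return false; // the original throws FailedException here
    }

    // Success stream: headMap is exclusive, so only strictly smaller txids are dropped.
    void onSuccess(long txid) {
        activeBatches.headMap(txid).clear();
    }

    public static void main(String[] args) {
        ActiveBatchesSketch s = new ActiveBatchesSketch();
        s.onEmit(1L, "1:0");
        s.onEmit(2L, "2:0");
        s.onEmit(3L, "3:0");
        s.onSuccess(3L);
        System.out.println("remaining after success(3): " + s.activeBatches.keySet());
    }
}
```

Because `headMap(key)` returns a view of the entries strictly below `key`, clearing it removes older transactions while leaving the entry for the succeeded transaction itself in place.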
SubtopologyBolt
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java
```java
@Override
public Object initBatchState(String batchGroup, Object batchId) {
    ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
    for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
        p.startBatch(ret);
    }
    return ret;
}

@Override
public void execute(BatchInfo batchInfo, Tuple tuple) {
    String sourceStream = tuple.getSourceStreamId();
    InitialReceiver ir = _roots.get(sourceStream);
    if(ir==null) {
        throw new RuntimeException("Received unexpected tuple " + tuple.toString());
    }
    ir.receive((ProcessorContext) batchInfo.state, tuple);
}

@Override
public void finishBatch(BatchInfo batchInfo) {
    for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
        p.finishBatch((ProcessorContext) batchInfo.state);
    }
}

protected static class InitialReceiver {
    List<TridentProcessor> _receivers = new ArrayList<>();
    RootFactory _factory;
    ProjectionFactory _project;
    String _stream;

    public InitialReceiver(String stream, Fields allFields) {
        // TODO: don't want to project for non-batch bolts...???
        // how to distinguish "batch" streams from non-batch streams?
        _stream = stream;
        _factory = new RootFactory(allFields);
        List<String> projected = new ArrayList<>(allFields.toList());
        projected.remove(0);
        _project = new ProjectionFactory(_factory, new Fields(projected));
    }

    public void receive(ProcessorContext context, Tuple tuple) {
        TridentTuple t = _project.create(_factory.create(tuple));
        for(TridentProcessor r: _receivers) {
            r.execute(context, _stream, t);
        }
    }

    public void addReceiver(TridentProcessor p) {
        _receivers.add(p);
    }

    public Factory getOutputFactory() {
        return _project;
    }
}
```
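- initBatchState creates a ProcessorContext and calls startBatch on each TridentProcessor (for example AggregateProcessor, EachProcessor).
- execute looks up the InitialReceiver for the source stream and calls its receive method, which in turn calls execute on each of its _receivers (for example AggregateProcessor).
- finishBatch calls finishBatch on each TridentProcessor (for example AggregateProcessor, EachProcessor).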
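The call order of the three SubtopologyBolt methods above can be traced with a stripped-down model. This is a sketch under simplifying assumptions: `SubtopologySketch`, its `Processor` interface, and `LoggingProcessor` are hypothetical, a `StringBuilder` stands in for the per-batch ProcessorContext, and every processor receives each tuple directly, whereas in Storm only the root receivers do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Traces startBatch -> execute -> finishBatch for one batch.
class SubtopologySketch {
    interface Processor {
        void startBatch(StringBuilder ctx);
        void execute(StringBuilder ctx, List<Object> tuple);
        void finishBatch(StringBuilder ctx);
    }

    // A processor that only records which callback was invoked.
    static class LoggingProcessor implements Processor {
        private final String name;
        LoggingProcessor(String name) { this.name = name; }
        public void startBatch(StringBuilder ctx) { ctx.append(name).append(":start;"); }
        public void execute(StringBuilder ctx, List<Object> tuple) {
            ctx.append(name).append(":exec").append(tuple).append(';');
        }
        public void finishBatch(StringBuilder ctx) { ctx.append(name).append(":finish;"); }
    }

    private final List<Processor> processors;

    SubtopologySketch(List<Processor> processors) { this.processors = processors; }

    // Like initBatchState: create the per-batch context, then start every processor.
    StringBuilder initBatchState() {
        StringBuilder ctx = new StringBuilder();
        for (Processor p : processors) p.startBatch(ctx);
        return ctx;
    }

    // Like InitialReceiver.receive: drop field 0 (the batch id) before handing the tuple on.
    void execute(StringBuilder ctx, List<Object> rawTuple) {
        List<Object> projected = new ArrayList<>(rawTuple);
        projected.remove(0);
        for (Processor p : processors) p.execute(ctx, projected);
    }

    void finishBatch(StringBuilder ctx) {
        for (Processor p : processors) p.finishBatch(ctx);
    }

    public static void main(String[] args) {
        List<Processor> ps = new ArrayList<>();
        ps.add(new LoggingProcessor("agg"));
        SubtopologySketch bolt = new SubtopologySketch(ps);
        StringBuilder ctx = bolt.initBatchState();
        bolt.execute(ctx, Arrays.<Object>asList("1:0", "nickt1", 1));
        bolt.finishBatch(ctx);
        System.out.println(ctx);
    }
}
```

The `projected.remove(0)` line mirrors the InitialReceiver constructor, which strips the first field (the batch id) so that processors only see the payload fields.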
WindowTridentProcessor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java
```java
@Override
public void startBatch(ProcessorContext processorContext) {
    // initialize state for batch
    processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>();
}

@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    // add tuple to the batch state
    Object state = processorContext.state[tridentContext.getStateIndex()];
    ((List<TridentTuple>) state).add(projection.create(tuple));
}

@Override
public void finishBatch(ProcessorContext processorContext) {

    Object batchId = processorContext.batchId;
    Object batchTxnId = getBatchTxnId(batchId);

    LOG.debug("Received finishBatch of : [{}] ", batchId);
    // get all the tuples in a batch and add it to trident-window-manager
    List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
    tridentWindowManager.addTuplesBatch(batchId, tuples);

    List<Integer> pendingTriggerIds = null;
    List<String> triggerKeys = new ArrayList<>();
    Iterable<Object> triggerValues = null;

    if (retriedAttempt(batchId)) {
        pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
        if (pendingTriggerIds != null) {
            for (Integer pendingTriggerId : pendingTriggerIds) {
                triggerKeys.add(triggerKey(pendingTriggerId));
            }
            triggerValues = windowStore.get(triggerKeys);
        }
    }

    // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
    if(triggerValues == null) {
        pendingTriggerIds = new ArrayList<>();
        Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
        LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
        try {
            Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
            List<Object> values = new ArrayList<>();
            StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
            while (pendingTriggersIter.hasNext()) {
                triggerResult = pendingTriggersIter.next();
                for (List<Object> aggregatedResult : triggerResult.result) {
                    String triggerKey = triggerKey(triggerResult.id);
                    triggerKeys.add(triggerKey);
                    values.add(aggregatedResult);
                    pendingTriggerIds.add(triggerResult.id);
                }
                pendingTriggersIter.remove();
            }
            triggerValues = values;
        } finally {
            // store inprocess triggers of a batch in store for batch retries for any failures
            if (!pendingTriggerIds.isEmpty()) {
                windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
            }
        }
    }

    collector.setContext(processorContext);
    int i = 0;
    for (Object resultValue : triggerValues) {
        collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
    }
    collector.setContext(null);
}
```
Other TridentProcessor implementations include, for example, ProjectedProcessor and PartitionPersistProcessor.
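The per-batch buffering done by startBatch, execute, and finishBatch in the WindowTridentProcessor code above follows a simple slot pattern. Here is a minimal sketch of it, with hypothetical names (`PerBatchStateSketch`, a bare `Object[]` in place of `ProcessorContext.state`, and strings in place of TridentTuple):

```java
import java.util.ArrayList;
import java.util.List;

// Each processor owns one slot in a shared state array and rebuilds it per batch.
class PerBatchStateSketch {
    private final int stateIndex; // plays the role of tridentContext.getStateIndex()

    PerBatchStateSketch(int stateIndex) {
        this.stateIndex = stateIndex;
    }

    // A fresh list on every startBatch, so nothing leaks over from the previous batch.
    void startBatch(Object[] state) {
        state[stateIndex] = new ArrayList<String>();
    }

    // Buffer the tuple in this processor's slot.
    @SuppressWarnings("unchecked")
    void execute(Object[] state, String tuple) {
        ((List<String>) state[stateIndex]).add(tuple);
    }

    // Hand back everything buffered for the batch.
    @SuppressWarnings("unchecked")
    List<String> finishBatch(Object[] state) {
        return (List<String>) state[stateIndex];
    }

    public static void main(String[] args) {
        Object[] state = new Object[2];
        PerBatchStateSketch p = new PerBatchStateSketch(1);
        p.startBatch(state);
        p.execute(state, "nickt1");
        p.execute(state, "nickt2");
        System.out.println("batch 1 buffered: " + p.finishBatch(state));
        p.startBatch(state);
        System.out.println("batch 2 starts with: " + p.finishBatch(state));
    }
}
```

Indexing into a shared array lets several processors in the same bolt keep independent state for one batch without knowing about each other.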
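Summary
- TridentBoltExecutor automatically acks tuples for you once they have been processed.
- WindowTridentProcessor resets its state on every startBatch.
- topology.trident.batch.emit.interval.millis defaults to 500 in defaults.yaml.
- Tuples arrive in the order they were emitted; the last one is [id,count], which effectively serves as the end-of-batch instruction and is used to check for and trigger batch completion.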