    TridentTopology topology = new TridentTopology();
    topology.newStream("spout1", spout)
            .partitionBy(new Fields("user"))
            .partitionAggregate(new Fields("user", "score", "batchId"), new OriginUserCountAggregator(), new Fields("result", "aggBatchId"))
            .parallelismHint(3)
            .global()
            .aggregate(new Fields("result", "aggBatchId"), new AggAgg(), new Fields("agg"))
            .each(new Fields("agg"), new PrintEachFunc(), new Fields());
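- Three bolts are ultimately constructed here: b-0, b-1 and b-2
- b-0 mainly runs partitionAggregate; its parallelismHint is 3
- b-1 mainly handles the init step of the CombinerAggregator; its parallelismHint is 1. Because its upstream bolt has 3 tasks, tracked.condition.expectedTaskReports in its TridentBoltExecutor is 3, so it can only finishBatch after the aggregated data from all three tasks has arrived
- b-2 mainly handles the combine step of the CombinerAggregator and the each operation
- Overall, one batch starts at the spout, is split by partitionBy into 3 sub-batches at b-0, is gathered at b-1 (which calls finishBatch only after all 3 sub-batches have arrived), and finally reaches b-2, which performs the last aggregation on b-1's result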
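The split-then-merge flow described above can be illustrated without Storm at all. The following is a minimal plain-Java sketch, assuming hash-based routing on the user key (similar in spirit to a fields grouping); the class BatchSplitSketch and all of its methods are hypothetical names for illustration and are not part of Storm or of the author's topology.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names exist in Storm.
final class BatchSplitSketch {

    // Route each user to one of `tasks` partitions by hashing the key,
    // so all tuples for the same user land in the same sub-batch.
    static List<List<String>> partitionBy(List<String> users, int tasks) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            parts.add(new ArrayList<>());
        }
        for (String user : users) {
            parts.get(Math.floorMod(user.hashCode(), tasks)).add(user);
        }
        return parts;
    }

    // Per-partition aggregation (the role b-0 plays): count tuples per user.
    static Map<String, Integer> countUsers(List<String> part) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String user : part) {
            counts.merge(user, 1, Integer::sum);
        }
        return counts;
    }

    // Global merge (the role b-1 and b-2 play together): combine all partial results.
    static Map<String, Integer> combine(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((user, n) -> total.merge(user, n, Integer::sum));
        }
        return total;
    }

    // Runs one batch end to end: split, aggregate each sub-batch, merge.
    static Map<String, Integer> run(List<String> batch, int tasks) {
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (List<String> part : partitionBy(batch, tasks)) {
            partials.add(countUsers(part));
        }
        return combine(partials);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("nickt1", "nickt2", "nickt3", "nickt1"), 3));
    }
}
```

Whatever partition each user hashes to, the merged result is the same, which is why the final aggregation only needs to wait until every sub-batch has reported.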
Log example
    23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt1, 1]
    23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt2, 1]
    23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt3, 1]
    23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
    23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt2, 1, 1]
    23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt2=1}}
    23:22:00.722 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
    23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
    23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt1, 1, 1]
    23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt3, 1, 1]
    23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt1=1}}
    23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt3=1}}
    23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - zero called
    23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt2=1}}, 1:0]
    23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt2=1}}
    23:22:00.726 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt3=1}}, 1:0]
    23:22:00.727 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt2=1}},val2:{1={nickt3=1}}
    23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt1=1}}, 1:0]
    23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt3=1, nickt2=1}},val2:{1={nickt1=1}}
    23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - zero called
    23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt3=1, nickt2=1, nickt1=1}}
    23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.PrintEachFunc - null each tuple:[{1={nickt3=1, nickt2=1, nickt1=1}}]
- Note that Storm's thread names already include the bolt names, for example b-0, b-1 and b-2
TridentBoltExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there

        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
        // if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
        //     System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
        //             + " (" + _batches.size() + ")" +
        //             "\ntuple: " + tuple +
        //             "\nwith tracked " + tracked +
        //             "\nwith id " + id +
        //             "\nwith group " + batchGroup
        //             + "\n");
        //
        // }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }

        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);

        //System.out.println("TRACKED: " + tracked + " " + tuple);

        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }

    private void failBatch(TrackedBatch tracked, FailedException e) {
        if(e!=null && e instanceof ReportedFailedException) {
            _collector.reportError(e);
        }
        tracked.failed = true;
        if(tracked.delayedAck!=null) {
            _collector.fail(tracked.delayedAck);
            tracked.delayedAck = null;
        }
    }

    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                (cond.commitStream!=null && type==TupleType.COMMIT
                        || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }

        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
    }
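- In execute, a TrackedBatch is created when none exists yet for the batch; creating it calls _bolt.initBatchState
- For a regular tuple, execute first calls _bolt.execute(tracked.info, tuple) and then acks the tuple through _collector. If _bolt.execute throws a FailedException, failBatch is called right away and marks tracked.failed as true (the tuple itself is still acked, because success stays true). Once all tuples of the batch have been sent and received, checkFinish runs, and if it finds tracked.failed set it calls _collector.fail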
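- _bolt here is one of two kinds: TridentSpoutExecutor or SubtopologyBolt. For TridentSpoutExecutor, tracked.condition.expectedTaskReports is 0, so every received tuple (which is really the instruction to emit a batch) leads to finishBatch immediately after _bolt.execute. For SubtopologyBolt, tracked.condition.expectedTaskReports is not 0, so it has to wait for the final [id,count] instruction before checkFinish runs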
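The completion check that checkFinish performs can be reduced to a few counters. Below is a minimal sketch, assuming a bolt without a commit stream (so the receivedCommit condition is treated as always satisfied); BatchTrackerSketch, Status, onTuple and onCoord are hypothetical names, and only the counter names mirror fields of the TrackedBatch shown above.

```java
// Hypothetical stand-in for the per-batch bookkeeping in TridentBoltExecutor.
// Simplification: the commit stream and delayed acks are left out.
final class BatchTrackerSketch {

    enum Status { PENDING, FINISHED, FAILED }

    private final int expectedTaskReports; // number of upstream tasks that must report
    private int reportedTasks;             // coordination tuples seen so far
    private int expectedTupleCount;        // sum of the counts carried by [id, count]
    private int receivedTuples;            // regular data tuples seen so far

    BatchTrackerSketch(int expectedTaskReports) {
        this.expectedTaskReports = expectedTaskReports;
    }

    // A regular data tuple of this batch arrived.
    void onTuple() {
        receivedTuples++;
    }

    // A coordination tuple [id, count] arrived from one upstream task.
    // Returns the state of the batch after taking this report into account.
    Status onCoord(int count) {
        reportedTasks++;
        expectedTupleCount += count;
        if (reportedTasks != expectedTaskReports) {
            return Status.PENDING; // still waiting for other upstream tasks
        }
        // Every upstream task has reported: the batch is complete only if
        // the number of received tuples matches what upstream claims to have sent.
        return receivedTuples == expectedTupleCount ? Status.FINISHED : Status.FAILED;
    }

    public static void main(String[] args) {
        BatchTrackerSketch b1 = new BatchTrackerSketch(3); // like b-1 with 3 upstream tasks
        for (int task = 0; task < 3; task++) {
            b1.onTuple();
            System.out.println(b1.onCoord(1));
        }
    }
}
```

With three upstream tasks the first two reports leave the batch pending, which matches the observation that b-1 finishes the batch only after all three sub-batches have arrived.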
TridentSpoutExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
        _emitter = _spout.getEmitter(_txStateId, conf, context);
        _collector = new AddIdCollector(_streamName, collector);
    }

    @Override
    public void execute(BatchInfo info, Tuple input) {
        // there won't be a BatchInfo for the success stream
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                _activeBatches.remove(attempt.getTransactionId());
            } else {
                throw new FailedException("Received commit for different transaction attempt");
            }
        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // valid to delete before what's been committed since
            // those batches will never be accessed again
            _activeBatches.headMap(attempt.getTransactionId()).clear();
            _emitter.success(attempt);
        } else {
            _collector.setBatch(info.batchId);
            _emitter.emitBatch(attempt, input.getValue(1), _collector);
            _activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }

    @Override
    public void finishBatch(BatchInfo batchInfo) {
    }

    @Override
    public Object initBatchState(String batchGroup, Object batchId) {
        return null;
    }
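- TridentSpoutExecutor uses an AddIdCollector; its initBatchState and finishBatch methods are both no-ops
- The execute method handles three cases: COMMIT_STREAM_ID, SUCCESS_STREAM_ID, and regular streams
- A tuple arriving on a regular stream is the instruction to emit a batch, so here _emitter.emitBatch is called to emit the batch's tuples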
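The way execute maintains _activeBatches across the three cases can be sketched with a plain sorted map. This is an illustration only: ActiveBatchesSketch and its methods are hypothetical, attempts are reduced to an int attempt id, and a mismatched commit returns false here where the real code throws FailedException.

```java
import java.util.TreeMap;

// Hypothetical, Storm-free model of the _activeBatches bookkeeping.
final class ActiveBatchesSketch {

    // transaction id -> attempt id of the batch that was emitted for it
    private final TreeMap<Long, Integer> active = new TreeMap<>();

    // Regular stream: a batch was emitted for this transaction attempt.
    void emitted(long txid, int attemptId) {
        active.put(txid, attemptId);
    }

    // Commit stream: only the attempt that was actually emitted may commit.
    boolean commit(long txid, int attemptId) {
        Integer current = active.get(txid);
        if (current == null || current != attemptId) {
            return false; // the real code throws FailedException here
        }
        active.remove(txid);
        return true;
    }

    // Success stream: batches with a smaller transaction id will never be
    // accessed again. headMap is exclusive, so txid itself is kept.
    void success(long txid) {
        active.headMap(txid).clear();
    }

    int size() {
        return active.size();
    }

    public static void main(String[] args) {
        ActiveBatchesSketch batches = new ActiveBatchesSketch();
        batches.emitted(1L, 0);
        batches.emitted(2L, 0);
        batches.emitted(3L, 0);
        batches.success(3L);
        System.out.println(batches.size());
    }
}
```

Because headMap excludes its bound, a success for transaction 3 drops transactions 1 and 2 but leaves 3 in place until its own commit removes it.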
SubtopologyBolt
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java
    @Override
    public Object initBatchState(String batchGroup, Object batchId) {
        ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
            p.startBatch(ret);
        }
        return ret;
    }

    @Override
    public void execute(BatchInfo batchInfo, Tuple tuple) {
        String sourceStream = tuple.getSourceStreamId();
        InitialReceiver ir = _roots.get(sourceStream);
        if(ir==null) {
            throw new RuntimeException("Received unexpected tuple " + tuple.toString());
        }
        ir.receive((ProcessorContext) batchInfo.state, tuple);
    }

    @Override
    public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }

    protected static class InitialReceiver {
        List<TridentProcessor> _receivers = new ArrayList<>();
        RootFactory _factory;
        ProjectionFactory _project;
        String _stream;

        public InitialReceiver(String stream, Fields allFields) {
            // TODO: don't want to project for non-batch bolts...???
            // how to distinguish "batch" streams from non-batch streams?
            _stream = stream;
            _factory = new RootFactory(allFields);
            List<String> projected = new ArrayList<>(allFields.toList());
            projected.remove(0);
            _project = new ProjectionFactory(_factory, new Fields(projected));
        }

        public void receive(ProcessorContext context, Tuple tuple) {
            TridentTuple t = _project.create(_factory.create(tuple));
            for(TridentProcessor r: _receivers) {
                r.execute(context, _stream, t);
            }
        }

        public void addReceiver(TridentProcessor p) {
            _receivers.add(p);
        }

        public Factory getOutputFactory() {
            return _project;
        }
    }
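- initBatchState calls startBatch on each TridentProcessor, for example AggregateProcessor or EachProcessor
- execute calls InitialReceiver.receive, which calls execute on each receiving TridentProcessor, for example AggregateProcessor
- finishBatch calls finishBatch on each TridentProcessor, for example AggregateProcessor or EachProcessor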
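The startBatch / execute / finishBatch lifecycle that SubtopologyBolt drives can be modelled in a few lines. The sketch below is a simplified assumption, not Storm's API: Processor, CountProcessor and runBatch are hypothetical names, tuples are plain lists of strings, and dropping the first field stands in for the projection that InitialReceiver sets up with projected.remove(0).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, Storm-free model of the per-batch processor lifecycle.
final class ProcessorLifecycleSketch {

    // Simplified counterpart of a Trident processor.
    interface Processor {
        void startBatch(Object[] state);
        void execute(Object[] state, List<String> tuple);
        void finishBatch(Object[] state);
    }

    // Counts the tuples of one batch; `index` is its slot in the shared state array.
    static final class CountProcessor implements Processor {
        private final int index;
        final List<Integer> emitted = new ArrayList<>();

        CountProcessor(int index) {
            this.index = index;
        }

        public void startBatch(Object[] state) {
            state[index] = 0; // fresh state for every batch
        }

        public void execute(Object[] state, List<String> tuple) {
            state[index] = (Integer) state[index] + 1;
        }

        public void finishBatch(Object[] state) {
            emitted.add((Integer) state[index]); // emit one result per batch
        }
    }

    // Drives one batch: create state, start every processor, feed the tuples
    // (with the leading batch-id field dropped), then finish every processor.
    static void runBatch(List<? extends Processor> processors, List<List<String>> tuples) {
        Object[] state = new Object[processors.size()];
        for (Processor p : processors) {
            p.startBatch(state);
        }
        for (List<String> raw : tuples) {
            List<String> projected = raw.subList(1, raw.size());
            for (Processor p : processors) {
                p.execute(state, projected);
            }
        }
        for (Processor p : processors) {
            p.finishBatch(state);
        }
    }

    public static void main(String[] args) {
        CountProcessor counter = new CountProcessor(0);
        runBatch(Collections.singletonList(counter),
                Arrays.asList(Arrays.asList("1:0", "nickt1"), Arrays.asList("1:0", "nickt2")));
        System.out.println(counter.emitted);
    }
}
```

Keeping the state in an array owned by the batch, rather than in the processor, is what lets several batches be in flight at once without the processors interfering with each other.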
WindowTridentProcessor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java
    @Override
    public void startBatch(ProcessorContext processorContext) {
        // initialize state for batch
        processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>();
    }

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        // add tuple to the batch state
        Object state = processorContext.state[tridentContext.getStateIndex()];
        ((List<TridentTuple>) state).add(projection.create(tuple));
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if(triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
        collector.setContext(null);
    }
- Other TridentProcessor implementations include, for example, ProjectedProcessor and PartitionPersistProcessor
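The retry handling inside WindowTridentProcessor.finishBatch follows a "replay what was stored, otherwise drain and store" pattern. A minimal sketch of that pattern, assuming an in-memory map in place of windowStore and a caller-supplied flag in place of retriedAttempt; TriggerReplaySketch and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical, Storm-free model of how pending triggers are replayed on retry.
final class TriggerReplaySketch {

    // Stand-in for windowStore: batch transaction id -> trigger ids emitted for it.
    private final Map<Long, List<Integer>> store = new HashMap<>();
    // Stand-in for the window manager's queue of pending trigger results.
    private final Queue<Integer> pendingTriggers = new ArrayDeque<>();

    void addPendingTrigger(int triggerId) {
        pendingTriggers.add(triggerId);
    }

    // Returns the trigger ids to emit for this attempt of the batch.
    List<Integer> finishBatch(long txid, boolean retried) {
        if (retried) {
            List<Integer> saved = store.get(txid);
            if (saved != null) {
                return saved; // replay exactly what the earlier attempt emitted
            }
        }
        // New batch, or a retry with nothing stored: drain the pending queue.
        List<Integer> drained = new ArrayList<>();
        while (!pendingTriggers.isEmpty()) {
            drained.add(pendingTriggers.poll());
        }
        // Remember what was emitted so that a retry can replay it.
        if (!drained.isEmpty()) {
            store.put(txid, drained);
        }
        return drained;
    }

    public static void main(String[] args) {
        TriggerReplaySketch sketch = new TriggerReplaySketch();
        sketch.addPendingTrigger(1);
        sketch.addPendingTrigger(2);
        System.out.println(sketch.finishBatch(7L, false));
        sketch.addPendingTrigger(3);
        System.out.println(sketch.finishBatch(7L, true));
    }
}
```

A retried attempt of the same batch therefore emits the same triggers as before, and triggers that became pending in the meantime wait for the next new batch.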
Summary
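- TridentBoltExecutor automatically acks tuples for you once they have been processed
- WindowTridentProcessor resets its state on every startBatch
- topology.trident.batch.emit.interval.millis defaults to 500 in defaults.yaml
- Tuples arrive in the order they were emitted; the last one is [id,count], which acts as the end-of-batch instruction and is used to check for and trigger completion of the batch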