storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/CustomStreamGrouping.java
```java
public interface CustomStreamGrouping extends Serializable {

    /**
     * Tells the stream grouping at runtime the tasks in the target bolt. This information should be used in chooseTasks to determine the
     * target tasks.
     *
     * It also tells the grouping the metadata on the stream this grouping will be used on.
     */
    void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

    /**
     * This function implements a custom stream grouping. It takes in as input the number of tasks in the target bolt in prepare and returns
     * the tasks to send the tuples to.
     *
     * @param values the values to group on
     */
    List<Integer> chooseTasks(int taskId, List<Object> values);
}
```
- The interface declares two methods, prepare and chooseTasks
- GrouperFactory defines FieldsGrouper, GlobalGrouper, NoneGrouper, AllGrouper, and BasicLoadAwareCustomStreamGrouping
- The org.apache.storm.grouping package additionally defines ShuffleGrouping, PartialKeyGrouping, and LoadAwareShuffleGrouping
FieldsGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
```java
public static class FieldsGrouper implements CustomStreamGrouping {

    private Fields outFields;
    private List<List<Integer>> targetTasks;
    private Fields groupFields;
    private int numTasks;

    public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
        this.outFields = outFields;
        this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<List<Integer>>();
        for (Integer targetTask : targetTasks) {
            this.targetTasks.add(Collections.singletonList(targetTask));
        }
        this.numTasks = targetTasks.size();
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
        return targetTasks.get(targetTaskIndex);
    }
}
```
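- chooseTasks selects the values of the grouping fields and passes them to TupleUtils.chooseTaskIndex to obtain a task index; chooseTaskIndex essentially hashes the values with Arrays.deepHashCode and then takes the floor modulo of that hash by numTasks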
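The hash-then-floor-modulo step can be sketched as follows. This is a minimal illustration under stated assumptions, not Storm's own source: the class name `FieldsHashSketch` is hypothetical, and only the two JDK calls named above (`Arrays.deepHashCode` and floor modulo via `Math.floorMod`) are taken from the description.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class; mirrors the hashing described above, not Storm's own code.
final class FieldsHashSketch {

    // Hash the selected field values, then floor-mod so the index is never negative.
    static int chooseTaskIndex(List<?> keys, int numTasks) {
        int hash = Arrays.deepHashCode(keys.toArray());
        return Math.floorMod(hash, numTasks);
    }
}
```

Floor modulo matters here because a hash code can be negative: `-7 % 4` is `-3` in Java, which would be an invalid list index, whereas `Math.floorMod(-7, 4)` is `1`. Equal field values always hash to the same index, which is what sends the same key to the same task.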
GlobalGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
```java
public static class GlobalGrouper implements CustomStreamGrouping {

    private List<Integer> targetTasks;

    public GlobalGrouper() {
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        if (targetTasks.isEmpty()) {
            return null;
        }
        // It's possible for target to have multiple tasks if it reads multiple sources
        return Collections.singletonList(targetTasks.get(0));
    }
}
```
- This always picks the first task, i.e. targetTasks.get(0)
NoneGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
```java
public static class NoneGrouper implements CustomStreamGrouping {

    private final Random random;
    private List<Integer> targetTasks;
    private int numTasks;

    public NoneGrouper() {
        random = new Random();
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        this.numTasks = targetTasks.size();
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int index = random.nextInt(numTasks);
        return Collections.singletonList(targetTasks.get(index));
    }
}
```
- A task is picked at random via random.nextInt(numTasks)
AllGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
```java
public static class AllGrouper implements CustomStreamGrouping {

    private List<Integer> targetTasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        return targetTasks;
    }
}
```
- This returns all of the targetTasks
ShuffleGrouping
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
```java
public class ShuffleGrouping implements CustomStreamGrouping, Serializable {

    private ArrayList<List<Integer>> choices;
    private AtomicInteger current;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        choices = new ArrayList<List<Integer>>(targetTasks.size());
        for (Integer i : targetTasks) {
            choices.add(Arrays.asList(i));
        }
        current = new AtomicInteger(0);
        Collections.shuffle(choices, new Random());
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int rightNow;
        int size = choices.size();
        while (true) {
            rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                current.set(0);
                return choices.get(0);
            }
        } // race condition with another thread, and we lost. try again
    }
}
```
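- prepare shuffles the ArrayList<List<Integer>> choices into a random order
- current.incrementAndGet() produces a round-robin effect: while the incremented value is below size, the entry at that index is returned; when it equals size, the counter is reset to 0 and the first entry is returned; a value above size means another thread reached the boundary first, so the loop retries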
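The counter logic can be tried in isolation with a small sketch. The class name `RoundRobinSketch` is hypothetical, and the initial shuffle is deliberately left out so the order is deterministic; otherwise the loop follows the one quoted above.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone version of the round-robin counter; no shuffle, no Storm types.
final class RoundRobinSketch {

    private final List<Integer> choices;
    private final AtomicInteger current = new AtomicInteger(0);

    RoundRobinSketch(List<Integer> choices) {
        this.choices = choices;
    }

    int next() {
        int size = choices.size();
        while (true) {
            int rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                // Wrap around: reset the counter and hand out the first entry.
                current.set(0);
                return choices.get(0);
            }
            // rightNow > size: another thread hit the boundary first, so retry.
        }
    }
}
```

Because the counter is incremented before it is read, the first call returns the entry at index 1, and index 0 is only handed out at the wrap-around. Over a full cycle every entry is still returned exactly once.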
PartialKeyGrouping
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
```java
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {

    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;
    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    public PartialKeyGrouping() {
        this(null);
    }

    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            final byte[] rawKeyBytes = getKeyBytes(values);
            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);
            boltIds.add(selectedTask);
        }
        return boltIds;
    }

    //......
}
```
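- By default, RandomTwoTaskAssignmentCreator picks two taskIds for the key, and BalancedTargetSelector then chooses whichever of the two has been used fewer times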
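The "two candidates, pick the less used one" idea can be sketched on its own. Everything below is an assumption made for illustration: the class `TwoChoiceSketch`, its methods, seeding a `Random` from the key bytes, and the per-task counter map are all hypothetical, and Storm's RandomTwoTaskAssignmentCreator and BalancedTargetSelector may differ in detail.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch of two-choice key grouping; not Storm's implementation.
final class TwoChoiceSketch {

    private final List<Integer> targetTasks;
    // How many tuples this sender has routed to each task so far.
    private final Map<Integer, Long> sentCounts = new HashMap<>();

    TwoChoiceSketch(List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    // Assumption: seed a Random from the key bytes so one key always yields the same pair.
    int[] candidates(byte[] key) {
        Random seeded = new Random(Arrays.hashCode(key));
        int first = seeded.nextInt(targetTasks.size());
        int second = seeded.nextInt(targetTasks.size());
        return new int[] { targetTasks.get(first), targetTasks.get(second) };
    }

    // Pick the candidate with the smaller count, then record the send.
    int chooseTask(byte[] key) {
        int[] pair = candidates(key);
        long c0 = sentCounts.getOrDefault(pair[0], 0L);
        long c1 = sentCounts.getOrDefault(pair[1], 0L);
        int chosen = (c0 <= c1) ? pair[0] : pair[1];
        sentCounts.merge(chosen, 1L, Long::sum);
        return chosen;
    }
}
```

In this sketch a hot key is split across at most two tasks instead of overloading one, at the cost that downstream state for that key may live in two places. The two candidates can coincide, in which case the key behaves as under a plain fields grouping.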
LoadAwareCustomStreamGrouping
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
```java
public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    void refreshLoad(LoadMapping loadMapping);
}
```
qMetrics.population() / qMetrics.capacity()