storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/CustomStreamGrouping.java
public interface CustomStreamGrouping extends Serializable {

    /**
     * Tells the stream grouping at runtime the tasks in the target bolt. This information should be used in chooseTasks to determine the
     * target tasks.
     *
     * It also tells the grouping the metadata on the stream this grouping will be used on.
     */
    void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

    /**
     * This function implements a custom stream grouping. It takes in as input the number of tasks in the target bolt in prepare and returns
     * the tasks to send the tuples to.
     *
     * @param values the values to group on
     */
    List<Integer> chooseTasks(int taskId, List<Object> values);
}
- This interface defines the prepare and chooseTasks methods.
- GrouperFactory defines FieldsGrouper, GlobalGrouper, NoneGrouper, AllGrouper, and BasicLoadAwareCustomStreamGrouping.
- The org.apache.storm.grouping package additionally defines ShuffleGrouping, PartialKeyGrouping, and LoadAwareShuffleGrouping.
FieldsGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
public static class FieldsGrouper implements CustomStreamGrouping {

    private Fields outFields;
    private List<List<Integer>> targetTasks;
    private Fields groupFields;
    private int numTasks;

    public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
        this.outFields = outFields;
        this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<List<Integer>>();
        for (Integer targetTask : targetTasks) {
            this.targetTasks.add(Collections.singletonList(targetTask));
        }
        this.numTasks = targetTasks.size();
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
        return targetTasks.get(targetTaskIndex);
    }
}
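- The values of the selected fields are passed to TupleUtils.chooseTaskIndex to pick a task index; chooseTaskIndex mainly computes a hash with Arrays.deepHashCode and then takes the floor modulus by numTasks, so equal field values always map to the same task.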
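To make the hashing step concrete, here is a minimal sketch. It assumes that the index selection boils down to Arrays.deepHashCode over the selected values followed by Math.floorMod, as described above; the names FieldsHashSketch and chooseIndex are hypothetical and are not part of Storm.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the hashing step described above; not Storm's own class.
class FieldsHashSketch {

    // Hash the selected key values, then map the hash onto [0, numTasks).
    static int chooseIndex(List<Object> keys, int numTasks) {
        int hash = Arrays.deepHashCode(keys.toArray());
        // floorMod keeps the result non-negative even for a negative hash,
        // which the plain % operator does not guarantee.
        return Math.floorMod(hash, numTasks);
    }

    public static void main(String[] args) {
        List<Object> key = Arrays.asList("user-42", 7);
        int first = chooseIndex(key, 4);
        int second = chooseIndex(Arrays.asList("user-42", 7), 4);
        System.out.println(first == second);                          // true: equal keys, same index
        System.out.println(Math.floorMod(-7, 4) + " vs " + (-7 % 4)); // 1 vs -3
    }
}
```

The floor modulus matters because hash codes can be negative, and a negative index would fail in targetTasks.get(...).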
GlobalGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
public static class GlobalGrouper implements CustomStreamGrouping {

    private List<Integer> targetTasks;

    public GlobalGrouper() {
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        if (targetTasks.isEmpty()) {
            return null;
        }
        // It's possible for target to have multiple tasks if it reads multiple sources
        return Collections.singletonList(targetTasks.get(0));
    }
}
- This grouper always picks the first task, i.e. targetTasks.get(0).
NoneGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
public static class NoneGrouper implements CustomStreamGrouping {

    private final Random random;
    private List<Integer> targetTasks;
    private int numTasks;

    public NoneGrouper() {
        random = new Random();
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        this.numTasks = targetTasks.size();
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int index = random.nextInt(numTasks);
        return Collections.singletonList(targetTasks.get(index));
    }
}
- This grouper picks a task at random via random.nextInt(numTasks).
AllGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
public static class AllGrouper implements CustomStreamGrouping {

    private List<Integer> targetTasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        return targetTasks;
    }
}
- This grouper returns all of the targetTasks.
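The three simplest groupers above (global, none, all) can be tried outside a Storm cluster with a rough sketch like the following. SimpleGrouper is a hypothetical, simplified stand-in for CustomStreamGrouping that drops the WorkerTopologyContext, GlobalStreamId and taskId parameters; it only illustrates the prepare-once, choose-per-tuple contract.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical, simplified stand-in for CustomStreamGrouping (no Storm types).
interface SimpleGrouper {
    void prepare(List<Integer> targetTasks);
    List<Integer> chooseTasks(List<Object> values);
}

class SimpleGroupers {

    // Global: always the first target task (null when there are no targets, as in the listing).
    static class Global implements SimpleGrouper {
        private List<Integer> tasks;
        public void prepare(List<Integer> targetTasks) { this.tasks = targetTasks; }
        public List<Integer> chooseTasks(List<Object> values) {
            return tasks.isEmpty() ? null : Collections.singletonList(tasks.get(0));
        }
    }

    // None: one target task picked uniformly at random per tuple.
    static class None implements SimpleGrouper {
        private final Random random = new Random();
        private List<Integer> tasks;
        public void prepare(List<Integer> targetTasks) { this.tasks = targetTasks; }
        public List<Integer> chooseTasks(List<Object> values) {
            return Collections.singletonList(tasks.get(random.nextInt(tasks.size())));
        }
    }

    // All: every target task receives a copy of the tuple.
    static class All implements SimpleGrouper {
        private List<Integer> tasks;
        public void prepare(List<Integer> targetTasks) { this.tasks = targetTasks; }
        public List<Integer> chooseTasks(List<Object> values) { return tasks; }
    }

    public static void main(String[] args) {
        List<Integer> targets = Arrays.asList(3, 5, 8);
        List<Object> tuple = Arrays.asList("a", 1);
        for (SimpleGrouper g : Arrays.asList(new Global(), new None(), new All())) {
            g.prepare(targets);                       // called once, before any tuple
            System.out.println(g.chooseTasks(tuple)); // called once per emitted tuple
        }
        // Global prints [3], All prints [3, 5, 8]; None prints one of the three tasks.
    }
}
```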
ShuffleGrouping
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
public class ShuffleGrouping implements CustomStreamGrouping, Serializable {

    private ArrayList<List<Integer>> choices;
    private AtomicInteger current;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        choices = new ArrayList<List<Integer>>(targetTasks.size());
        for (Integer i : targetTasks) {
            choices.add(Arrays.asList(i));
        }
        current = new AtomicInteger(0);
        Collections.shuffle(choices, new Random());
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int rightNow;
        int size = choices.size();
        while (true) {
            rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                current.set(0);
                return choices.get(0);
            }
        } // race condition with another thread, and we lost. try again
    }
}
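- In prepare, the ArrayList<List<Integer>> choices is shuffled into a random order.
- current.incrementAndGet() gives a round-robin effect: while the incremented value is below size, the entry at that index is returned; when it equals size, the counter is reset to 0 and the first entry is returned; a value above size means another thread is about to reset the counter, so the loop retries.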
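The counter logic can be exercised in isolation with a small sketch. RoundRobinSketch is a hypothetical class that mirrors the loop in the listing but returns a single task id; the non-empty check in the constructor is an addition of this sketch, since with zero targets the loop would never terminate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the shuffle-then-round-robin idea; not Storm's own class.
class RoundRobinSketch {
    private final List<Integer> choices;
    private final AtomicInteger current = new AtomicInteger(0);

    RoundRobinSketch(List<Integer> targetTasks, Random random) {
        if (targetTasks.isEmpty()) {
            throw new IllegalArgumentException("need at least one target task");
        }
        choices = new ArrayList<>(targetTasks);
        Collections.shuffle(choices, random); // random start order, fixed afterwards
    }

    int next() {
        int size = choices.size();
        while (true) {
            int rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                current.set(0);            // wrap around
                return choices.get(0);
            }
            // rightNow > size: another thread is about to reset the counter, so retry.
        }
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(Arrays.asList(10, 11, 12), new Random());
        List<Integer> picks = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            picks.add(rr.next());
        }
        // Every run of three consecutive picks contains each task exactly once.
        System.out.println(picks);
    }
}
```

Note that the first call returns the entry at index 1 rather than index 0, because the counter is incremented before it is read; over a full cycle every task is still visited once.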
PartialKeyGrouping
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {

    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;
    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    public PartialKeyGrouping() {
        this(null);
    }

    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            final byte[] rawKeyBytes = getKeyBytes(values);
            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);
            boltIds.add(selectedTask);
        }
        return boltIds;
    }

    //......
}
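- By default, RandomTwoTaskAssignmentCreator picks two candidate task ids for the key, and the target selector (BalancedTargetSelector by default) then chooses whichever of the two has been used fewer times.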
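The "two candidates, pick the less used one" idea can be sketched as follows. This is an illustration under assumptions, not Storm's implementation: TwoChoicesSketch is a hypothetical class, the candidates are derived by seeding a Random with the hash of the key bytes, and the usage counts live in a plain HashMap.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch of partial key grouping; names and details are assumptions.
class TwoChoicesSketch {
    private final List<Integer> tasks;
    private final Map<Integer, Long> sent = new HashMap<>(); // tuples sent per task so far

    TwoChoicesSketch(List<Integer> tasks) {
        this.tasks = tasks;
    }

    // Derive two candidate tasks deterministically from the key bytes.
    int[] candidates(byte[] key) {
        Random random = new Random(Arrays.hashCode(key));
        int first = random.nextInt(tasks.size());
        int second = random.nextInt(tasks.size());
        if (second == first) {
            second = (second + 1) % tasks.size(); // keep the two candidates distinct when possible
        }
        return new int[] { tasks.get(first), tasks.get(second) };
    }

    // Send the tuple to whichever candidate has received fewer tuples.
    int choose(String key) {
        int[] pair = candidates(key.getBytes(StandardCharsets.UTF_8));
        long a = sent.getOrDefault(pair[0], 0L);
        long b = sent.getOrDefault(pair[1], 0L);
        int selected = a <= b ? pair[0] : pair[1];
        sent.merge(selected, 1L, Long::sum);
        return selected;
    }

    public static void main(String[] args) {
        TwoChoicesSketch grouping = new TwoChoicesSketch(Arrays.asList(1, 2, 3, 4));
        Map<Integer, Integer> counts = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            counts.merge(grouping.choose("hot-key"), 1, Integer::sum);
        }
        // A single hot key ends up split evenly across its two candidate tasks.
        System.out.println(counts);
    }
}
```

Compared with a fields grouping, a skewed key is spread over two tasks instead of one, at the cost that downstream state for that key is split in two.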
LoadAwareCustomStreamGrouping
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    void refreshLoad(LoadMapping loadMapping);
}
qMetrics.population() / qMetrics.capacity()