内容简介:在ignite中,有传统的MapReduce模型的分布式计算,也有基于分布式存储的并置计算,当数据分散到不同的节点上时,根据提供的并置键,计算会传播到数据所在的节点进行计算,再结合数据并置,相关联的数据存储在相同节点,这样可以避免在计算过程中涉及到大量的数据移动,有效保证计算的性能。ignite分布式计算的主要特点如下:Ignite计算网格可以对集群或者集群组内的任何闭包进行广播和负载平衡,包括纯Java的
ignite分布式计算
在ignite中,有传统的MapReduce模型的分布式计算,也有基于分布式存储的并置计算,当数据分散到不同的节点上时,根据提供的并置键,计算会传播到数据所在的节点进行计算,再结合数据并置,相关联的数据存储在相同节点,这样可以避免在计算过程中涉及到大量的数据移动,有效保证计算的性能。
ignite分布式计算的主要特点如下:
特性 | 描述 |
---|---|
自动部署 | 计算用到的类可以自动传播,而不需要在每个节点都部署相关的类,这个可以通过配置 peerClassLoadingEnabled 选项开启计算类的自动传播,但是缓存的实体类是无法自动传播的。 |
平衡加载 | 数据在加载之后会在集群中进行一个再平衡的过程,保证数据均匀分布在各个节点,当有计算在集群中执行的时候,可以根据提供的并置键定位到数据所在节点进行计算,也就是并置计算。 |
故障转移 | 当节点出现故障或者其它计算的时候,任务会自动转移到集群中的其他节点执行 |
1.分布式闭包:
Ignite计算网格可以对集群或者集群组内的任何闭包进行广播和负载平衡,包括纯 Java 的 runnables
和 callables
闭包类型 | 功能 |
---|---|
broadcast | 将任务传播到部分指定节点或者全部节点 |
call/run | 执行单个任务或者任务集 |
apply | apply接收一个闭包和一个集合作为参数,生成与参数数量等量的任务,每个任务分别是将闭包应用在其中一个参数上,并且会返回结果集。 |
ComputeTestController.java
/** broadCast测试*/ @RequestMapping("/broadcast") String broadcastTest(HttpServletRequest request, HttpServletResponse response) { // IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes()); //只传播远程节点 IgniteCompute compute = ignite.compute(); compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id())); return "all executed."; } /** call和run测试 */ @RequestMapping("/call") public @ResponseBody String callTest(HttpServletRequest request, HttpServletResponse response) { Collection<IgniteCallable<Integer>> calls = new ArrayList<>(); /** call */ System.out.println("-----------call-----------"); for(String word : "How many characters".split(" ")) { calls.add(word::length); // calls.add(() -> word.length()); } Collection<Integer> res = ignite.compute().call(calls); int total = res.stream().mapToInt(Integer::intValue).sum(); System.out.println(String.format("the total lengths of all words is [%s].", total)); /** run */ System.out.println("-----------run-----------"); for (String word : "Print words on different cluster nodes".split(" ")) { ignite.compute().run(() -> System.out.println(word)); } /** async call */ System.out.println("-----------async call-----------"); IgniteCompute asyncCompute = ignite.compute().withAsync(); asyncCompute.call(calls); asyncCompute.future().listen(fut -> { Collection<Integer> result = (Collection<Integer>)fut.get(); int t = result.stream().mapToInt(Integer::intValue).sum(); System.out.println("Total number of characters: " + total); }); /** async run */ System.out.println("-----------async run-----------"); Collection<ComputeTaskFuture<?>> futs = new ArrayList<>(); asyncCompute = ignite.compute().withAsync(); for (String word : "Print words on different cluster nodes".split(" ")) { asyncCompute.run(() -> System.out.println(word)); futs.add(asyncCompute.future()); } futs.stream().forEach(ComputeTaskFuture::get); return "all executed."; } /** apply测试 */ @RequestMapping("/apply") public @ResponseBody String applyTest(HttpServletRequest request, HttpServletResponse response) { /** apply */ System.out.println("-----------apply-----------"); IgniteCompute compute = ignite.compute(); Collection<Integer> res = compute.apply( String::length, Arrays.asList("How many characters".split(" ")) ); int total = res.stream().mapToInt(Integer::intValue).sum(); System.out.println(String.format("the total lengths of all words is [%s].", total)); /** async apply */ IgniteCompute asyncCompute = ignite.compute().withAsync(); res = asyncCompute.apply( String::length, Arrays.asList("How many characters".split(" ")) ); asyncCompute.future().listen(fut -> { int t = ((Collection<Integer>)fut.get()).stream().mapToInt(Integer::intValue).sum(); System.out.println(String.format("Total number of characters: " + total)); }); return "all executed."; }
2. MapReduce:
在ignite中MapReduce的实现是 ComputeTask
,其主要方法是map()和reduce(),map()可以控制任务映射到节点的过程,而reduce()则是对最终计算结果集的一个处理。 ComputeTask
有两个主要实现 ComputeTaskAdapter
和 ComputeTaskSplitAdapter
, 主要的区别在于 ComputeTaskAdapter
需要手动实现map()方法,而 ComputeTaskSplitAdapter
可以自动映射任务。
ComputeTaskAdapter:
/**ComputeTaskAdapter*/ @RequestMapping("/taskMap") public @ResponseBody String taskMapTest(HttpServletRequest request, HttpServletResponse response) { /**ComputeTaskMap*/ int cnt = ignite.compute().execute(MapExampleCharacterCountTask.class, "Hello Ignite Enable World!"); System.out.println(String.format(">>> Total number of characters in the phrase is %s.", cnt)); return "all executed."; } private static class MapExampleCharacterCountTask extends ComputeTaskAdapter<String, Integer> { /**节点映射*/ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, String arg) throws IgniteException { Map<ComputeJob, ClusterNode> map = new HashMap<>(); Iterator<ClusterNode> it = nodes.iterator(); for (final String word : arg.split(" ")) { // If we used all nodes, restart the iterator. if (!it.hasNext()) { it = nodes.iterator(); } ClusterNode node = it.next(); map.put(new ComputeJobAdapter() { @Override public Object execute() throws IgniteException { System.out.println("-------------------------------------"); System.out.println(String.format(">>> Printing [%s] on this node from ignite job.", word)); return word.length(); } }, node); } return map; } /**结果汇总*/ @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteException { int sum = 0; for (ComputeJobResult res : results) { sum += res.<Integer>getData(); } return sum; } }
运行结果:
------------------------------------- >>> Printing [Ignite] on this node from ignite job. ------------------------------------- >>> Printing [World!] on this node from ignite job. >>> Total number of characters in the phrase is 23.
ComputeTaskSplitAdapter:
/**ComputeTaskSplitAdapter*/ @RequestMapping("/taskSplit") public @ResponseBody String taskSplitTest(HttpServletRequest request, HttpServletResponse response) { /**ComputeTaskSplitAdapter(自动映射) */ int result = ignite.compute().execute(SplitExampleDistributedCompute.class, null); System.out.println(String.format(">>> result: [%s]", result)); return "all executed."; } private static class SplitExampleDistributedCompute extends ComputeTaskSplitAdapter<String, Integer> { @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteException { Collection<ComputeJob> jobs = new LinkedList<>(); jobs.add(new ComputeJobAdapter() { @Override public Object execute() throws IgniteException { // IgniteCache<Long, Student> cache = Ignition.ignite().cache(CacheKeyConstant.STUDENT); IgniteCache<Long, BinaryObject> cache = Ignition.ignite().cache(CacheKeyConstant.STUDENT).withKeepBinary(); /**普通查询*/ String sql_query = "name = ? and email = ?"; // SqlQuery<Long, Student> cSqlQuery = new SqlQuery<>(Student.class, sql_query); SqlQuery<Long, BinaryObject> cSqlQuery = new SqlQuery<>(Student.class, sql_query); cSqlQuery.setReplicatedOnly(true).setArgs("student_54", "student_54gmail.com"); // List<Cache.Entry<Long, Student>> result = cache.query(cSqlQuery).getAll(); List<Cache.Entry<Long, BinaryObject>> result = cache.query(cSqlQuery).getAll(); System.out.println("--------------------"); result.stream().map(x -> { Integer studId = x.getValue().field("studId"); String name = x.getValue().field("name"); return String.format("name=[%s], studId=[%s].", name, studId); }).forEach(System.out::println); System.out.println(String.format("the query size is [%s].", result.size())); return result.size(); } }); return jobs; } @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteException { int sum = results.stream().mapToInt(x -> x.<Integer>getData()).sum(); return sum; } }
运行结果:
-------------------- name=[student_54], studId=[54]. the query size is [1]. >>> result: [1]
MapReduce的局限性:
MapReduce适合解决并行和批处理的场景,不适合串行,迭代和递归一类无法并行和分割任务的场景。
分布式计算存在的问题以及注意点
在使用ignite的分布式计算功能的时候,如果用到了缓存, 并且缓存value不是平台类型(java基础类型),则需要考虑反序列化的问题。
现有两种解决方案:
- 部署缓存实体类包到ignite节点
缓存实体类得实现Serializable接口, 并且得指定serialVersionUID
serialVersionUID表示实体类的当前版本,每个实现Serializable接口的类都有,如果没有的设置该值,java序列化机制会帮你默认生成一个。最好在使用serializable接口时,设定serialVersionUID为某个值,不然当在传输的某一端修改实体类时,serialVersionUID会被虚拟机设置成一个新的值,造成两端的serialVersionUID不一致会发生异常。
public class Student implements Serializable { private static final long serialVersionUID = -5941489737545326242L; .... }
将实体类打包成普通jar包,并放在$IGNITE_HOME/libs/路径下面:
注意:打包的时候不能打包成spring-boot的可执行包,要打包成普通jar包,这样相关类才能正常加载。当然如果集群里的节点均为应用节点,则可以不用考虑这个问题。
-
使用二进制对象对缓存进行操作
Ignite默认使用反序列化值作为最常见的使用场景,要启用
BinaryObject
处理,需要获得一个IgniteCache
的实例然后使用withKeepBinary()
方法。启用之后,如果可能,这个标志会确保从缓存返回的对象都是BinaryObject
格式的。
IgniteCache<Long, BinaryObject> cache = ignite.cache("student").withKeepBinary(); BinaryObject obj = cache.get(k); //获取二进制对象 String name = obj.<String>field("name"); //读取二进制对象属性值<使用field方法>
3.并置计算:
affinityCall(...)
和 affinityRun(...)
方法使作业和缓存着数据的节点位于一处,换句话说,给定缓存名字和关系键,这些方法会试图在指定的缓存中定位键所在的节点,然后在那里执行作业。
并置的两种类型以及区别:
并置 | 特点 |
---|---|
数据并置 | 将相关的缓存数据并置到一起,确保其所有键会缓存在同一个节点上,避免节点间数据移动产生的网络开销。 |
计算并置 | 根据关系键和缓存名称,定位关系键所在节点,并在该节点执行作业单元。 |
ComputeTestController.class
/**并置计算测试*/ @RequestMapping("/affinity") public @ResponseBody String affinityTest(HttpServletRequest request, HttpServletResponse response) { /** affinityRun call */ System.out.println("-----------affinityRun call-----------"); IgniteCompute compute = ignite.compute(); // IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes()); for(int key = 0; key < 100; key++) { // final long k = key; //生成随机k值 final long k = IntStream.generate(() -> (int)(System.nanoTime() % 100)).limit(1).findFirst().getAsInt(); compute.affinityRun(CacheKeyConstant.STUDENT, k, () -> { IgniteCache<Long, BinaryObject> cache = ignite.cache(CacheKeyConstant.STUDENT).withKeepBinary(); BinaryObject obj = cache.get(k); if(obj!=null) { System.out.println(String.format("Co-located[key= %s, value= %s]", k, obj.<String>field("name"))); } }); } IgniteCache<Long, BinaryObject> cache = ignite.cache(CacheKeyConstant.STUDENT).withKeepBinary(); cache.forEach(lo -> compute.affinityRun(CacheKeyConstant.STUDENT, lo.getKey(), () -> { System.out.println(lo.getValue().<String>field("name")); })); return "all executed."; }
运行结果:
-----------affinityRun call----------- student_495 student_496 student_498 ...
至此,ignite分布式计算完毕。
以上所述就是小编给大家介绍的《ignite分布式计算》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- CSharpFlink 分布式实时计算框架,增加 Kafka 数据接口和加载多个计算任务
- 分布式计算框架MapReduce
- 关于分布式计算的一些概念
- 阿里开源首款自研科学计算引擎 Mars :基于张量的统一分布式计算框架
- 在家搭建大数据分布式计算环境
- Onyx 0.12.6 发布,分布式计算系统
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Text Processing in Python
David Mertz / Addison-Wesley Professional / 2003-6-12 / USD 54.99
Text Processing in Python describes techniques for manipulation of text using the Python programming language. At the broadest level, text processing is simply taking textual information and doing som......一起来看看 《Text Processing in Python》 这本书的介绍吧!