内容简介:Hazelcast JET目前是分布式计算框架领域的新成员。根据Hazelcast团队的说法,他们甚至比Hazelcast JET和Spring Boot依赖项设置:并且让我们尝试创建一个Hazelcast JET实例并最终将其关闭,以便验证我们是否能够正确启动Hazelcast JET:
Hazelcast JET目前是分布式计算框架领域的新成员。根据Hazelcast团队的说法,他们甚至比 Apache Spark 和 Apache Flink 更快。查看 基准 。让我们看看如何使用Hazelcast JET和 Spring Boot ,当然我会像往常一样构建一个简单的演示。
Hazelcast JET和Spring Boot依赖项设置:
<dependency> <groupId>com.hazelcast.jet</groupId> <artifactId>hazelcast-jet</artifactId> <version>0.3.2-SNAPSHOT</version> </dependency>
并且让我们尝试创建一个Hazelcast JET实例并最终将其关闭,以便验证我们是否能够正确启动Hazelcast JET:
<b>try</b> { JetInstance instance = Jet.newJetInstance(); } <b>finally</b> { Jet.shutdownAll(); }
不幸的是,Spring Boot应用程序内部的Hazelcast JET实例创建将崩溃:
java.lang.NoSuchFieldError: JET at com.hazelcast.jet.impl.config.XmlJetConfigBuilder.getXmlType(XmlJetConfigBuilder.java:128) ~[hazelcast-jet-0.3.2-SNAPSHOT.jar!/:0.3.2-SNAPSHOT] at com.hazelcast.config.AbstractXmlConfigHelper.getNamespaceType(AbstractXmlConfigHelper.java:136) ~[hazelcast-3.7.6.jar!/:3.7.6] at com.hazelcast.config.AbstractXmlConfigHelper.<init>(AbstractXmlConfigHelper.java:72) ~[hazelcast-3.7.6.jar!/:3.7.6] at com.hazelcast.config.AbstractConfigBuilder.<init>(AbstractConfigBuilder.java:59) ~[hazelcast-3.7.6.jar!/:3.7.6] at com.hazelcast.jet.impl.config.XmlJetConfigBuilder.<init>(XmlJetConfigBuilder.java:67) ~[hazelcast-jet-0.3.2-SNAPSHOT.jar!/:0.3.2-SNAPSHOT] at com.hazelcast.jet.impl.config.XmlJetConfigBuilder.getConfig(XmlJetConfigBuilder.java:80) ~[hazelcast-jet-0.3.2-SNAPSHOT.jar!/:0.3.2-SNAPSHOT]
好吧,不要责怪Hazelcast JET团队,只是Spring Boot已经内置了Hazelcast IMDG(Spring Boot 1.5.3版本为3.7.6),其中这个版本包含了hazelcast ConfigType枚举,但没有包含JET字段。就这样。解决此问题的方法是在您的pom中升级Hazelcast IMDG。对我有用的是将hazelcast.version属性设置为:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <hazelcast.version>3.8</hazelcast.version> </properties>
在这个解决方法之后,一切都应该工作正常,你应该能够看到他们漂亮的ascii艺术:-)
Hazelcast JET架构
在进入JET演示之前,让我们描述一下它的架构。Hazelcast JET基于将计算工作组合到 DAG图中 。这不是革命性的东西,因为Apache Spark也基于DAG工作。但是让我们更深入地了解Hazelcast JET DAG术语::
- Vertex 是JET作业的一个工作单元。把它作为计算工作的一步。一个Vertex 包含一个或多个您实现的处理器( 类AbstractProcessor )来执行您想要的逻辑。每个Vertex 可以有更多的处理器。Number基于本地和全局并行设置。你可以有SINK Vertex (只有输入,不发射任何东西),INTERNAL Vertex (有输入并向其序数发射输出)和SOURCE Vertex (仅发射)。
- DAG是Vertex 的边缘连接器。每次创建JET作业时,都必须将处理器实现与Edges连接。
为了向您展示一个示例,我们可以构建一个简单的JET Job,它将 RabbitMQ代理中的 数据注入到分布式Hazelcast Map中。我们的工作将包含两个Vertex ,第一个Vertex 将发出来自RabbitMQ的项目,第二个Vertex 将是简单的Sink进入Map。
<b>private</b> <b>static</b> Job createJetJob(JetInstance instance) { DAG dag = <b>new</b> DAG(); Properties props = props( <font>"server"</font><font>, </font><font>"localhost"</font><font>, </font><font>"user"</font><font>, </font><font>"guest"</font><font>, </font><font>"password"</font><font>, </font><font>"guest"</font><font>); Vertex source = dag.newVertex(</font><font>"source"</font><font>, readRabbitMQ(props, </font><font>"jetInputQueue"</font><font>)); Vertex sink = dag.newVertex(</font><font>"sink"</font><font>, writeMap(</font><font>"sink"</font><font>)); dag.edge(between(source, sink)); <b>return</b> instance.newJob(dag); } </font>
详细代码
方法readRabbitMQ返回一个JET Processor实现,用于从RabbitMQ读取消息。该实现的灵感来自官方的 ReadKafkaP处理器 。我刚刚以轮询方式重写了连接器以与RabbitMQ代理进行通信。读取部分的消息在AbstractProcessor.complete方法中。要了解这种方法,您需要了解两件事:
- 重复调用方法,直到返回true。由于我的作业中的第一个Vertex 是Source,因此我的ReadRabbitMQP.complete方法始终返回false。
- 来自RabbitMQ代理的轮询消息需要通过 Traverser接口 返回。另请参阅 Traversers util类以获得方便的方法。
轮询RabbitMQ消息的完整方法:
@Override <b>public</b> <b>boolean</b> complete() { System.out.println(<font>"....Invoking RabbitMQ vertex processor complete..."</font><font>); <b>if</b> (emitCooperatively(traverser)) { <b>final</b> Message message = <b>this</b>.rabbitTemplate.receive(<b>this</b>.queueNames[0]); <b>if</b> (message != <b>null</b>) { System.out.println(</font><font>"Message payload: "</font><font> + <b>new</b> String(message.getBody())); <b>final</b> List<Message> list = <b>new</b> ArrayList<>(); list.add(message); Random rn = <b>new</b> Random(); traverser = traverseStream(list.stream()).map(r -> entry(String.valueOf(rn.nextInt()), <b>new</b> String(r.getBody())) ); } } <b>return</b> false; } </font>
Hazelcast JET和工作分配到集群
将来自RabbitMQ代理的流式传输写入Hazelcast JET时,接收器Vertex 中的每个处理器都将成为消费者,从而完成相同的工作。
来自AMPQ队列的有效负载将在JET处理器之间进行负载平衡。无论如何,为特定Vertex 创建处理器的方式是 ProcessorMetaSupplier 和 ProcessorSupplier 实现的工作。
简而言之,JET使用ProcessorMetaSupplier实现为DAG图中的每个Vertex 获取ProcessorSupplier。然后将ProcessorSupplier发送到Vertex ,根据并行度设置创建处理器。我强烈建议您阅读JET文档中的 NumberGenerator 示例,这有助于了解JET如何创建处理器。
在RabbitMQ流媒体的情况下,我再说一遍,每个处理器都将做同样的工作,因此:
<b>private</b> <b>static</b> <b>final</b> <b>class</b> MetaSupplier<K, V> implements ProcessorMetaSupplier { <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L; <b>private</b> <b>final</b> String[] queueNames; <b>private</b> Properties properties; <b>private</b> MetaSupplier(String[] queueNames, Properties properties) { <b>this</b>.queueNames = queueNames; <b>this</b>.properties = properties; } @Override <b>public</b> Function<Address, ProcessorSupplier> get(List<Address> addresses) { <b>return</b> address -> <b>new</b> Supplier<>(queueNames, properties); } } <b>private</b> <b>static</b> <b>class</b> Supplier<K, V> implements ProcessorSupplier { <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L; <b>private</b> <b>final</b> String[] queueNames; <b>private</b> <b>final</b> Properties properties; <b>private</b> <b>transient</b> List<Processor> processors; Supplier(String[] topicIds, Properties properties) { <b>this</b>.properties = properties; <b>this</b>.queueNames = topicIds; } @Override <b>public</b> List<Processor> get(<b>int</b> count) { <b>return</b> processors = range(0, count) .mapToObj(i -> <b>new</b> ReadRabbitMQP<>(queueNames, properties)) .collect(toList()); } @Override <b>public</b> <b>void</b> complete(Throwable error) { processors.stream() .filter(p -> p instanceof ReadRabbitMQP) .map(p -> (ReadRabbitMQP) p) .forEach(p -> Util.uncheckRun(p::close)); } }
测试此演示
- git clone https://bitbucket.org/tomask79/spring-hazelcast-jet-streaming.git
- mvn clean install
- 在RabbitMQ中创建一个名为“jetInputQueue”的队列
- java -jar target / demo-0.0.1-SNAPSHOT.jar
现在将一些消息发送到“jetInputQueue”输入4条以后输出:
Received 4 entries in 78080 milliseconds. ....Invoking RabbitMQ vertex processor complete... ....Invoking RabbitMQ vertex processor complete... Received 4 entries in 78182 milliseconds. ....Invoking RabbitMQ vertex processor complete... ....Invoking RabbitMQ vertex processor complete... Received 4 entries in 78283 milliseconds. ....Invoking RabbitMQ vertex processor complete... ....Invoking RabbitMQ vertex processor complete... Received 4 entries in 78385 milliseconds. ....Invoking RabbitMQ vertex processor complete... ....Invoking RabbitMQ vertex processor complete... Received 4 entries in 78486 milliseconds. ....Invoking RabbitMQ vertex processor complete..
我对Hazelcast JET的看法
优点:
- 令人难以置信的性能。与Apache Spark相比,甚至更好一点。
- 非常编码友好的DAG API。仅仅两天后,我就能够编写JET DAG作业。
缺点:
- 更好的容错能力。目前,如果我的RabbitMQ JET消费者死亡,那么整个工作将因数据丢失而中止.. :(
- 我想看看 Apache Hive 对Hazelcast JET的支持。就像编写 SQL 样式语句一样,Hive会为我生成JET DAG,就像它与Apache Spark一样。
无论如何JET引起了我的注意,我期待着下一个版本。
点击标题看原文
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 运行过程+运行效率
- dotnet 判断程序当前使用管理员运行降低权使用普通权限运行
- Flink 集群运行原理兼部署及Yarn运行模式深入剖析-Flink牛刀小试
- MapReduce运行流程分析
- 浅析Docker运行安全
- JavaScript运行原理分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
深入浅出WebAssembly
于航 / 电子工业出版社 / 2018-11 / 128.00元
WebAssembly是一种新的二进制格式,它可以方便地将C/C++等静态语言的代码快速地“运行”在浏览器中,这一特性为前端密集计算场景提供了无限可能。不仅如此,通过WebAssembly技术,我们还可以将基于Unity等游戏引擎开发的大型游戏快速地移植到Web端。WebAssembly技术现在已经被计划设计成W3C的标准,众多浏览器厂商已经提供了对其MVP版本标准的支持。在Google I/O ......一起来看看 《深入浅出WebAssembly》 这本书的介绍吧!