Hazelcast JET在Spring Boot上运行

栏目: Java · 发布时间: 5年前

内容简介:Hazelcast JET目前是分布式计算框架领域的新成员。根据Hazelcast团队的说法,他们甚至比Hazelcast JET和Spring Boot依赖项设置:并且让我们尝试创建一个Hazelcast JET实例并最终将其关闭,以便验证我们是否能够正确启动Hazelcast JET:

Hazelcast JET目前是分布式计算框架领域的新成员。根据Hazelcast团队的说法,他们甚至比 Apache SparkApache Flink 更快。查看 基准 。让我们看看如何使用Hazelcast JET和 Spring Boot ,当然我会像往常一样构建一个简单的演示。

Hazelcast JET和Spring Boot依赖项设置:

<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet</artifactId>
    <version>0.3.2-SNAPSHOT</version>
</dependency>

并且让我们尝试创建一个Hazelcast JET实例并最终将其关闭,以便验证我们是否能够正确启动Hazelcast JET:

<b>try</b> {
        JetInstance instance = Jet.newJetInstance();
 } <b>finally</b> {
        Jet.shutdownAll();
 }

不幸的是,Spring Boot应用程序内部的Hazelcast JET实例创建将崩溃:

java.lang.NoSuchFieldError: JET
    at com.hazelcast.jet.impl.config.XmlJetConfigBuilder.getXmlType(XmlJetConfigBuilder.java:128) ~[hazelcast-jet-0.3.2-SNAPSHOT.jar!/:0.3.2-SNAPSHOT]
    at com.hazelcast.config.AbstractXmlConfigHelper.getNamespaceType(AbstractXmlConfigHelper.java:136) ~[hazelcast-3.7.6.jar!/:3.7.6]
    at com.hazelcast.config.AbstractXmlConfigHelper.<init>(AbstractXmlConfigHelper.java:72) ~[hazelcast-3.7.6.jar!/:3.7.6]
    at com.hazelcast.config.AbstractConfigBuilder.<init>(AbstractConfigBuilder.java:59) ~[hazelcast-3.7.6.jar!/:3.7.6]
    at com.hazelcast.jet.impl.config.XmlJetConfigBuilder.<init>(XmlJetConfigBuilder.java:67) ~[hazelcast-jet-0.3.2-SNAPSHOT.jar!/:0.3.2-SNAPSHOT]
    at com.hazelcast.jet.impl.config.XmlJetConfigBuilder.getConfig(XmlJetConfigBuilder.java:80) ~[hazelcast-jet-0.3.2-SNAPSHOT.jar!/:0.3.2-SNAPSHOT]

好吧,不要责怪Hazelcast JET团队,只是Spring Boot已经内置了Hazelcast IMDG(Spring Boot 1.5.3版本为3.7.6),其中这个版本包含了hazelcast ConfigType枚举,但没有包含JET字段。就这样。解决此问题的方法是在您的pom中升级Hazelcast IMDG。对我有用的是将hazelcast.version属性设置为:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <hazelcast.version>3.8</hazelcast.version>
</properties>

在这个解决方法之后,一切都应该工作正常,你应该能够看到他们漂亮的ascii艺术:-)

Hazelcast JET架构

在进入JET演示之前,让我们描述一下它的架构。Hazelcast JET基于将计算工作组合到 DAG图中 。这不是革命性的东西,因为Apache Spark也基于DAG工作。但是让我们更深入地了解Hazelcast JET DAG术语::

  • Vertex 是JET作业的一个工作单元。把它作为计算工作的一步。一个Vertex 包含一个或多个您实现的处理器( 类AbstractProcessor )来执行您想要的逻辑。每个Vertex 可以有更多的处理器。Number基于本地和全局并行设置。你可以有SINK Vertex (只有输入,不发射任何东西),INTERNAL Vertex (有输入并向其序数发射输出)和SOURCE Vertex (仅发射)。
  • DAG是Vertex 的边缘连接器。每次创建JET作业时,都必须将处理器实现与Edges连接。

为了向您展示一个示例,我们可以构建一个简单的JET Job,它将 RabbitMQ代理中的 数据注入到分布式Hazelcast Map中。我们的工作将包含两个Vertex ,第一个Vertex 将发出来自RabbitMQ的项目,第二个Vertex 将是简单的Sink进入Map。

<b>private</b> <b>static</b> Job createJetJob(JetInstance instance) {
        DAG dag = <b>new</b> DAG();
        Properties props = props(
                <font>"server"</font><font>, </font><font>"localhost"</font><font>,
                </font><font>"user"</font><font>, </font><font>"guest"</font><font>,
                </font><font>"password"</font><font>, </font><font>"guest"</font><font>);
        Vertex source = dag.newVertex(</font><font>"source"</font><font>, readRabbitMQ(props, </font><font>"jetInputQueue"</font><font>));
        Vertex sink = dag.newVertex(</font><font>"sink"</font><font>, writeMap(</font><font>"sink"</font><font>));
        dag.edge(between(source, sink));
        <b>return</b> instance.newJob(dag);
    }
</font>

详细代码

方法readRabbitMQ返回一个JET Processor实现,用于从RabbitMQ读取消息。该实现的灵感来自官方的 ReadKafkaP处理器 。我刚刚以轮询方式重写了连接器以与RabbitMQ代理进行通信。读取部分的消息在AbstractProcessor.complete方法中。要了解这种方法,您需要了解两件事:

  • 重复调用方法,直到返回true。由于我的作业中的第一个Vertex 是Source,因此我的ReadRabbitMQP.complete方法始终返回false。
  • 来自RabbitMQ代理的轮询消息需要通过 Traverser接口 返回。另请参阅 Traversers  util类以获得方便的方法。

轮询RabbitMQ消息的完整方法:

@Override
    <b>public</b> <b>boolean</b> complete() {
        System.out.println(<font>"....Invoking RabbitMQ vertex processor complete..."</font><font>);
        <b>if</b> (emitCooperatively(traverser)) {
            <b>final</b> Message message =
                    <b>this</b>.rabbitTemplate.receive(<b>this</b>.queueNames[0]);
            <b>if</b> (message != <b>null</b>) {
                System.out.println(</font><font>"Message payload: "</font><font> + <b>new</b> String(message.getBody()));
                <b>final</b> List<Message> list = <b>new</b> ArrayList<>();
                list.add(message);
                Random rn = <b>new</b> Random();
                traverser = traverseStream(list.stream()).map(r ->
                        entry(String.valueOf(rn.nextInt()), <b>new</b> String(r.getBody()))
                );
            }
        }
        <b>return</b> false;
    }
</font>

Hazelcast JET和工作分配到集群

将来自RabbitMQ代理的流式传输写入Hazelcast JET时,接收器Vertex 中的每个处理器都将成为消费者,从而完成相同的工作。

来自AMPQ队列的有效负载将在JET处理器之间进行负载平衡。无论如何,为特定Vertex 创建处理器的方式是 ProcessorMetaSupplierProcessorSupplier 实现的工作。

简而言之,JET使用ProcessorMetaSupplier实现为DAG图中的每个Vertex 获取ProcessorSupplier。然后将ProcessorSupplier发送到Vertex ,根据并行度设置创建处理器。我强烈建议您阅读JET文档中的 NumberGenerator 示例,这有助于了解JET如何创建处理器。

在RabbitMQ流媒体的情况下,我再说一遍,每个处理器都将做同样的工作,因此:

<b>private</b> <b>static</b> <b>final</b> <b>class</b> MetaSupplier<K, V> implements ProcessorMetaSupplier {

        <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;
        <b>private</b> <b>final</b> String[] queueNames;
        <b>private</b> Properties properties;

        <b>private</b> MetaSupplier(String[] queueNames, Properties properties) {
            <b>this</b>.queueNames = queueNames;
            <b>this</b>.properties = properties;
        }

        @Override
        <b>public</b> Function<Address, ProcessorSupplier> get(List<Address> addresses) {
            <b>return</b> address -> <b>new</b> Supplier<>(queueNames, properties);
        }
    }

    <b>private</b> <b>static</b> <b>class</b> Supplier<K, V> implements ProcessorSupplier {

        <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;

        <b>private</b> <b>final</b> String[] queueNames;
        <b>private</b> <b>final</b> Properties properties;
        <b>private</b> <b>transient</b> List<Processor> processors;

        Supplier(String[] topicIds, Properties properties) {
            <b>this</b>.properties = properties;
            <b>this</b>.queueNames = topicIds;
        }

        @Override
        <b>public</b> List<Processor> get(<b>int</b> count) {
            <b>return</b> processors = range(0, count)
                    .mapToObj(i -> <b>new</b> ReadRabbitMQP<>(queueNames, properties))
                    .collect(toList());
        }

        @Override
        <b>public</b> <b>void</b> complete(Throwable error) {
            processors.stream()
                    .filter(p -> p instanceof ReadRabbitMQP)
                    .map(p -> (ReadRabbitMQP) p)
                    .forEach(p -> Util.uncheckRun(p::close));
        }
    }

测试此演示

  • git clone https://bitbucket.org/tomask79/spring-hazelcast-jet-streaming.git
  • mvn clean install
  • 在RabbitMQ中创建一个名为“jetInputQueue”的队列
  • java -jar target / demo-0.0.1-SNAPSHOT.jar

现在将一些消息发送到“jetInputQueue”输入4条以后输出:

Received 4 entries in 78080 milliseconds.
....Invoking RabbitMQ vertex processor complete...
....Invoking RabbitMQ vertex processor complete...
Received 4 entries in 78182 milliseconds.
....Invoking RabbitMQ vertex processor complete...
....Invoking RabbitMQ vertex processor complete...
Received 4 entries in 78283 milliseconds.
....Invoking RabbitMQ vertex processor complete...
....Invoking RabbitMQ vertex processor complete...
Received 4 entries in 78385 milliseconds.
....Invoking RabbitMQ vertex processor complete...
....Invoking RabbitMQ vertex processor complete...
Received 4 entries in 78486 milliseconds.
....Invoking RabbitMQ vertex processor complete..

我对Hazelcast JET的看法

优点:

  • 令人难以置信的性能。与Apache Spark相比,甚至更好一点。
  • 非常编码友好的DAG API。仅仅两天后,我就能够编写JET DAG作业。

缺点:

  • 更好的容错能力。目前,如果我的RabbitMQ JET消费者死亡,那么整个工作将因数据丢失而中止.. :(
  • 我想看看 Apache Hive 对Hazelcast JET的支持。就像编写 SQL 样式语句一样,Hive会为我生成JET DAG,就像它与Apache Spark一样。

无论如何JET引起了我的注意,我期待着下一个版本。

点击标题看原文


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

复杂网络理论及其应用

复杂网络理论及其应用

汪小帆、李翔、陈关荣 / 清华大学出版社 / 2006 / 45.00元

国内首部复杂网络专著 【图书目录】 第1章 引论 1.1 引言 1.2 复杂网络研究简史 1.3 基本概念 1.4 本书内容简介 参考文献 第2章 网络拓扑基本模型及其性质 2.1 引言 2.2 规则网络 2.3 随机图 2.4 小世界网络模型 2.5 无标度网络模型 ......一起来看看 《复杂网络理论及其应用》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器