使用Spring Integration和Hazelcast进行集群领导者选举

栏目: Java · 发布时间: 5年前

内容简介:最近在检查Spring Integration区域时,我注意到与如果您使用Hazelcast,那么您可以从Hazelcast支持的各种分布式数据结构中提供Spring Integration通道基础架构,如:

最近在检查Spring Integration区域时,我注意到与 Hazelcast datagrid 的非常好的集成。在以下位置查看:

https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-hazelcast

如果您使用Hazelcast,那么您可以从Hazelcast支持的各种分布式数据结构中提供Spring Integration通道基础架构,如:

  • com.hazelcast.core.IMap,
  • com.hazelcast.core.MultiMap,
  • com.hazelcast.core.IList,
  • com.hazelcast.core.ISet,
  • com.hazelcast.core.IQueue,
  • com.hazelcast.core.ITopic,
  • com.hazelcast.core.ReplicatedMap

真正令我印象深刻的是Spring Integration团队通过Hazelcast实施集群领导者选举。让我们来看看并测试它。但首先要做的事情是:

演示任务:我们有两个Spring Boot微服务,每10秒产生一个随机值放入分布式IMap。现在只允许一个微服务在某一时刻消费使用IMap的数据。为了使它变得有点辣,微服务应该在消息传递给其他节点之后放弃其领导。

通过Spring Integration Cluster Leadership解决方案:

首先,您需要将Spring Boot MicroService添加到领导游戏中:

@Bean
    <b>public</b> Candidate nodeService1Candidate() {
        <b>final</b> NodeCandidate candidate = <b>new</b> NodeCandidate(<font>"service1"</font><font>, HazelcastConfiguration.ROLE_JOB_MAP);
        <b>return</b> candidate;
    }


    @Bean
    <b>public</b> LeaderInitiator initiator() {
        <b>final</b> LeaderInitiator leaderInitiator = <b>new</b> LeaderInitiator(hazelcastConfiguration.hazelcastInstance(), nodeService1Candidate());
        <b>return</b> leaderInitiator;
    }
</font>

但这不是全部。我们的目标是在领导权被授予后从IMap开始数据消费,另一方面在领导被撤销后停止数据消费。

为此,我们需要监听org.springframework.integration.leader.DefaultCandidate子类NodeCandidate中的onGranted和onRevoked事件。第一个构造函数参数是节点id,第二个是角色名称。阅读 Spring Integration角色 ,但我不会使用它们。我将手动启动IMap更改生成者。

为了了解数据更改,SI Hazelcast集成提供了HazelcastEventDrivenMessageProducer,它可以监听分布式IMap更改并将适当的数据更改事件委派给Spring Integration通道基础结构。

@Configuration
<b>public</b> <b>class</b> HazelcastConfiguration {
    .
    .
    @Bean
    <b>public</b> IMap<String, String> getDistributedMapForJobInput() {
        <b>return</b> hazelcastInstance().getMap(INPUT_JOB_MAP);
    }

    @Bean
    <b>public</b> MessageChannel inputJobChannel() {
        <b>return</b> <b>new</b> DirectChannel();
    }

    @Bean
    <b>public</b> HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
        <b>final</b> HazelcastEventDrivenMessageProducer producer =
                <b>new</b> HazelcastEventDrivenMessageProducer(
                        getDistributedMapForJobInput()
                );
        producer.setOutputChannel(inputJobChannel());
        producer.setCacheEventTypes(<font>"ADDED,REMOVED,UPDATED,CLEAR_ALL"</font><font>);
        producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
        producer.setAutoStartup(false);

        <b>return</b> producer;
    }
}
</font>

setAutostartup(false)的注意事项。我们希望让这位生产者在获得领导时能够开始启动:

<font><i>/**
 * Created by tomask79 on 24.08.17.
 */</i></font><font>
<b>public</b> <b>class</b> NodeCandidate <b>extends</b> DefaultCandidate {

    @Autowired
    <b>private</b> HazelcastConfiguration hazelcastConfiguration;

    <b>public</b> NodeCandidate(String nodeId, String role) {
        <b>super</b>(nodeId, role);
    }

    @Override
    <b>public</b> <b>void</b> onGranted(Context ctx) {
        <b>super</b>.onGranted(ctx);
        System.out.println(</font><font>"Leader granted to: "</font><font>+ctx.toString());
        hazelcastConfiguration.hazelcastEventDrivenMessageProducer().start();
    }

    @Override
    <b>public</b> <b>void</b> onRevoked(Context ctx) {
        <b>super</b>.onRevoked(ctx);
        System.out.println(</font><font>"Leader revoked to: "</font><font>+ctx.toString());
        hazelcastConfiguration.hazelcastEventDrivenMessageProducer().stop();
    }
}
</font>

最后一项任务是消费来自分布式IMap的消息并放弃领导,以便其他节点可以接受工作并享受一些乐趣。因此,让我们声明ServiceActivator监听来自jobInputChannel DirectChannel 的数据:

@Bean 
    @ServiceActivator(inputChannel =“inputJobChannel”)
    <b>public</b> MessageHandler logger(){ 
        <b>return</b> <b>new</b> LogAndGiveInitiatorHandler(); 
    }        

将消息记录到标准输出:

<font><i>/**
 * Created by tomask79 on 24.08.17.
 */</i></font><font>
<b>public</b> <b>class</b> LogAndGiveInitiatorHandler implements MessageHandler{

    @Autowired
    <b>private</b> JobServices jobServices;

    @Override
    <b>public</b> <b>void</b> handleMessage(Message<?> message) throws MessagingException {
        System.out.println(message.toString());
        System.out.println(</font><font>"Waiting for another node to take the work...!"</font><font>);
        jobServices.giveUp();
        <b>try</b> {
            Thread.sleep(5000);
        } <b>catch</b> (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(</font><font>"........"</font><font>);
    }
}
</font>

并命令微服务放弃其领导地位:

<font><i>/**
 * Created by tomask79 on 10.08.17.
 */</i></font><font>
@Service
<b>public</b> <b>class</b> JobServices {    
    @Autowired
    <b>private</b> LeaderInitiator initiator;
    .
    .
    <b>public</b> <b>void</b> giveUp() {
        <b>if</b> (initiator.getContext().isLeader()) {
            System.out.println(</font><font>"Giving up on leadership: "</font><font>+initiator.getContext().toString());
            initiator.getContext().yield();
        }
    }
}
</font>

就是这样!让我们测试整个包。

  • git clone https://bitbucket.org/tomask79/spring-leader-hazelcast.git
  • mvn clean all (in the directory with top pom.xml to build all three projects)

输出:

[INFO] Reactor Summary:
<p>[INFO] 
<p>[INFO] spring-cloud-cluster-demo .......................... SUCCESS [  0.412 s]
<p>[INFO] spring-microservice-hazelcast ...................... SUCCESS [  2.380 s]
<p>[INFO] spring-microservice-service1 ....................... SUCCESS [  3.685 s]
<p>[INFO] spring-microservice-service2 ....................... SUCCESS [  2.745 s]
<p>[INFO] ------------------------------------------------------------------------
<p>[INFO] BUILD SUCCESS
<p>[INFO] ------------------------------------------------------------------------
<p>[INFO] Total time: 10.047 s
<p>[INFO] Finished at: 2017-08-28T19:57:53+02:00
<p>[INFO] Final Memory: 40M/532M
<p>[INFO] ------------------------------------------------------------------------

现在打开两个终端并运行:

  • java -jar spring-microservice-service1 / target / service1-0.0.1-SNAPSHOT.war(在第一个终端)
  • java -jar spring-microservice-service2 / target / service2-0.0.1-SNAPSHOT.war(在第二个终端)

要验证两个微服务是否形成有效的Hazelcast集群,您应该看到类似的内容:

Members [2] {
    Member [192.168.1.112]:5702
    Member [192.168.1.112]:5701 <b>this</b>
}

在形成Hazelcast群集设置后,您应该看到以下输出

第一终端(获取领导权并放弃给服务2):

[st-leadership-0] com.example.hazelcast.NodeCandidate      : DefaultCandidate{role=leader, id=service1} has been granted leadership; context: HazelcastContext{role=leader, id=service1, isLeader=<b>true</b>}
Leader granted to: HazelcastContext{role=leader, id=service1, isLeader=<b>true</b>}
<p>[st-leadership-0] .h.i.HazelcastEventDrivenMessageProducer : started hazelcastEventDrivenMessageProducer
GenericMessage [payload=EntryEventMessagePayload [key=service18eff005d-6da8-4fb8-b747-f977ad8e1544, value=a61b5f9a-1b96-493d-b240-61ccb549ba17, oldValue=<b>null</b>], headers={hazelcast_cacheName=randomInputDataMap, hazelcast_member=/192.168.1.112:5702, id=f9c5455b-b42d-3ab7-ec49-9bd33db9ec5f, hazelcast_eventType=ADDED, timestamp=1503945864993}]
Waiting <b>for</b> another node to take the work...!
Giving up on leadership: HazelcastContext{role=leader, id=service1, isLeader=<b>true</b>}

第二终端(获取领导权并放弃给服务1)

Leader granted to: HazelcastContext{role=leader, id=service2, isLeader=<b>true</b>}
2017-08-28 20:47:08.001  INFO 1357 --- [st-leadership-0] .h.i.HazelcastEventDrivenMessageProducer : started hazelcastEventDrivenMessageProducer
2017-08-28 20:47:08.019  INFO 1357 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8082 (http)
2017-08-28 20:47:08.029  INFO 1357 --- [           main] c.e.SpringMicroserviceServiceComponent   : Started SpringMicroserviceServiceComponent in 12.807 seconds (JVM running <b>for</b> 13.507)
........
GenericMessage [payload=EntryEventMessagePayload [key=service249cde108-5045-4b77-84f7-cdc9f524df04, value=c4fe775f-e44d-4f10-ac43-0fe7157c0e67, oldValue=<b>null</b>], headers={hazelcast_cacheName=randomInputDataMap, hazelcast_member=/192.168.1.112:5701, id=df474178-1ff1-35e5-e1e1-d3f6f25d6d68, hazelcast_eventType=ADDED, timestamp=1503946037904}]
Waiting <b>for</b> another node to take the work...!
Giving up on leadership: HazelcastContext{role=leader, id=service2, isLeader=<b>true</b>}

总结

只是一些想法。如果在生产过程中流经系统的消息数量每天只有几千(我们在Embedit的生产系统中的速率),那么Hazelcast肯定是一种有过度杀伤力工作。建议始终使用JMS / AMPQ以循环方式将数据分发到您的节点。 但是当处理存储在内存中的大数据时。你不应该错过由Hazelcast支持的Spring Integration Election算法。


以上所述就是小编给大家介绍的《使用Spring Integration和Hazelcast进行集群领导者选举》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

PHP for the World Wide Web, Second Edition (Visual QuickStart Gu

PHP for the World Wide Web, Second Edition (Visual QuickStart Gu

Larry Ullman / Peachpit Press / 2004-02-02 / USD 29.99

So you know HTML, even JavaScript, but the idea of learning an actual programming language like PHP terrifies you? Well, stop quaking and get going with this easy task-based guide! Aimed at beginning ......一起来看看 《PHP for the World Wide Web, Second Edition (Visual QuickStart Gu》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

html转js在线工具
html转js在线工具

html转js在线工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试