Spring整合ActiveMQ项目实战

栏目: Java · 发布时间: 5年前

内容简介:Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件,说白了就是个服务器,主要用来存放请求消息的这篇博客图文并茂,通俗易懂ActiveMQ作用总结笔者将其精炼了一下,主要有4大应用场景:异步处理,应用解耦,流量削锋,消息通讯

引言

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件,说白了就是个服务器,主要用来存放请求消息的

原理

这篇博客图文并茂,通俗易懂ActiveMQ作用总结

笔者将其精炼了一下,主要有4大应用场景:异步处理,应用解耦,流量削锋,消息通讯

其核心思想都是把用户的请求先存放在MQ中,然后返回用户响应,后台再慢慢去处理MQ中的消息,不需要一条龙业务全部跑完再返回响应,这样的话单位时间内请求数可以更多,响应速度也更快,相当于提高了吞吐量。其实前3种场景都差不多,笔者看来没有绝对的边界,只不过异步处理强调非同时性,应用解耦强调子系统挂掉后MQ体现的作用,流量削锋强调MQ在高并发中体现的作用。消息通讯的业务模式举例子:1.用微信和微信好友聊天 2.微信群聊天

准备工作

源码地址:

安装好activeMQ,如何安装自行百度

项目说明

项目适用jdk1.8,采用idea多模块架构,涉及技术有spring, activemq, tomcat

项目结构

Spring整合ActiveMQ项目实战

client是模拟消费者,domain是公共 工具 包,被maven打成jar供其它项目适用,service是模拟消息生产者

启动activemq服务器

Spring整合ActiveMQ项目实战

双击activemq.bat启动

登陆 http://localhost:8161/admin/queues.jsp ,发现Queues是空的

Spring整合ActiveMQ项目实战

看一下service的配置activemq_config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
   xmlns:jms="http://www.springframework.org/schema/jms"
   xsi:schemaLocation="http://www.springframework.org/schema/beans  
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
     http://www.springframework.org/schema/context  
     http://www.springframework.org/schema/context/spring-context-3.0.xsd  
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

   <!-- 这里暴露内部统一使用的MQ地址 -->
   <bean id="internalTargetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="tcp://localhost:61616" />
   </bean>
   <bean id="internalConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
      <property name="connectionFactory" ref="internalTargetConnectionFactory" />
      <property name="maxConnections" value="20" />
   </bean>
   <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
   <bean id="internalJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="internalConnectionFactory" />
   </bean>

   <!-- 推送给用户信息  创建一个Queue-->
   <bean id="userServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>user.service.queue</value>
      </constructor-arg>
   </bean>
   <!-- 推送给新闻信息   创建一个Queue-->
   <bean id="newsServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>news.service.queue</value>
      </constructor-arg>
   </bean>
   <!-- 推送给客户信息   创建一个Queue-->
   <bean id="clientServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>client.service.queue</value>
      </constructor-arg>
   </bean>    
</beans>复制代码

一共3种推送,每种推送对应1个队列名

service结构

Spring整合ActiveMQ项目实战

PushService是1个通用接口,然后3种推送各对应1个实现,使用tomcat启动service服务

登陆localhost:8080

Spring整合ActiveMQ项目实战

按照如上填写后,我们点击推送用户信息,出现如下提示框

Spring整合ActiveMQ项目实战

登陆 http://localhost:8161/admin/queues.jsp ,

Spring整合ActiveMQ项目实战

发现新增了一个队列,待处理消息数量1,消费者数量0,消息排队1,消息已出列0

我们看看后台执行过程

js通过ajax请求到后台

@RequestMapping(value="/user",method=RequestMethod.POST)
@ResponseBody
public ResultRespone userPush(User info){
 ResultRespone respone = new ResultRespone();
 try {
  userPushService.push(info);
  respone.setData(info);
 } catch (Exception e) {
  e.printStackTrace();
  respone = new ResultRespone(false, e.getMessage());
 }
 return respone;
}复制代码

调用push()方法

@Autowired
@Qualifier("userServiceQueue")
private Destination destination;

@Override
public void push(final Object info) {
 pushExecutor.execute(new Runnable() {
  @Override
  public void run() {
   jmsTemplate.send(destination, new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
      User p = (User) info;
     return session.createTextMessage(JSON.toJSONString(p));
    }
   });
  }    
 });
}复制代码

这个过程实际上将用户属性值组成的字符串发送到了activemq服务器,到此,生产者的任务就完成了

消费端结构

Spring整合ActiveMQ项目实战

主要通过3个listener来接收activemq发送过来的消息

看其中一个UserPushListener.java

@Component("userPushListener")
public class UserPushListener implements MessageListener {
  protected static final Logger logger = Logger.getLogger(UserPushListener.class);
 @Override
 public void onMessage(Message message) {
   logger.info("[UserPushListener.onMessage]:begin onMessage.");
         TextMessage textMessage = (TextMessage) message;
         try {
             String jsonStr = textMessage.getText();
             logger.info("[UserPushListener.onMessage]:receive message is,"+ jsonStr);
             if (jsonStr != null) {
                 User info = JSON.parseObject(jsonStr, User.class);
                 System.out.println("==============================接受到的用户信息 开始====================================");
                 System.out.println(info.toString());
                 System.out.println("==============================接受到的用户信息 结束====================================");
                 WebsocketController.broadcast("user", jsonStr);
             }
         } catch (JMSException e) {
             logger.error("[UserPushListener.onMessage]:receive message occured an exception",e);
         }
         logger.info("[UserPushListener.onMessage]:end onMessage.");
     }
}复制代码

看一下消费端的配置

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:jms="http://www.springframework.org/schema/jms"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans  
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
     http://www.springframework.org/schema/context  
     http://www.springframework.org/schema/context/spring-context-3.0.xsd  
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">  
     
    <!-- 内部统一使用的MQ地址 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">  
        <property name="connectionFactory" ref="targetConnectionFactory"/>  
        <property name="maxConnections" value="50"/>
    </bean>
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

   <!-- 推送给用户信息 -->
   <bean id="userPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>user.service.queue</value>
      </constructor-arg>
   </bean>
   <!-- 推送给新闻信息 -->
   <bean id="newsPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>news.service.queue</value>
      </constructor-arg>
   </bean>
   <!-- 推送给客户信息 -->
   <bean id="clientPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>client.service.queue</value>
      </constructor-arg>
   </bean>
   
   <!-- 用户接受推送 -->
    <bean id="userPushListenerConsumer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="userPushListenerMQ" />
        <property name="messageListener" ref="userPushListener" />
    </bean>
    
   <!-- 新闻接受推送 -->
    <bean id="newsPushListenerConsumer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="newsPushListenerMQ" />
        <property name="messageListener" ref="newsPushListener" />
    </bean>
    
   <!-- 客户接受推送 -->
    <bean id="clientPushListenerConsumer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="clientPushListenerMQ" />
        <property name="messageListener" ref="clientPushListener" />
    </bean>
</beans>复制代码

消费端监听了3个队列,所以队列一旦有消息,消费端就会监听到,而且activemq可以确认哪些消息被推送成功了

关闭service服务,启动client服务,观察日志

Spring整合ActiveMQ项目实战

成功接收到消息,再次查看 http://localhost:8161/admin/queues.jsp

Spring整合ActiveMQ项目实战

发现user.service.queue这个队列的消息是待处理消息数量0,消费者数量1,消息排队1,消息已出列1,表明消息推送完毕,另外两个新增的队列是客户端监听造成的,可以看出待处理消息的数量都是0


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

软件的奥秘

软件的奥秘

[美] V. Anton Spraul / 解福祥 / 人们邮电出版社 / 2017-9-1 / 49

软件已经成为人们日常生活与工作中常见的辅助工具,但是对于软件的工作原理,很多人却不是非常了解。 本书对软件的工作原理进行了解析,让读者对常用软件的工作原理有一个大致的了解。内容涉及数据如何加密、密码如何使用和保护、如何创建计算机图像、如何压缩和存储视频、如何搜索数据、程序如何解决同样的问题而不会引发冲突以及如何找出最佳路径等方面。 本书适合从事软件开发工作的专业技术人员,以及对软件工作......一起来看看 《软件的奥秘》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

URL 编码/解码
URL 编码/解码

URL 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具