五分钟快速了解ActiveMQ,案例简单且详细!

栏目: Java · 发布时间: 6年前

内容简介:最近得闲,探索了一下ActiveMQ。ActiveMQ消息队列,信息收发的容器,作用有异步消息,流量削锋,应用耦合。下载地址:

最近得闲,探索了一下ActiveMQ。

ActiveMQ消息队列,信息收发的容器,作用有异步消息,流量削锋,应用耦合。

安装

下载地址: http://activemq.apache.org/co...

window版本的解压后双击/bin/ activemq.bat 即可启动

它有自己的可视化页面: http://localhost :8161/admin/

五分钟快速了解ActiveMQ,案例简单且详细!

默认访问密码是:admin/admin

如果需要修改在:/conf/jetty-realm.properties 中修改

JmsTemplate

springboot 上整合的,使用spring 的 JmsTemplate 来操作ActiveMQ

一、首先在pom文件中导入所需的jar包坐标:

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
    </dependency>

二、新增一个ActiveMQ的配置文件 spring-jms.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <!-- 配置JMS连接工厂 -->
    <bean id="innerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${spring.activemq.broker-url}" />
    </bean>
    <!--配置连接池-->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        destroy-method="stop">
        <property name="connectionFactory" ref="innerConnectionFactory" />
        <property name="maxConnections" value="100"></property>
    </bean>
    <!-- 配置JMS模板,Spring提供的JMS工具类 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="defaultDestination" ref="JmsSenderDestination" />
        <property name="receiveTimeout" value="10000" />
    </bean>
</beans>

三、在启动类上配置以生效

@ImportResource(locations={"classpath:/config/spring-jms.xml"})

四、在application.properties中配置ActiveMQ 的连接地址

spring.activemq.broker-url=tcp://localhost:61616

准备就绪;开始写生产者和消费者,我这里把生产者和消费者写在一个项目里面。在这之前需要明白两个概念

传递模型

队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型:

  1. 点对点(point-to-point,简称PTP)Queue消息传递模型:
    一个消息生产者对应一个消费者
  2. 发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型:
    一个消息生产者对应多个个消费者

QUEUE

  1. 先在 spring-jms.xml 里添加配置一个队列名称 Queue_love

    <bean id="JmsSenderDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>Queue_love</value>
    </constructor-arg>

    </bean>

  2. 创建一个生产者来发送消息; @Qualifier("JmsSenderDestination") 指定了发送到上面配置的 Queue_love 队列

    @Component
    public class JmsSender {
    @Autowired
    private JmsTemplate jmsTemplate;
    
    @Qualifier("JmsSenderDestination")
    @Autowired
    protected Destination destination;
    
    public void sendMessage(final String msg) {
    
        logger.info("QUEUE destination :" + destination.toString() + ", 发送消息:" + msg);
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(final Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }}
  3. 创建一个消费者来消费消息:

    @Component
    public class JmsTemplateListener implements MessageListener {
    
    @Override
    public void onMessage(Message message) {
        final TextMessage tm = (TextMessage) message;
        try {
            logger.info("QUEUE接收信息==="+tm.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }}
  4. 消费者需要在 spring-jms.xml 配置一下该消费者需要消费哪个队列的消息

    <!-- 配置消息队列监听者 -->
    <bean id="JmsListener" class="com.mashu.activeMq.jmsTemplate.JmsTemplateListener" />
    
    <!-- 使用spring进行配置 监听 -->
    <bean id="JmsTemplateListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="pooledConnectionFactory"></property>
    <property name="destination" ref="JmsSenderDestination"></property>
    <property name="messageListener" ref="JmsListener"></property>
    <property name="sessionTransacted" value="false"></property>
    <property name="concurrentConsumers" value="6"></property>
    <property name="concurrency" value="2-4"></property>
    <property name="maxConcurrentConsumers" value="10"></property>
    </bean>

这样一个简单的消息队列收发程序已经写好了,看看结果

五分钟快速了解ActiveMQ,案例简单且详细!

发出的消息立马就被消费了,我们可以先把消费者注释掉,只用生产者发送消息就可以在可视化页面上看到消息内容。

五分钟快速了解ActiveMQ,案例简单且详细!

Topic

Topic的方式和Queue类似,只需要在定义队列的时候 calss = org.apache.activemq.command.ActiveMQTopic 即可

<bean id="JmsSenderTDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg>
        <value>Topic_love</value>
    </constructor-arg>
</bean>

Message

除了使用 createTextMessage() 方法发送纯字符串消息,还有

  1. 序列化对象的形式
    session.createObjectMessage() ;
  2. 流的形式,可以用来传递文件
    session.createStreamMessage() ;
  3. 字节的形式
    session.createBytesMessage() ;
  4. map的形式
    session.createMapMessage() ;

安全配置

ActiveMQ在使用的时候和 MySQL 一样,也可以配置用户名密码,默认不没有,我们可以打开:

  1. 在conf/ activemq.xml 添加以下信息(务必在 <systemUsage> 标签上面)

    <plugins>
             <simpleAuthenticationPlugin>
                 <users>
                     <authenticationUser username="${activemq.username}"
                      password="${activemq.password}" groups="users,admins"/>
                 </users>
             </simpleAuthenticationPlugin>
         </plugins>
  2. 对应的用户名密码在/conf/ credentials.properties 中配置

    activemq.username=admin
    activemq.password=123456
    guest.password=password
  3. 那么我们在项目中的 application.properties 需要加上也要用户名密码:

    spring.activemq.broker-url=tcp://localhost:61616
    spring.activemq.user=admin
    spring.activemq.password=123456

消息持久化

默认保存的消息在\data\kahadb目录下;也支持保存到 MySQL 里面;

生产者发送消息存储到数据库,消费者消费后消息从数据库消失。

  1. 修改/conf/activemq.xml

    将:

    <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

    修改为:

    <persistenceAdapter>
        <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
    </persistenceAdapter>
  2. 添加/conf/activemq.xml 配置 MySQL 连接信息

    <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/db_activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="123456"/>
    <property name="poolPreparedStatements" value="true"/>
    </bean>
  3. 添加 mysql-connector-java.jar 到/bin目录
  4. 新建数据库db_activemq

重启 ActiveMQ 后数据库产生三个表 activemq_acksactivemq_lockactivemq_msgs


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Haskell School of Expression

The Haskell School of Expression

Paul Hudak / Cambridge University Press / 2000-01 / USD 95.00

Functional programming is a style of programming that emphasizes the use of functions (in contrast to object-oriented programming, which emphasizes the use of objects). It has become popular in recen......一起来看看 《The Haskell School of Expression》 这本书的介绍吧!

html转js在线工具
html转js在线工具

html转js在线工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具