Spring boot 集成 ActiveMQ

栏目: 后端 · 发布时间: 5年前

内容简介:到Apache官方网站下载最新的ActiveMQ的安装包,并解压到本地目录下,下载链接如下:进入bin 目录,如果我们是32位的机器,就双击 win32 目录下的 activemq.bat,如果是64位机器,则双击 win64 目录下的 activemq.bat ,运行结果如下:

安装ActiveMQ

到Apache官方网站下载最新的ActiveMQ的安装包,并解压到本地目录下,下载链接如下: http://activemq.apache.org/do...

Spring boot 集成 ActiveMQ

进入bin 目录,如果我们是32位的机器,就双击 win32 目录下的 activemq.bat,如果是64位机器,则双击 win64 目录下的 activemq.bat ,运行结果如下:

成功之后在浏览器输入 http://127.0.0.1 :8161/ 地址,可以看到 ActiveMQ 的管理页面,用户名和密码默认都是 admin

Spring boot 集成 ActiveMQ

Spring boot 集成 ActiveMQ

Spring Boot 整合 ActiveMQ

工程结构

Spring boot 集成 ActiveMQ

添加 pom 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

config 配置

# activemq
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#默认为true表示使用内存的activeMQ,不需要安装activeMQ server
spring.activemq.in-memory=true
 
#如果此处设置为true,需要加如下的依赖包
#          <groupId>org.apache.activemq</groupId>
#          <artifactId>activemq-pool</artifactId>
# 否则会自动配置失败,报JmsMessagingTemplate注入失败
spring.activemq.pool.enabled=false

队列模式

创建 消息提供者:Producer.java

import javax.jms.Destination;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
 
@Service
public class Producer {
 
    @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
    private JmsMessagingTemplate jmsTemplate;
 
    // 发送消息,destination是发送到的队列,message是待发送的消息
    public void sendMessage(Destination destination, final String message){
        jmsTemplate.convertAndSend(destination, message);
    }
}

创建消费者一: Consumer.java

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
 
@Component
public class Consumer {
 
    // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
    @JmsListener(destination = "mytest.queue")
    public void receiveQueue(String text) {
        System.out.println("Consumer收到的报文为:"+text);
    }
}

创建消费者二:Consumer1 .java

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
 
@Component
public class Consumer1 {
 
    // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
    @JmsListener(destination = "mytest.queue")
    public void receiveQueue(String text) {
 
        System.out.println("Consumer1收到的报文为:"+text);
    }
}

测试

创建一个 ActivceMQQueue 对象,表示队列模式,下面会介绍主题模式。

import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
 
import javax.jms.Destination;
 
@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsApplicationTests {
 
    @Autowired
    private Producer producer;
 
    @Test
    public void contextLoads() {
 
        Destination destination = new ActiveMQQueue("mytest.queue");
 
        for(int i=0; i<100; i++){
            producer.sendMessage(destination, "myname is chhliu!!!");
        }
    }
}

Spring boot 集成 ActiveMQ

双向队列

使用 @SendTo 注解可以将方法的返回值重新放入到消息队列中,供其他消费者消费

这里我们修改 Consumer1.java 增加注解 @SendTo

public class Consumer1 {
 
    // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
    @JmsListener(destination = "mytest.queue")
    @SendTo("out.queue")
    public String receiveQueue(String text) {
 
        System.out.println("Consumer1收到的报文为:"+text);
 
        return "return message" + text;
    }
 
    @JmsListener(destination = "out.queue")
    public void outQueue(String text){
        System.out.println("Consumer1 outQueue:"+text);
    }
}

主题模式

配置文件中需要增加

# 启用 topic 模式
spring.jms.pub-sub-domain=true

修改 Consumer1 增加对主题的监听

import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
 
@Component
public class Consumer1 {
 
    // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
    @JmsListener(destination = "mytest.queue")
    @SendTo("out.queue")
    public String receiveQueue(String text) {
 
        System.out.println("Consumer1 收到的报文为:"+text);
 
        return "return message" + text;
    }
 
    @JmsListener(destination = "out.queue")
    public void outQueue(String text){
        System.out.println("Consumer1 outQueue:"+text);
    }
 
    @JmsListener(destination = "mytest.topic")
    public void topicQueue(String text){
        System.out.println("Consumer 接收到的 topic 消息:" + text);
    }
}

修改测试类

增加发送主题消息

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
 
import javax.jms.Destination;
 
@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsApplicationTests {
 
    @Autowired
    private Producer producer;
 
    @Test
    public void contextLoads() {
 
        Destination destination = new ActiveMQQueue("mytest.queue");
        Destination topicDestination = new ActiveMQTopic("mytest.topic");
 
        for(int i=0; i<10; i++){
            producer.sendMessage(destination, "Queue Message......");
            producer.sendMessage(topicDestination, "Topic Message!!!");
        }
    }
}

消息的结果为:

Spring boot 集成 ActiveMQ

没有出现 队列消息。此时需要修改我们的监听,指定出是哪种类型的

增加配置类

这个配置类没有在那个工程图上面出现,添加一个类就好。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
 
import javax.jms.ConnectionFactory;
 
@Configuration
@EnableJms
public class JmsConfig {
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
 
    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(false);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
 
}

修改消费者 Consumer1.java

import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
 
@Component
public class Consumer1 {
 
    // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
    @JmsListener(destination = "mytest.queue", containerFactory = "queueListenerFactory")
    @SendTo("out.queue")
    public String receiveQueue(String text) {
 
        System.out.println("Consumer1 收到的报文为:"+text);
 
        return "return message" + text;
    }
 
    @JmsListener(destination = "out.queue", containerFactory = "queueListenerFactory")
    public void outQueue(String text){
        System.out.println("Consumer1 outQueue:"+text);
    }
 
    @JmsListener(destination = "mytest.topic", containerFactory = "topicListenerFactory")
    public void topicQueue(String text){
        System.out.println("Consumer 接收到的 topic 消息:" + text);
    }
}

重新执行

Spring boot 集成 ActiveMQ


以上所述就是小编给大家介绍的《Spring boot 集成 ActiveMQ》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

白话区块链

白话区块链

蒋勇 / 文延、嘉文 / 机械工业出版社 / 2017-10-1 / 59.00

由浅入深:从比特币开始,到区块链技术的骨骼(密码算法)和灵魂(共识算法),再到目前知名的区块链框架介绍,到最后从零构建一个微型区块链系统(微链),循序渐进。 多图多表:各种示例以及图表,通过流程图与示意图介绍比特币的源码编译、以太坊智能合约的开发部署、超级账本Fabric的配置使用、模拟比特币的微型区块链系统的设计实现等,形象而直观。 白话通俗:通过“村民账本记账”、“百花村选举记账”......一起来看看 《白话区块链》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

URL 编码/解码
URL 编码/解码

URL 编码/解码