SpringBoot 中使用RabbitMQ(一)

栏目: Java · 发布时间: 6年前

内容简介:交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout

交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。

交换机有四种类型:Direct, topic, Headers and Fanout

* Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  * Topic:按规则转发消息(最灵活)
  * Headers:设置 header attribute 参数类型的交换机
  * Fanout:转发消息到所有绑定队列(广播模式)
复制代码

下面介绍常用的三种模式的基础用法。

SpringBoot 整合

Pom 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码

application.properties 配置文件

# rabbitmq连接参数
spring.rabbitmq.host=  # mq ip地址
spring.rabbitmq.port=5672 # 端口 默认5672
spring.rabbitmq.username=admin # 用户名
spring.rabbitmq.password=admin # 密码
# 开启发送确认(开启此模式,生产者成功发送到交换机后执行相应的回调函数)
#spring.rabbitmq.publisher-confirms=true
# 开启发送失败退(开启此模式,交换机路由不到队列时执行相应的回调函数)
#spring.rabbitmq.publisher-returns=true
# 开启消费者手动确认 ACK 默认auto
#spring.rabbitmq.listener.direct.acknowledge-mode=manual
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
复制代码

Direct Exchange

direct类型的Exchange路由规则很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中

SpringBoot 中使用RabbitMQ(一)
  • 定义配置类,注册交换机和队列并进行绑定
/**
 * Rabbit 配置类
 * @author peng
 */
@Configuration
public class DirectRabbitConfig {

    @Bean
    DirectExchange directExchange(){
        // 注册一个 Direct 类型的交换机 默认持久化、非自动删除
        return new DirectExchange("directExchange");
    }

    @Bean
    Queue infoQueue(){
        // 注册队列
        return new Queue("infoMsgQueue");
    }
    
    @Bean
    Queue warnQueue(){
        return new Queue("warnMsgQueue");
    }
    
    @Bean
    Binding infoToExchangeBinging(Queue infoQueue, DirectExchange directExchange) {
        // 将队列以 info-msg 为绑定键绑定到交换机
        return BindingBuilder.bind(infoQueue).to(directExchange).with("info-msg");
    }
    
    @Bean
    Binding warnToExchangeBinging(Queue warnQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(warnQueue).to(directExchange).with("warn-msg");
    }
}
复制代码
  • 定义生产者
/**
 * 生产者
 * @author peng
 */
@Component
public class DirectSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendInfo() {
        String content = "I am Info msg!";
        // 将消息以info-msg绑定键发送到directExchange交换机
        this.rabbitTemplate.convertAndSend("directExchange", "info-msg", content);
        System.out.println("########### SendInfoMsg : " + content);
    }
    
    public void sendWarn() {
        String content = "I am Warn msg!";
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
    
    public void sendWarn(int i) {
        String content = "I am Warn msg! " + i;
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
    
    public void sendError() {
        String content = "I am Error msg!";
        this.rabbitTemplate.convertAndSend("directExchange", "error-msg", content);
        System.out.println("########### SendErrorMsg : " + content);
    }
}

复制代码
  • 定义消费者
消费者1
/**
 * @author peng
 */
@Component
// 标记此类为Rabbit消息监听类,监听队列infoMsgQueue
@RabbitListener(queues = "infoMsgQueue")
public class DirectReceiver1 {

    // 定义处理消息的方法
    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver1 Receive InfoMsg:" + message);
    }
}

消费者2 
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver2 Receive warnMsg:" + message);
    }
}
复制代码
  • 基础用法测试
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class DirectTest {
    @Autowired
    private DirectSender directSender;

    @Test
    public void send() {
        directSender.sendInfo();
        directSender.sendWarn();
        directSender.sendError();
    }
}

结果

    ########### SendInfoMsg : I am Info msg!
    ########### SendWarnMsg : I am Warn msg!
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg!
    ########### DirectReceiver1 Receive InfoMsg:I am Info msg!
    
    InfoMsg 以info-msg绑定键发送到directExchange交换机,交换机路由到infoMsgQueue队列,DirectReceiver1监听此队列接受消息。
    WarnMsg 同理
    ErrorMsg 由于没有队列的绑定键为 error-msg 所以消息会被丢弃

复制代码
  • 一对多测试
消费者3
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver3 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver3 Receive warnMsg:" + message);
    }
}

// 一对多
@Test
public void oneToMany() {
    for (int i = 0; i< 100 ; i++){
        directSender.sendWarn(i);
    }
}

结果
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 6
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 8
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 10
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 5
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 7
    
    消费者2 和 消费者3 均匀(条数上)的消费了消息

复制代码
  • 多对多测试
/**
 * 生产者3
 * @author peng
 */
@Component
public class DirectSender2 {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendWarn(int i) {
        String content = "I am Warn msg! " + i +" fromSend2";
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
}

// 多对多
@Test
public void manyToMany() {
    for (int i = 0; i< 10 ; i++){
        directSender.sendWarn(i);
        directSender2.sendWarn(i);
    }
}

结果

    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 0 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0 fromSend1
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 1 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1 fromSend2
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 2 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 3 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 4 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4 fromSend1
   
    消费者2和消费者3分别接受了生产者1 和生产者2的消息
复制代码

Fanout Exchang

fanout类型的Exchange路由规则非常简单,会发送给所有绑定到该交换机的队列上。会忽略路由键

SpringBoot 中使用RabbitMQ(一)

配置类

/**
 * @author peng
 */
@Configuration
public class FanoutRabbitConfig {

    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Queue queue1(){
        return new Queue("fanout.1");
    }
    @Bean
    Queue queue2(){
        return new Queue("fanout.2");
    }
    @Bean
    Queue queue3(){
        return new Queue("fanout.3");
    }

    @Bean
    Binding bindingExchange1(Queue queue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchange2(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchange3(Queue queue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue3).to(fanoutExchange);
    }
}
复制代码

生产者

@Component
public class FanoutSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, fanout msg ";
        this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
        System.out.println("######## Sender : " + context);
    }
}

复制代码

消费者

消费者1
/**
 * @author peng
 */
@Component
@RabbitListener(queues = "fanout.1")
public class FanoutReceiver1 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 1  : " + message);
    }
}

消费者2
@Component
@RabbitListener(queues = "fanout.2")
public class FanoutReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 2  : " + message);
    }
}

消费者3
@Component
@RabbitListener(queues = "fanout.3")
public class FanoutReceiver3 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 3  : " + message);
    }
}

复制代码

测试

@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class FanoutTest {
    @Autowired
    private FanoutSender fanoutSender;

    @Test
    public void send() {
        fanoutSender.send();
    }
}

结果

######## Sender : hi, fanout msg 
fanout Receiver 1  : hi, fanout msg 
fanout Receiver 2  : hi, fanout msg 
fanout Receiver 3  : hi, fanout msg
复制代码

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Agile Web Development with Rails, Third Edition

Agile Web Development with Rails, Third Edition

Sam Ruby、Dave Thomas、David Heinemeier Hansson / Pragmatic Bookshelf / 2009-03-17 / USD 43.95

Rails just keeps on changing. Rails 2, released in 2008, brings hundreds of improvements, including new support for RESTful applications, new generator options, and so on. And, as importantly, we’ve a......一起来看看 《Agile Web Development with Rails, Third Edition》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具