内容简介:交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout
交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。
交换机有四种类型:Direct, topic, Headers and Fanout
* Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去. * Topic:按规则转发消息(最灵活) * Headers:设置 header attribute 参数类型的交换机 * Fanout:转发消息到所有绑定队列(广播模式) 复制代码
下面介绍常用的三种模式的基础用法。
SpringBoot 整合
Pom 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
application.properties 配置文件
# rabbitmq连接参数 spring.rabbitmq.host= # mq ip地址 spring.rabbitmq.port=5672 # 端口 默认5672 spring.rabbitmq.username=admin # 用户名 spring.rabbitmq.password=admin # 密码 # 开启发送确认(开启此模式,生产者成功发送到交换机后执行相应的回调函数) #spring.rabbitmq.publisher-confirms=true # 开启发送失败退(开启此模式,交换机路由不到队列时执行相应的回调函数) #spring.rabbitmq.publisher-returns=true # 开启消费者手动确认 ACK 默认auto #spring.rabbitmq.listener.direct.acknowledge-mode=manual #spring.rabbitmq.listener.simple.acknowledge-mode=manual 复制代码
Direct Exchange
direct类型的Exchange路由规则很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中
- 定义配置类,注册交换机和队列并进行绑定
/**
* Rabbit 配置类
* @author peng
*/
@Configuration
public class DirectRabbitConfig {
@Bean
DirectExchange directExchange(){
// 注册一个 Direct 类型的交换机 默认持久化、非自动删除
return new DirectExchange("directExchange");
}
@Bean
Queue infoQueue(){
// 注册队列
return new Queue("infoMsgQueue");
}
@Bean
Queue warnQueue(){
return new Queue("warnMsgQueue");
}
@Bean
Binding infoToExchangeBinging(Queue infoQueue, DirectExchange directExchange) {
// 将队列以 info-msg 为绑定键绑定到交换机
return BindingBuilder.bind(infoQueue).to(directExchange).with("info-msg");
}
@Bean
Binding warnToExchangeBinging(Queue warnQueue, DirectExchange directExchange) {
return BindingBuilder.bind(warnQueue).to(directExchange).with("warn-msg");
}
}
复制代码
- 定义生产者
/**
* 生产者
* @author peng
*/
@Component
public class DirectSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendInfo() {
String content = "I am Info msg!";
// 将消息以info-msg绑定键发送到directExchange交换机
this.rabbitTemplate.convertAndSend("directExchange", "info-msg", content);
System.out.println("########### SendInfoMsg : " + content);
}
public void sendWarn() {
String content = "I am Warn msg!";
this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
System.out.println("########### SendWarnMsg : " + content);
}
public void sendWarn(int i) {
String content = "I am Warn msg! " + i;
this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
System.out.println("########### SendWarnMsg : " + content);
}
public void sendError() {
String content = "I am Error msg!";
this.rabbitTemplate.convertAndSend("directExchange", "error-msg", content);
System.out.println("########### SendErrorMsg : " + content);
}
}
复制代码
- 定义消费者
消费者1
/**
* @author peng
*/
@Component
// 标记此类为Rabbit消息监听类,监听队列infoMsgQueue
@RabbitListener(queues = "infoMsgQueue")
public class DirectReceiver1 {
// 定义处理消息的方法
@RabbitHandler
public void process(String message) {
System.out.println("########### DirectReceiver1 Receive InfoMsg:" + message);
}
}
消费者2
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("########### DirectReceiver2 Receive warnMsg:" + message);
}
}
复制代码
- 基础用法测试
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class DirectTest {
@Autowired
private DirectSender directSender;
@Test
public void send() {
directSender.sendInfo();
directSender.sendWarn();
directSender.sendError();
}
}
结果
########### SendInfoMsg : I am Info msg!
########### SendWarnMsg : I am Warn msg!
########### DirectReceiver2 Receive warnMsg:I am Warn msg!
########### DirectReceiver1 Receive InfoMsg:I am Info msg!
InfoMsg 以info-msg绑定键发送到directExchange交换机,交换机路由到infoMsgQueue队列,DirectReceiver1监听此队列接受消息。
WarnMsg 同理
ErrorMsg 由于没有队列的绑定键为 error-msg 所以消息会被丢弃
复制代码
- 一对多测试
消费者3
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver3 {
@RabbitHandler
public void process(String message) {
System.out.println("########### DirectReceiver3 Receive warnMsg:" + message);
}
}
// 一对多
@Test
public void oneToMany() {
for (int i = 0; i< 100 ; i++){
directSender.sendWarn(i);
}
}
结果
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 6
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 8
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 10
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 5
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 7
消费者2 和 消费者3 均匀(条数上)的消费了消息
复制代码
- 多对多测试
/**
* 生产者3
* @author peng
*/
@Component
public class DirectSender2 {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendWarn(int i) {
String content = "I am Warn msg! " + i +" fromSend2";
this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
System.out.println("########### SendWarnMsg : " + content);
}
}
// 多对多
@Test
public void manyToMany() {
for (int i = 0; i< 10 ; i++){
directSender.sendWarn(i);
directSender2.sendWarn(i);
}
}
结果
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 0 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0 fromSend1
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 1 fromSend1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1 fromSend2
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 2 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2 fromSend1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 3 fromSend1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 4 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4 fromSend1
消费者2和消费者3分别接受了生产者1 和生产者2的消息
复制代码
Fanout Exchang
fanout类型的Exchange路由规则非常简单,会发送给所有绑定到该交换机的队列上。会忽略路由键
配置类
/**
* @author peng
*/
@Configuration
public class FanoutRabbitConfig {
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
Queue queue1(){
return new Queue("fanout.1");
}
@Bean
Queue queue2(){
return new Queue("fanout.2");
}
@Bean
Queue queue3(){
return new Queue("fanout.3");
}
@Bean
Binding bindingExchange1(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
Binding bindingExchange2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
@Bean
Binding bindingExchange3(Queue queue3, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue3).to(fanoutExchange);
}
}
复制代码
生产者
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
System.out.println("######## Sender : " + context);
}
}
复制代码
消费者
消费者1
/**
* @author peng
*/
@Component
@RabbitListener(queues = "fanout.1")
public class FanoutReceiver1 {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver 1 : " + message);
}
}
消费者2
@Component
@RabbitListener(queues = "fanout.2")
public class FanoutReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver 2 : " + message);
}
}
消费者3
@Component
@RabbitListener(queues = "fanout.3")
public class FanoutReceiver3 {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver 3 : " + message);
}
}
复制代码
测试
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class FanoutTest {
@Autowired
private FanoutSender fanoutSender;
@Test
public void send() {
fanoutSender.send();
}
}
结果
######## Sender : hi, fanout msg
fanout Receiver 1 : hi, fanout msg
fanout Receiver 2 : hi, fanout msg
fanout Receiver 3 : hi, fanout msg
复制代码
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- RecyclerView使用指南(一)—— 基本使用
- 如何使用Meteorjs使用URL参数
- 使用 defer 还是不使用 defer?
- 使用 Typescript 加强 Vuex 使用体验
- [译] 何时使用 Rust?何时使用 Go?
- UDP协议的正确使用场合(谨慎使用)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Agile Web Development with Rails, Third Edition
Sam Ruby、Dave Thomas、David Heinemeier Hansson / Pragmatic Bookshelf / 2009-03-17 / USD 43.95
Rails just keeps on changing. Rails 2, released in 2008, brings hundreds of improvements, including new support for RESTful applications, new generator options, and so on. And, as importantly, we’ve a......一起来看看 《Agile Web Development with Rails, Third Edition》 这本书的介绍吧!