RabbitMQ指南之一:"Hello World!"

栏目: 后端 · 发布时间: 5年前

内容简介:为什么要使用MQ消息中间件?它解决了什么问题?关于为什么要使用消息中间件?消息中间件是如何做到同步变异步、流量削锋、应用解耦的?网上已经有很多说明,我这里就不再说明。我在接下来的RabbitMq系列博客里会将官方的讲解翻译过来,同时加以自己的理解整理成博客,希望能和大家共同交流,一起进步。RabbitMq原理图

为什么要使用MQ消息中间件?它解决了什么问题?关于为什么要使用消息中间件?消息中间件是如何做到同步变异步、流量削锋、应用解耦的?网上已经有很多说明,我这里就不再说明。我在接下来的RabbitMq系列博客里会将官方的讲解翻译过来,同时加以自己的理解整理成博客,希望能和大家共同交流,一起进步。

RabbitMQ指南之一:
RabbitMQ指南之一:

RabbitMq原理图

1、RabbitMq简介

RabbitMq是一个消息中间件:它接收消息、转发消息。你可以把它理解为一个邮局:当你向邮箱里寄出一封信后,邮递员们就能最终将信送到收信人手中。类似的,RabbitMq就好比是一个邮箱、邮局和邮递员。RabbitMq和邮局最大的区别是:RabbitMq接收、转发的都是二进制数据块--消息,而不是纸质的数据文件。

RabbitMq、消息相关术语如下:

生产者:生产者只发送消息,发送消息的程序即为生产者:

RabbitMQ指南之一:

消息队列:消息队列就相当于RabbitMq中的邮箱名称。尽管消息在你的程序和RabbitMq中流动,但它只能存储在消息队列中。队列本质上是一个大的消息缓存,它能存多少消息,取决于主机的内存和磁盘限制。多个生产者可以往同一个消息队列中发送消息;多个消费者可以从同一个队列中获取数据。我们以下列图形来表示一个消息队列:

RabbitMQ指南之一:

消费者:消费者是一个等待接收消息的程序:

RabbitMQ指南之一:

注意:生产者、消费者和RabbitMq可以在不同的机器上;在很多的应用中,一个生产者同时也可能是消费者。

2、“Hello World!”

在这小节里,我们将写一个消息生产者用来发送消息、一个消息消费者来消费消息(接收消息并打印出来)。

在下面图形中,“P”是我们的生产者,“C”是我们的消费者,中间的红框是我们的消息队列,保存了从生产者那里接收到的准备转发到消费方的消息。

RabbitMQ指南之一:

Java客户端类库说明:

RabbitMq使用多种协议,本指南使用AMQP 0-9-1协议,该协议是一个开源的、通用的消息协议。RabbitMq有多种语言的客户端,这里我们使用 JAVA 语言的客户端做实验。通过以下地址下载RabbitMq客户端jar包和依赖包:

amqp-client-5.5.1.jar

slf4j-api-1.7.25.jar

slf4j-simple-1.7.25.jar

把这三个jar包拷贝到你的工作目录,包括后面教程要新建的java文件。

2.1 发送消息

生产者连接RabbitMq,发送一条简单的消息”Hello World!“后就退出。

在Send.java类中,需要引入以下依赖包:

1 import com.rabbitmq.client.ConnectionFactory;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.Channel;
复制代码

给队列起个名字:

1 public class Send {
2   private final static String QUEUE_NAME = "hello";
3   public static void main(String[] argv) throws Exception {
4       ...
5   }
6 }
复制代码

创建连接到服务器的连接Collection:

1 onnectionFactory factory = new ConnectionFactory();
2 factory.setHost("localhost");
3 try (Connection connection = factory.newConnection();
4      Channel channel = connection.createChannel()) {
5 
6 }
复制代码

这个连接即套接字连接,为我们处理协议版本协商和身份验证等。这里我们连接一个本地的RabbitMq:因此是localhost,如果你想要连接一个远程机器上的RabbitMq,只需要把localhst改成那台机器的计算机名或是IP地址。

创建完连接之后,我们继续创建一个信道:Channel。我们需要使用try-with-resource表达式,因为Connection和Channel都实现了JAVA接口Closeable,属于资源,需要关闭,这样我们就不需要显示地在我们的代码中进行关闭了。(关于信道,请参考文章最顶部的RabbitMq原理图,是TCP里面的虚拟链接,例如:电缆相当于一个TCP,信道就是里面的一个独立光纤,一条TCP上面创建多条信道是没有问题的;TCP一旦打开就分创建AMQP信道;无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的)。

为了发送消息,我们还必须要定义一个需要发送到的消息队列,这些都要使用try-with-resource表达式:

1 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
2 String message = "Hello World!";
3 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
4 System.out.println(" [x] Sent '" + message + "'");
复制代码

定义一个消息队列是幂等的:只有该队列不存在的时候才能被创建,消息是二进制数组,因此你可以根据需要指定编码。

完成的Send.java如下:

1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 
 5 public class Send {
 6 
 7     private final static String QUEUE_NAME = "hello";
 8 
 9     public static void main(String[] argv) throws Exception {
10         ConnectionFactory factory = new ConnectionFactory();
11         factory.setHost("localhost");
12         try (Connection connection = factory.newConnection();
13              Channel channel = connection.createChannel()) {
14             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15             String message = "Hello World!";
16             channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
17             System.out.println(" [x] Sent '" + message + "'");
18         }
19     }
20 }
复制代码

2.2 接收消息

消费者监听RabbitMq中的消息,因此与生产者发送一条消息就退出不同,消费者要保持运行状态来接收消息并打印出来。

Recv.java同样需要导入以下依赖包:

1 import com.rabbitmq.client.Channel;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.ConnectionFactory;
4 import com.rabbitmq.client.DeliverCallback;
复制代码

与生产者相同,我们需要创建Connetcion和Channel、定义队列(需要监听并接收消息的队列):

1 public class Recv {
 2 
 3   private final static String QUEUE_NAME = "hello";
 4 
 5   public static void main(String[] argv) throws Exception {
 6     ConnectionFactory factory = new ConnectionFactory();
 7     factory.setHost("localhost");
 8     Connection connection = factory.newConnection();
 9     Channel channel = connection.createChannel();
10 
11     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
12     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
13 
14   }
15 }
复制代码

注意我们也在这里声明队列,因为我们可能在生产者之前启动消费者,我们想要确保在我们尝试消费消息的时候队列就已经存在了。

这里我们为什么不使用try-with-resource表达式自动关闭channl和connection?通过这样,我们就可以使我们的程序一直保持运行状态,如果把这些关了,程序也就停止了。这就尴尬了,因为我们需要保持消费者一直处于异步监听消息过来的状态。

RabbitMq会将队列中的消息异步地推送过来,我们需要提供一个回调函数来缓存消息直到我们需要用到这些消息:

1 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
2     String message = new String(delivery.getBody(), "UTF-8");
3     System.out.println(" [x] Received '" + message + "'");
4 };
5 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
复制代码

Rec.java完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
复制代码

3、测试

在官方手册中,测试部分他们是将客户端jar和依赖jar添加到classpath路径,然后在cmd终端来运行的,我觉得麻烦,因此,我这里放到IDEA中来运行,效果是一样的。

RabbitMQ指南之一:

第一步:首先运行Send.java:

输出结果:

[x] Sent 'Hello World!'
复制代码

查看RabbitMq控制台:

RabbitMQ指南之一:
RabbitMQ指南之一:

说明消息已经发送成功。

第二步:启动消费者Recv.java:

输出结果:

[x] Received 'Hello World!'
复制代码

说明消息已经消费成功了,此时再查看控制台:

RabbitMQ指南之一:
RabbitMQ指南之一:

消息依然存在在队列中,但是区别是,在第一张图中Ready由1变成了0,Unacknowledged由0变成了1;第二张图中Ready也由1变成0,Unacked由0变成了1。为什么会这样?按道理,消息消费了之后就应该删除掉,否则可能造成重复消费。关于这方面知识,将会在后面的章节中再介绍(Ack机制)。

4、用SpringBoot实现

上面虽然实现了功能,但在实际工作中,我们更多的可能是使用SpringBoot、SpringCloud等成熟的框架来实现。本小节就通过SpringBoot来实现以上功能。

创建工程的时候选择RabbitMq:

RabbitMQ指南之一:

工程目录如下:

RabbitMQ指南之一:

Provider和Consumer的配置文件相同,IP请替换成你自己的:

1 #RabbitMq
2 spring.rabbitmq.host=192.168.xx.xx  
3 spring.rabbitmq.username=rabbitmq
4 spring.rabbitmq.password=123456
5 
6 hello_world.queue=hello
复制代码

为方便让系统启动时就往队列发送消息,所以写了一个SenderRunner类:

1 @Component
 2 public class SenderRunner implements ApplicationRunner {
 3 
 4     @Autowired
 5     private Send send;
 6 
 7     @Override
 8     public void run(ApplicationArguments args) throws Exception {
 9         send.doSender("Hello RabbitMq");
10     }
11 }
复制代码

Send.java

1 @Component
 2 public class Send {
 3 
 4     @Value("${hello_world.queue}")
 5     private String queueName;
 6 
 7     @Autowired
 8     private AmqpTemplate amqpTemplate;
 9 
10     public void doSender(String msg) {
11 
12         amqpTemplate.convertAndSend(queueName,msg);
13         System.out.println("发送消息:" + msg);
14     }
15 }
复制代码

启动类:

1 @SpringBootApplication
2 public class ProviderApplication {
3     public static void main(String[] args) {
4         SpringApplication.run(ProviderApplication.class, args);
5     }
6 }
复制代码

Recv.java

@Component
public class Recv {

    @RabbitListener(queues = "${hello_world.queue}")
    public void receive(String msg) {
        System.out.println("接收到消息:" + msg);
    }
}
复制代码

启动Provider:

RabbitMQ指南之一:

查看控制台:

RabbitMQ指南之一:

启动Consumer:

RabbitMQ指南之一:

可见,SpringBoot为我们做了很多封装,隐藏了很多底层的细节,使用起来简单多了。(未完待续......)


以上所述就是小编给大家介绍的《RabbitMQ指南之一:"Hello World!"》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

并行计算导论

并行计算导论

Ananth Grama、George Karypis、张武、毛国勇、Anshul Gupta、Vipin Kumar、程海英 / 张武、毛国勇、程海英 / 机械工业出版社 / 2005-1-1 / 49.00元

《并行计算导论》(原书第2版)全面介绍并行计算的各个方面,包括体系结构、编程范例、算法与应用和标准等,涉及并行计算的新技术,也覆盖了较传统的算法,如排序、搜索、图和动态编程等。《并行计算导论》(原书第2版)尽可能采用与底层平台无关的体系结构并且针对抽象模型来设计处落地。书中选择MPI、POSIX线程和OpenMP作为编程模型,并在不同例子中反映了并行计算的不断变化的应用组合。一起来看看 《并行计算导论》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换