RabbitMQ快速入门

栏目: 后端 · 发布时间: 5年前

内容简介:RabbitMQ其实是我最早接触的一个MQ框架,我记得当时是在大学的时候跑到图书馆一个人去看,由于RabbitMQ官网的英文还不算太难,因此也是参考官网学习的,一共有6章,当时是用Node来开发的,当时花了一下午看完了,也理解了。而现在回过头来再看,发现已经忘记了个差不多了,现在再回过头来继续看看,然乎记之。以防再忘,读者看时最好有一定的MQ基础。首先我们需要知道的是RabbitMQ它是基于高级队列协议(AMQP)的,它是Elang编写的,下面将围绕RabbitMQ队列、交换机、RPC三个重点进行展开。存储

一、前言

RabbitMQ其实是我最早接触的一个MQ框架,我记得当时是在大学的时候跑到图书馆一个人去看,由于RabbitMQ官网的英文还不算太难,因此也是参考官网学习的,一共有6章,当时是用Node来开发的,当时花了一下午看完了,也理解了。而现在回过头来再看,发现已经忘记了个差不多了,现在再回过头来继续看看,然乎记之。以防再忘,读者看时最好有一定的MQ基础。

二、RabbitMQ

首先我们需要知道的是RabbitMQ它是基于高级队列协议(AMQP)的,它是Elang编写的,下面将围绕RabbitMQ队列、交换机、RPC三个重点进行展开。

2.1、队列

存储消息的地方,多个生产者可以将消息发送到一个队列,多个消费者也可以消费同一个队列的消息。

注意:当多个消费者监听一个队列,此时生产者发送消息到队列只有一个消费者被消费,并且消费端的消费方式是按照消费端在内部启动的顺序轮询(round-robin)。

2.2、消费者

消费消息的一方

public class Send {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername("admin");
        factory.setPassword("admin");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
public class Recv {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";

    public static void main(String[] args) {
        try {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP);
            factory.setUsername("admin");
            factory.setPassword("admin");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

2.3、小结

1、Rabbit是如何保证消息被消费的?

答:通过ack机制。每当一个消息被消费端消费的时候,消费端可以发送一个ack给RabbitMQ,这样RabbitMQ就知道了该条消息已经被完整消费并且可以被delete了。;如果一条消息被消费但是没有发送ack,那么此时RabbitMQ将会认为需要重新消费该消息,如果此时还有其它的消费者,那么此时RabbitMQ将会把这条消息交给它处理。

注意:开启ack机制的是autoAck= false ;

2、消息如何进行持久化?

  • 将queue持久化,即设置 channel.queueDeclare(QUEUE_NAME, true, false, false, null);第二个参数durable为true
  • 设置消息持久化,即设置MessageProperties.PERSISTENT_TEXT_PLAIN

注意:消息持久化并不一定保证消息不会被丢失

3、RabbitMQ如何避免两个消费者一个非常忙一个非常闲的情况?

通过如下设置,保证一个消费者一次只能消费一个消息,只有当它消费完成并且返回ack给RabbitMQ之后才给它派发新的消息。

int prefetchCount = 1 ;
channel.basicQos(prefetchCount)

4、RabbitMQ异常情况下如何保证消息不会被重复消费?

需要业务自身实现密等性,RabbitMQ没有提供比较好的方式去保证。

2.2、交换机

在RabbitMQ中,生产者其实从来不会发送消息到队列,甚至,它不知道消息被发送到了哪个队列。那它被发送到了哪里呢?就是本节的重点:交换机,下面就是它在RabbitMQ中的介绍图。(X就是交换机)生产者发送消息给交换机,然后由交换机将消息转发给队列。

从上图就产生一个问题:X怎么将消息发给queue呢?它是把消息发给所有queue还是发给一个指定的queue或者丢弃消息呢?这就是看交换机的类型了。下面一起谈谈这几种类型

2.2.1、fanout

fanout:广播模式,这个比较好理解,就是所有的队列都能收到交换机的消息。

RabbitMQ快速入门

如上面,两个队列都能收到交换机的消息。

2.2.2、direct

这个模式相当于发布/订阅模式的一种,当交换机类型为direct的时候,此时我们需要设置两个参数:

  1. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));第二个参数,我们可以把它称呼为routeKey
  2. channel.queueBind(queueName, EXCHANGE_NAME, "");第三个参数,我们把它称呼为bindKey

有了这两个参数,我们就可以指定我们订阅哪些消息了。

RabbitMQ快速入门

如图,Q1订阅了orange的消息,Q2订阅了black、green的消息。

2.2.3、topic

其实topic和direct有一点类似,它相当于对direct作了增强。在direct中,我们上面所说的bind routeKey为black、green的它是有限制的,它只能绝对的等于routeKey,但是有时候我们的需求不是这样,我们可能想要的是正则匹配即可,那么Topic就派上用场了。

RabbitMQ快速入门

当类型为topic时,它的bindKey对应字符串需要是以“.”分割,同时RabbitMQ还提供了两个符号:

  • 星号(*):表示1个单词
  • 井号(#):表示0、多个单词

上图的意思是:所有第二个单词为orange的消息发送个Q1,所有最后一个单词为rabbit或者第一个单词为lazy的消息发送给Q2。

2.2.4、header

这一种类型官方demo没有过多解释,这里也不研究了。

2.3、RPC

RabbitMQ 还可以实现RPC(远程过程调用)。什么是RPC,简单来说就是local调用remote方法。对应于RabbitMQ中则是Client发送一个request message,Server处理完成之后将其返回给Client。这里就有了一个疑问?Server是如何将response返回给Client的,这里RabbitMQ定义了一个概念:Callback Queue。

Callback Queue

注意这个队列是独一无二的 String replyQueueName = channel.queueDeclare().getQueue();

首先我们需要明白一点的是为什么需要这个queue?我们知道在RabbitMQ作消息队列的时候,Client只需要将消息投放到queue中,然后Server从queue去取就可以了。但是在RabbitMQ作为RPC的时候多了一点就是,Client还需要返回结果,这时Server端怎么知道把消息发送给Client,这就是Callback Queue的用处了。

Correlation Id

在上面我们知道Server返回数据给Client是通过Callback Queue的,那么是为每一个request都创建一个queue吗?这未免太过浪费资源,RabbitMQ有更好的方案。在我们发送request,绑定一个唯一ID(correlationId),然后在消息被处理返回的时候取出这个ID和发出去的ID进行匹配。这样来说一个Callback Queue是Client级别而不是request级别的了。

实现

上面介绍了RabbitMQ实现RPC最重要的两个概念,具体代码比较简单还是贴下把。

client 端

public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) throws Exception{
        RPCClient fibonacciRpc = new RPCClient();
        try {
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}

服务端

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // RabbitMq consumer worker thread notifies the RPC server owner thread
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

三、总结

这次回头再看RabbitMQ,再次重新理解了以下RabbitMQ,有些东西还是要慢慢嚼的。当然这些也都是官网的入门例子,后续有机会的话再深入研究。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Software Engineering for Internet Applications

Software Engineering for Internet Applications

Eve Andersson、Philip Greenspun、Andrew Grumet / The MIT Press / 2006-03-06 / USD 35.00

After completing this self-contained course on server-based Internet applications software, students who start with only the knowledge of how to write and debug a computer program will have learned ho......一起来看看 《Software Engineering for Internet Applications》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

MD5 加密
MD5 加密

MD5 加密工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具