使用 RabbitMQ 实现 RPC

栏目: 后端 · 发布时间: 7年前

内容简介:作者 | 乔宇杏仁后端工程师,关注服务端技术。

作者 | 乔宇

使用 RabbitMQ 实现 RPC

杏仁后端工程师,关注服务端技术。

背景知识

RabbitMQ

RabbitMQ 是基于 AMQP 协议实现的一个消息队列(Message Queue),Message Queue 是一个典型的生产者/消费者模式。生产者发布消息,消费者消费消息,生产者和消费者之间是解耦的,互相不知道对方的存在。

使用 RabbitMQ 实现 RPC

RPC

Remote Procedure Call:远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务端,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束。

使用 RabbitMQ 实现 RPC

如何使用 RabbitMQ 实现 RPC?

使用 RabbitMQ 实现 RPC,相应的角色是由生产者来作为客户端,消费者作为服务端。

但 RPC 调用一般是同步的,客户端和服务器也是紧密耦合的。即客户端通过 IP/域名和端口链接到服务器,向服务器发送请求后等待服务器返回响应信息。

但 MQ 的生产者和消费者是完全解耦的,那么如何用 MQ 实现 RPC 呢?很明显就是把 MQ 当作中间件实现一次双向的消息传递:

使用 RabbitMQ 实现 RPC

客户端和服务端即是生产者也是消费者。客户端发布请求,消费响应;服务端消费请求,发布响应。

具体实现

MQ部分的定义

请求信息的队列

我们需要一个队列来存放请求信息,客户端向这个队列发布请求信息,服务端消费该队列处理请求。该队列不需要复杂的路由规则,直接使用 RabbitMQ 默认的 direct exchange 来路由消息即可。

响应信息的队列

存放响应信息的队列不应只有一个。如果存在多个客户端,不能保证响应信息被发布请求的那个客户端消费到。所以应为每一个客户端创建一个响应队列,这个队列应该由客户端来创建且只能由这个客户端使用并在使用完毕后删除,这里可以使用 RabbitMQ 提供的排他队列(Exclusive Queue):

channel.queueDeclare(queue:"", durable:false, exclusive:true, autoDelete:false, new HashMap<>())

并且要保证队列名唯一,声明队列时名称设为空 RabbitMQ 会生成一个唯一的队列名。

exclusive 设为 true 表示声明一个排他队列,排他队列的特点是只能被当前的连接使用,并且在连接关闭后被删除。

一个简单的 demo(使用 pull 机制)

我们使用一个简单的 demo 来了解客户端和服务端的处理流程。

发布请求

  • 编写代码前的一个小问题

我们在声明队列时为每一个客户端声明了独有的响应队列,那服务器在发布响应时如何知道发布到哪个队列呢?其实就是客户端需要告诉服务端将响应发布到哪个队列,RabbitMQ 提供了这个支持,消息体的 Properties 中有一个属性 reply_to 就是用来标记回调队列的名称,服务器需要将响应发布到 reply_to 指定的回调队列中。

解决了这个问题之后我们就可以编写客户端发布请求的代码了:

// 定义响应回调队列
String replyQueueName = channel.queueDeclare("", false, true, false, new HashMap<>()).getQueue();

// 设置回调队列到 Properties
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .replyTo(replyQueueName)
        .build();
String request = "request";

// 发布请求
channel.basicPublish("", "rpc_queue", properties, request.getBytes());

Direct reply-to:

RabbitMQ 提供了一种更便捷的机制来实现 RPC,不需要客户端每次都定义回调队列,客户端发布请求时将 replyTo 设为 amq.rabbitmq.reply-to ,消费响应时也指定消费 amq.rabbitmq.reply-to ,RabbitMQ 会为客户端创建一个内部队列

消费请求

接下来是服务端处理请求的部分,接收到请求后经过处理将响应信息发布到 reply_to 指定的回调队列:

// 服务端 Consumer 的定义
public class RpcServer extends DefaultConsumer {
    public RpcServer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body);
        String response = (msg + " Received");
        // 获取回调队列名
        String replyTo = properties.getReplyTo();
        // 发布响应消息到回调队列
        this.getChannel().basicPublish("", replyTo, new AMQP.BasicProperties(), response.getBytes());
    }
}
...

// 启动服务端 Consumer
channel.basicConsume("rpc_queue", true, new RpcServer(channel));

接收响应

客户端如何接收服务器的响应呢?有两种方式:1.轮询的去 pull 回调队列中的消息,2.异步的消费回调队列中的消息。我们在这里简单实现第一种方案。

GetResponse getResponse = null;
while (getResponse == null) {
    getResponse = channel.basicGet(replyQueueName, true);
}
String response = new String(getResponse.getBody());

一个简单的基于 RabbitMQ 的 RPC 模型已经实现了,但这个 demo 并不实用,因为客户端每次发送完请求都要同步的轮询等待响应消息,只能每次处理一个请求。RabbitMQ 的 pull 模式效率也比较低。

实现一个完备可用的 RPC 模式需要做的工作还有很多,要处理的关键点也比较复杂,有句话叫不要重复造轮子,spring 已经实现了一个完备可用的 RPC 模式的库,接下来我们来了解一下。

Spring Rabbit 中的实现

和上面 demo 的 pull 模式一次只能处理一个请求相对应的:如何异步的接收响应并处理多个请求呢?关键点就在于我们需要记录请求和响应并将它们关联起来,RabbitMQ 也提供了支持,Properties 中的另一个属性 correlation_id 用来标识一个消息的唯一 id。

参考 spring-rabbit 中的 convertSendAndReceive 方法的实现,为每一次请求生成一个唯一的 correlation_id

private final AtomicInteger messageTagProvider = new AtomicInteger();
...
String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
...
message.getMessageProperties().setCorrelationId(messageTag);

并使用一个 ConcurrentHashMap 来维护 correlation_id 和响应信息的映射:

private final Map<String, PendingReply> replyHolder = new ConcurrentHashMap<String, PendingReply>();
...
final PendingReply pendingReply = new PendingReply();
this.replyHolder.put(correlationId, pendingReply);

PendingReply 中有一个 BlockingQueue 存放响应信息,在发送完请求信息后调用 BlockingQueuepull 方法并设置超时时间来获取响应:

private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
public Message get ( long timeout , TimeUnit unit ) throws InterruptedException { Object reply = this . queue . poll ( timeout , unit ); return reply == null ? null : processReply ( reply

);

}

在获取响应后不论结果如何,都会将 PendingReplyreplyHolder 中移除,防止 replyHolder 中积压超时的响应消息:

try {
    reply = exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply,messageTag);
} finally {
    this.replyHolder.remove(messageTag);
    ...
}

响应信息是何时如何被放到这个 BlockingQueue 中的呢?看一下 RabbitTemplate 接收消息的地方:

public void onMessage(Message message) {
String messageTag;
    if (this.correlationKey == null) { // using standard correlationId property
        messageTag = message.getMessageProperties().getCorrelationId();
    } else {
        messageTag = (String) message.getMessageProperties()
                .getHeaders().get(this.correlationKey);
    }

    // 存在 correlation_id 才认为是RPC的响应信息,不存在时不处理
    if (messageTag == null) {
        logger.error("No correlation header in reply");
        return;
    }

    // 从 replyHolder 中取出 correlation_id 对应的 PendingReply
    PendingReply pendingReply = this.replyHolder.get(messageTag);
    if (pendingReply == null) {
        if (logger.isWarnEnabled()) {
            logger.warn("Reply received after timeout for " + messageTag);
        }
        throw new AmqpRejectAndDontRequeueException("Reply received after timeout");
    }
    else {
        restoreProperties(message, pendingReply);
        // 将响应信息 add 到 BlockingQueue 中
        pendingReply.reply(message);
    }
}

以上的 spring 代码隐去了很多额外部分的处理和细节,只关注关键的部分。

至此一个完整可用的由 RabbitMQ 作为中间件实现的 RPC 模式就完成了。

总结

服务端

服务端的实现比较简单,和一般的 Consumer 的区别只在于需要将请求回复到 replyTo 指定的 queue 中并带上消息标识 correlation_id 即可

服务端的一点小优化:

超时的处理是由客户端来实现的,那服务端有没有可以优化的地方呢?

答案是有的:如果我们的服务端处理比较耗时,如何判断客户端是否还在等待响应呢?

我们可以使用 passive 参数去检查 replyTo 的 queue 是否存在,因为客户端声明的是内部队列,客户端如果断掉链接了这个 queue 就不存在了,这时服务端就无需处理这个消息了。

客户端

客户端承担了更多的工作量,包括:

  • 声明 replyTo 队列(使用 amq.rabbitmq.reply-to 会简单很多)

  • 维护请求和响应消息(使用唯一的 correlation_id 来关联)

  • 消费服务端的返回

  • 处理超时等异常情况(使用BlockingQueue来阻塞获取)

好在 spring 已经实现了一套完备可靠的代码,我们在清楚了流程和关键点之后,可以直接使用 spring 提供的 RabbitTemplate ,无需自己实现。

使用 MQ 实现 RPC 的意义

通过 MQ 实现 RPC 看起来比客户端和服务器直接通讯要复杂一些,那我们为什么要这样做呢?或者说这样做有什么好处:

  1. 将客户端和服务器解耦:客户端只是发布一个请求到 MQ 并消费这个请求的响应。并不关心具体由谁来处理这个请求,MQ 另一端的请求的消费者可以随意替换成任何可以处理请求的服务器,并不影响到客户端。

  2. 减轻服务器的压力:传统的 RPC 模式中如果客户端和请求过多,服务器的压力会过大。由 MQ 作为中间件的话,过多的请求而是被 MQ 消化掉,服务器可以控制消费请求的频次,并不会影响到服务器。

  3. 服务器的横向扩展更加容易:如果服务器的处理能力不能满足请求的频次,只需要增加服务器来消费 MQ 的消息即可,MQ会帮我们实现消息消费的负载均衡。

  4. 可以看出 RabbitMQ 对于 RPC 模式的支持也是比较友好地,

    amq.rabbitmq.reply-to ,   reply_to ,   correlation_id 这些特性都说明了这一点,再加上 spring-rabbit 的实现,可以让我们很简单的使用消息队列模式的 RPC 调用。

全文完

以下文章您可能也会感兴趣:

我们正在招聘 Java 工程师,欢迎有兴趣的同学投递简历到 rd-hr@xingren.com 。

使用 RabbitMQ 实现 RPC

杏仁技术站

长按左侧二维码关注我们,这里有一群热血青年期待着与您相会。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Automate This

Automate This

Christopher Steiner / Portfolio / 2013-8-9 / USD 25.95

"The rousing story of the last gasp of human agency and how today's best and brightest minds are endeavoring to put an end to it." It used to be that to diagnose an illness, interpret legal docume......一起来看看 《Automate This》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码