Motan RPC 框架分析

栏目: 服务器 · 发布时间: 6年前

内容简介:SPI 机制支持 JDK 的 ServiceProvider 机制并进行了扩展,接口的实现放在每个实现类可以加上用来和注册中心交互,包括服务注册、订阅服务、接收服务变更通知、发送心跳等功能。

一、框架简介

Motan 是新浪微博开源的一套高性能、易于使用的分布式远程服务调用(RPC)框架。

Motan RPC 框架分析

Motan 的核心模块交互关系如下:

Motan RPC 框架分析

二、核心模块介绍

2.0 SPI 机制

SPI 机制支持 JDK 的 ServiceProvider 机制并进行了扩展,接口的实现放在 META-INF/services/ 目录下以接口的完全类名命名的文件里,每个实现的完全类名占一行。

每个实现类可以加上 SpiMeta(name="implName") 来指定名称, ExtensionLoader 可以通过接口类型和命名从多个实现中找到指定实现。

2.1 register

用来和注册中心交互,包括服务注册、订阅服务、接收服务变更通知、发送心跳等功能。

// 服务发现功能的抽象
public interface DiscoveryService {
    // 订阅到注册中心
    void subscribe(URL url, NotifyListener listener);

    // 取消订阅
    void unsubscribe(URL url, NotifyListener listener);

    // 根据 url 描述的服务从注册中心获取该服务的所有提供者的信息
    List<URL> discover(URL url);
}

// 服务注册功能的抽象
public interface RegistryService {
    void register(URL url);
    void unregister(URL url);
    void available(URL url);
    void unavailable(URL url);
    Collection<URL> getRegisteredServiceUrls();
}

// 表示一个注册中心的抽象
@Spi(scope = Scope.SINGLETON)
public interface Registry extends RegistryService, DiscoveryService {
    // 获取该注册中心的描述信息
    URL getUrl();
}

2.2 protocol

用来进行 RPC 服务描述和配置管理,可以通过 Filter 进行扩展、功能增强。

@Spi(scope = Scope.SINGLETON)
public interface Protocol {
    // 暴露服务
    <T> Exporter<T> export(Provider<T> provider, URL url);

    // 调用端收到注册中心通知的服务提供者实例信息 serviceUrl 后,
    // 根据 serviceUrl 创建对指定实例的引用
    <T> Referer<T> refer(Class<T> clz, URL url, URL serviceUrl);

    void destroy();
}

// Filter 用于进行功能扩展
@Spi
public interface Filter {
    Response filter(Caller<?> caller, Request request);
}

// 把 Filter 应用到 protocol 上,进行功能增强
public class ProtocolFilterDecorator implements Protocol {
    private Protocol protocol;

    public ProtocolFilterDecorator(Protocol protocol) {
        this.protocol = protocol;
    }

    @Override
    public <T> Exporter<T> export(Provider<T> provider, URL url) {
        // 对原始的 provider 进行包装增强
        return protocol.export(decorateWithFilter(provider, url), url);
    }

    @Override
    public <T> Referer<T> refer(Class<T> clz, URL url, URL serviceUrl) {
        // 对原始 protocol 生成的 Referer 进行包装增强,这样可以在调用前、后进行处理。
        return decorateWithFilter(protocol.refer(clz, url, serviceUrl), url);
    }

    @Override
    public void destroy() {
        protocol.destroy();
    }

  // 省略其它代码
}

2.3 serialize

此模块负责把请求、响应进行序列化和反序列化。默认采用 Hessian2 。

@Spi(scope=Scope.PROTOTYPE)
public interface Serialization {
    byte[] serialize(Object obj) throws IOException;
    <T> T deserialize(byte[] bytes, Class<T> clz) throws IOException;
    byte[] serializeMulti(Object[] data) throws IOException;
    Object[] deserializeMulti(byte[] data, Class<?>[] classes) throws IOException;

    /**
     * serializaion的唯一编号,用于传输协议中指定序列化方式。每种序列化的编号必须唯一。
     * @return 由于编码规范限制,序列化方式最大支持32种,因此返回值必须在0-31之间。
     */
    int getSerializationNumber();
}

2.4 transport

实现远程通信,默认采用 Netty NIO 的 TCP 长链接实现。

2.5 cluster

仅供 client 端使用的模块。

ClusterSupport 收到注册中心的通知,根据这些服务提供者的信息生成一组 Referer 对象,然后用这组 Referer 通过 Cluster,Cluster 再用这些 Referer 刷新持有的 LoadBalance 。

构建 Referer 的时候会实例化 Client ,以完成远程调用。 Client 位于传输层,完成请求、响应的编码、解码工作,然后进行序列化、反序列化,再通过 Channel 进行网络传输。

二、构建客户端代理

创建服务的代理的过程:

1. 收集服务的描述信息 refUrl ;

2. 根据 refUrl 和 registryUrl 创建 ClusterSupport;

3. 根据 interfaceClass 和 ClusterSupport.getCluster() 创建代理对象;

4. 以代理对象 RefererInvocationHandler 为例,它持有 Cluster 就可以进行元旦调用。

public class DefaultRpcReferer<T> extends AbstractReferer<T> {
    protected Client client;
    protected EndpointFactory endpointFactory;

    public DefaultRpcReferer(Class<T> clz, URL url, URL serviceUrl) {
        super(clz, url, serviceUrl);

        endpointFactory =
                ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
                        url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));

        // 创建 Client
        client = endpointFactory.createClient(url);
    }

    @Override
    protected Response doCall(Request request) {
        try {
            // 为了能够实现跨group请求,需要使用server端的group。
            request.setAttachment(URLParamType.group.getName(), serviceUrl.getGroup());

            // 通过 Client 完成远程调用
            return client.request(request);
        } catch (TransportException exception) {
            throw new MotanServiceException("DefaultRpcReferer call Error: url=" + url.getUri(), exception);
        }
    }

    // 省略其他代码   
}

三、服务端暴露服务

服务端对外暴露服务要考虑两点:

  1. 一个网络端口有多个服务提供者,如何找到目标提供者?

    在transport层实现, ProviderMessageRouter 相当于一个请求分发器,把请求分发调用目前提供者。分发是根据服务描述 URL 里的关键信息生成 serviceKey (组成: group + "/" + interfaceName + "/" + version )来确定提供者,再根据请求里的方法名、方法参数描述来定位要调用的 Method 。

public class ProviderMessageRouter implements MessageHandler {
    // serviceKey 到具体服务提供者的映射
    protected Map<String, Provider<?>> providers = new HashMap<>();

    public Object handle(Channel channel, Object message) {
        Request request = (Request) message;

        String serviceKey = MotanFrameworkUtil.getServiceKey(request);

        Provider<?> provider = providers.get(serviceKey);

        Method method = provider.lookupMethod(request.getMethodName(), request.getParamtersDesc());
        fillParamDesc(request, method);
        processLazyDeserialize(request, method);
        return call(request, provider);
    }

    protected Response call(Request request, Provider<?> provider) {
        try {
            return provider.call(request);
        } catch (Exception e) {
            DefaultResponse response = new DefaultResponse();
            response.setException(new MotanBizException("provider call process error", e));
            return response;
        }
    }

    public synchronized void addProvider(Provider<?> provider) {
        String serviceKey = MotanFrameworkUtil.getServiceKey(provider.getUrl());
        if (providers.containsKey(serviceKey)) {
            throw new MotanFrameworkException("provider alread exist: " + serviceKey);
        }

        providers.put(serviceKey, provider);
    }
}
  1. 有了请求分发处理器后,就可以作为 Server 的消息处理器。
public class DefaultRpcExporter<T> extends AbstractExporter<T> {
    protected final ConcurrentHashMap<String, ProviderMessageRouter> ipPort2RequestRouter;
    protected final ConcurrentHashMap<String, Exporter<?>> exporterMap;
    protected Server server;
    protected EndpointFactory endpointFactory;

    public DefaultRpcExporter(Provider<T> provider, URL url, ConcurrentHashMap<String, ProviderMessageRouter> ipPort2RequestRouter,
                              ConcurrentHashMap<String, Exporter<?>> exporterMap) {
        super(provider, url);
        this.exporterMap = exporterMap;
        this.ipPort2RequestRouter = ipPort2RequestRouter;

        // 初始化消息分发处理器
        ProviderMessageRouter requestRouter = initRequestRouter(url);
        endpointFactory =
                ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
                        url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));

        // 创建 Server
        server = endpointFactory.createServer(url, requestRouter);
    }
}


public abstract class AbstractEndpointFactory implements EndpointFactory {
    /** 维持share channel 的service列表 **/
    protected Map<String, Server> ipPort2ServerShareChannel = new HashMap<String, Server>();
    protected ConcurrentMap<Server, Set<String>> server2UrlsShareChannel = new ConcurrentHashMap<Server, Set<String>>();

    private EndpointManager heartbeatClientEndpointManager = null;

    public AbstractEndpointFactory() {
        // 心跳管理
        heartbeatClientEndpointManager = new HeartbeatClientEndpointManager();
        heartbeatClientEndpointManager.init();
    }

    public Server createServer(URL url, MessageHandler messageHandler) {
        messageHandler = getHeartbeatFactory(url).wrapMessageHandler(messageHandler);

        synchronized (ipPort2ServerShareChannel) {
            String ipPort = url.getServerPortStr();
            String protocolKey = MotanFrameworkUtil.getProtocolKey(url);

            boolean shareChannel =
                    url.getBooleanParameter(URLParamType.shareChannel.getName(), URLParamType.shareChannel.getBooleanValue());

            if (!shareChannel) { // 独享一个端口
                LoggerUtil.info(this.getClass().getSimpleName() + " create no_share_channel server: url={}", url);

                // 如果端口已经被使用了,使用该server bind 会有异常
                return innerCreateServer(url, messageHandler);
            }

            Server server = ipPort2ServerShareChannel.get(ipPort);

            if (server != null) {
                // 省略:无法共享 channel 的抛出异常。

                saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);

                // 共享已经存在的 Server
                return server;
            }

            url = url.createCopy();
            url.setPath(""); // 共享server端口,由于有多个interfaces存在,所以把path设置为空

            // ipPort 上还没创建过 Server,创建一个新的
            server = innerCreateServer(url, messageHandler);

            ipPort2ServerShareChannel.put(ipPort, server);
            saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);

            return server;
        }
    }
}

四、调用过程实现

  1. Cluster 收到调用请求后,通过自身的 HA 策略进行调用;

  2. HA 策略实现根据传入 LB 获取到一个 Referer 实例进行调用;

  3. 根据 Referer 的调用结果来执行 HA 策略。

// ClusterSpi.java
public Response call(Request request) {
    if (available.get()) {
        try {
            return haStrategy.call(request, loadBalance);
        } catch (Exception e) {
            return callFalse(request, e);
        }
    }
    return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
}

@SpiMeta(name = "failfast")
public class FailfastHaStrategy<T> extends AbstractHaStrategy<T> {
    @Override
    public Response call(Request request, LoadBalance<T> loadBalance) {
        Referer<T> refer = loadBalance.select(request);
        return refer.call(request);
    }
}

4. Referer 通过持有的 Client 进行调用;

5. Client 调用时从 Channel 连接池获取到一个 Channel 后再进行写入请求;

6. Channel 首先给请求生成一个 ResponseFuture,注册到 Client 的 callbackMap 上,这样可以阻塞住调用线程,实现同步调用,也方便收到响应时找到对应的调用线程;

7. Channel 对于写入的请求根据 Client 在 Netty pipeline 上设置的 codec 组件进行编码、解码操作;

8. codec 会用 Serialization 组件进行序列化、反序列化;

9. 最终把序列化后的请求写入网络连接。

10. 服务端的网络监听接收到请求后,用 codec 组件进行反序列化、解码得到一个请求对象;

11. Server 的消息处理器 ProviderMessageRouter.handle(Channel channel, Object message) 方法里,根据请求信息得到要调用的 serviceKey, 从而找到对应的 provider ;

12. 再根据要调用的方法名、参数信息找到对应的实现方法,进行调用,得到响应;

13. 再把响应写回到网络连接里;

14. 客户端最终得到响应对象后,从 Client 维护的 callbackMap 找出请求 的 ResponseFuture 并设置结果,使调用线程可以返回。

欢迎关注我的微信公众号: coderbee笔记 ,可以更及时回复你的讨论。

Motan RPC 框架分析

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Professional JavaScript for Web Developers

Professional JavaScript for Web Developers

Nicholas C. Zakas / Wrox / 2009-1-14 / USD 49.99

This eagerly anticipated update to the breakout book on JavaScript offers you an in-depth look at the numerous advances to the techniques and technology of the JavaScript language. You'll see why Java......一起来看看 《Professional JavaScript for Web Developers》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具