内容简介:SPI 机制支持 JDK 的 ServiceProvider 机制并进行了扩展,接口的实现放在每个实现类可以加上用来和注册中心交互,包括服务注册、订阅服务、接收服务变更通知、发送心跳等功能。
一、框架简介
Motan 是新浪微博开源的一套高性能、易于使用的分布式远程服务调用(RPC)框架。
Motan 的核心模块交互关系如下:
二、核心模块介绍
2.0 SPI 机制
SPI 机制支持 JDK 的 ServiceProvider 机制并进行了扩展,接口的实现放在 META-INF/services/
目录下以接口的完全类名命名的文件里,每个实现的完全类名占一行。
每个实现类可以加上 SpiMeta(name="implName")
来指定名称, ExtensionLoader
可以通过接口类型和命名从多个实现中找到指定实现。
2.1 register
用来和注册中心交互,包括服务注册、订阅服务、接收服务变更通知、发送心跳等功能。
// 服务发现功能的抽象 public interface DiscoveryService { // 订阅到注册中心 void subscribe(URL url, NotifyListener listener); // 取消订阅 void unsubscribe(URL url, NotifyListener listener); // 根据 url 描述的服务从注册中心获取该服务的所有提供者的信息 List<URL> discover(URL url); } // 服务注册功能的抽象 public interface RegistryService { void register(URL url); void unregister(URL url); void available(URL url); void unavailable(URL url); Collection<URL> getRegisteredServiceUrls(); } // 表示一个注册中心的抽象 @Spi(scope = Scope.SINGLETON) public interface Registry extends RegistryService, DiscoveryService { // 获取该注册中心的描述信息 URL getUrl(); }
2.2 protocol
用来进行 RPC 服务描述和配置管理,可以通过 Filter 进行扩展、功能增强。
@Spi(scope = Scope.SINGLETON) public interface Protocol { // 暴露服务 <T> Exporter<T> export(Provider<T> provider, URL url); // 调用端收到注册中心通知的服务提供者实例信息 serviceUrl 后, // 根据 serviceUrl 创建对指定实例的引用 <T> Referer<T> refer(Class<T> clz, URL url, URL serviceUrl); void destroy(); } // Filter 用于进行功能扩展 @Spi public interface Filter { Response filter(Caller<?> caller, Request request); } // 把 Filter 应用到 protocol 上,进行功能增强 public class ProtocolFilterDecorator implements Protocol { private Protocol protocol; public ProtocolFilterDecorator(Protocol protocol) { this.protocol = protocol; } @Override public <T> Exporter<T> export(Provider<T> provider, URL url) { // 对原始的 provider 进行包装增强 return protocol.export(decorateWithFilter(provider, url), url); } @Override public <T> Referer<T> refer(Class<T> clz, URL url, URL serviceUrl) { // 对原始 protocol 生成的 Referer 进行包装增强,这样可以在调用前、后进行处理。 return decorateWithFilter(protocol.refer(clz, url, serviceUrl), url); } @Override public void destroy() { protocol.destroy(); } // 省略其它代码 }
2.3 serialize
此模块负责把请求、响应进行序列化和反序列化。默认采用 Hessian2 。
@Spi(scope=Scope.PROTOTYPE) public interface Serialization { byte[] serialize(Object obj) throws IOException; <T> T deserialize(byte[] bytes, Class<T> clz) throws IOException; byte[] serializeMulti(Object[] data) throws IOException; Object[] deserializeMulti(byte[] data, Class<?>[] classes) throws IOException; /** * serializaion的唯一编号,用于传输协议中指定序列化方式。每种序列化的编号必须唯一。 * @return 由于编码规范限制,序列化方式最大支持32种,因此返回值必须在0-31之间。 */ int getSerializationNumber(); }
2.4 transport
实现远程通信,默认采用 Netty NIO 的 TCP 长链接实现。
2.5 cluster
仅供 client 端使用的模块。
ClusterSupport
收到注册中心的通知,根据这些服务提供者的信息生成一组 Referer
对象,然后用这组 Referer 通过 Cluster,Cluster 再用这些 Referer 刷新持有的 LoadBalance 。
构建 Referer
的时候会实例化 Client
,以完成远程调用。 Client
位于传输层,完成请求、响应的编码、解码工作,然后进行序列化、反序列化,再通过 Channel
进行网络传输。
二、构建客户端代理
创建服务的代理的过程:
1. 收集服务的描述信息 refUrl ;
2. 根据 refUrl 和 registryUrl 创建 ClusterSupport;
3. 根据 interfaceClass 和 ClusterSupport.getCluster() 创建代理对象;
4. 以代理对象 RefererInvocationHandler 为例,它持有 Cluster 就可以进行元旦调用。
public class DefaultRpcReferer<T> extends AbstractReferer<T> { protected Client client; protected EndpointFactory endpointFactory; public DefaultRpcReferer(Class<T> clz, URL url, URL serviceUrl) { super(clz, url, serviceUrl); endpointFactory = ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension( url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue())); // 创建 Client client = endpointFactory.createClient(url); } @Override protected Response doCall(Request request) { try { // 为了能够实现跨group请求,需要使用server端的group。 request.setAttachment(URLParamType.group.getName(), serviceUrl.getGroup()); // 通过 Client 完成远程调用 return client.request(request); } catch (TransportException exception) { throw new MotanServiceException("DefaultRpcReferer call Error: url=" + url.getUri(), exception); } } // 省略其他代码 }
三、服务端暴露服务
服务端对外暴露服务要考虑两点:
-
一个网络端口有多个服务提供者,如何找到目标提供者?
在transport层实现,
ProviderMessageRouter
相当于一个请求分发器,把请求分发调用目前提供者。分发是根据服务描述 URL 里的关键信息生成 serviceKey (组成:group + "/" + interfaceName + "/" + version
)来确定提供者,再根据请求里的方法名、方法参数描述来定位要调用的 Method 。
public class ProviderMessageRouter implements MessageHandler { // serviceKey 到具体服务提供者的映射 protected Map<String, Provider<?>> providers = new HashMap<>(); public Object handle(Channel channel, Object message) { Request request = (Request) message; String serviceKey = MotanFrameworkUtil.getServiceKey(request); Provider<?> provider = providers.get(serviceKey); Method method = provider.lookupMethod(request.getMethodName(), request.getParamtersDesc()); fillParamDesc(request, method); processLazyDeserialize(request, method); return call(request, provider); } protected Response call(Request request, Provider<?> provider) { try { return provider.call(request); } catch (Exception e) { DefaultResponse response = new DefaultResponse(); response.setException(new MotanBizException("provider call process error", e)); return response; } } public synchronized void addProvider(Provider<?> provider) { String serviceKey = MotanFrameworkUtil.getServiceKey(provider.getUrl()); if (providers.containsKey(serviceKey)) { throw new MotanFrameworkException("provider alread exist: " + serviceKey); } providers.put(serviceKey, provider); } }
- 有了请求分发处理器后,就可以作为 Server 的消息处理器。
public class DefaultRpcExporter<T> extends AbstractExporter<T> { protected final ConcurrentHashMap<String, ProviderMessageRouter> ipPort2RequestRouter; protected final ConcurrentHashMap<String, Exporter<?>> exporterMap; protected Server server; protected EndpointFactory endpointFactory; public DefaultRpcExporter(Provider<T> provider, URL url, ConcurrentHashMap<String, ProviderMessageRouter> ipPort2RequestRouter, ConcurrentHashMap<String, Exporter<?>> exporterMap) { super(provider, url); this.exporterMap = exporterMap; this.ipPort2RequestRouter = ipPort2RequestRouter; // 初始化消息分发处理器 ProviderMessageRouter requestRouter = initRequestRouter(url); endpointFactory = ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension( url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue())); // 创建 Server server = endpointFactory.createServer(url, requestRouter); } } public abstract class AbstractEndpointFactory implements EndpointFactory { /** 维持share channel 的service列表 **/ protected Map<String, Server> ipPort2ServerShareChannel = new HashMap<String, Server>(); protected ConcurrentMap<Server, Set<String>> server2UrlsShareChannel = new ConcurrentHashMap<Server, Set<String>>(); private EndpointManager heartbeatClientEndpointManager = null; public AbstractEndpointFactory() { // 心跳管理 heartbeatClientEndpointManager = new HeartbeatClientEndpointManager(); heartbeatClientEndpointManager.init(); } public Server createServer(URL url, MessageHandler messageHandler) { messageHandler = getHeartbeatFactory(url).wrapMessageHandler(messageHandler); synchronized (ipPort2ServerShareChannel) { String ipPort = url.getServerPortStr(); String protocolKey = MotanFrameworkUtil.getProtocolKey(url); boolean shareChannel = url.getBooleanParameter(URLParamType.shareChannel.getName(), URLParamType.shareChannel.getBooleanValue()); if (!shareChannel) { // 独享一个端口 LoggerUtil.info(this.getClass().getSimpleName() + " create no_share_channel server: url={}", url); // 如果端口已经被使用了,使用该server bind 会有异常 return innerCreateServer(url, messageHandler); } Server server = ipPort2ServerShareChannel.get(ipPort); if (server != null) { // 省略:无法共享 channel 的抛出异常。 saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey); // 共享已经存在的 Server return server; } url = url.createCopy(); url.setPath(""); // 共享server端口,由于有多个interfaces存在,所以把path设置为空 // ipPort 上还没创建过 Server,创建一个新的 server = innerCreateServer(url, messageHandler); ipPort2ServerShareChannel.put(ipPort, server); saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey); return server; } } }
四、调用过程实现
-
Cluster 收到调用请求后,通过自身的 HA 策略进行调用;
-
HA 策略实现根据传入 LB 获取到一个 Referer 实例进行调用;
-
根据 Referer 的调用结果来执行 HA 策略。
// ClusterSpi.java public Response call(Request request) { if (available.get()) { try { return haStrategy.call(request, loadBalance); } catch (Exception e) { return callFalse(request, e); } } return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND)); } @SpiMeta(name = "failfast") public class FailfastHaStrategy<T> extends AbstractHaStrategy<T> { @Override public Response call(Request request, LoadBalance<T> loadBalance) { Referer<T> refer = loadBalance.select(request); return refer.call(request); } }
4. Referer 通过持有的 Client 进行调用;
5. Client 调用时从 Channel 连接池获取到一个 Channel 后再进行写入请求;
6. Channel 首先给请求生成一个 ResponseFuture,注册到 Client 的 callbackMap 上,这样可以阻塞住调用线程,实现同步调用,也方便收到响应时找到对应的调用线程;
7. Channel 对于写入的请求根据 Client 在 Netty pipeline 上设置的 codec 组件进行编码、解码操作;
8. codec 会用 Serialization 组件进行序列化、反序列化;
9. 最终把序列化后的请求写入网络连接。
10. 服务端的网络监听接收到请求后,用 codec 组件进行反序列化、解码得到一个请求对象;
11. Server 的消息处理器 ProviderMessageRouter.handle(Channel channel, Object message)
方法里,根据请求信息得到要调用的 serviceKey, 从而找到对应的 provider ;
12. 再根据要调用的方法名、参数信息找到对应的实现方法,进行调用,得到响应;
13. 再把响应写回到网络连接里;
14. 客户端最终得到响应对象后,从 Client 维护的 callbackMap 找出请求 的 ResponseFuture 并设置结果,使调用线程可以返回。
欢迎关注我的微信公众号: coderbee笔记 ,可以更及时回复你的讨论。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 数据分析的三大框架:底层技术、分析建模、工具选择
- beego框架代码分析
- beego框架代码分析
- 笔记 | 数据分析学习框架
- Flutter框架分析(六)-- 布局
- Flutter框架分析(七)-- 绘制
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python自然语言处理
(英)伯德、(英)克莱因、(美)洛普 / 东南大学出版社 / 2010-6 / 64.00元
《Python自然语言处理(影印版)》提供了非常易学的自然语言处理入门介绍,该领域涵盖从文本和电子邮件预测过滤,到自动总结和翻译等多种语言处理技术。在《Python自然语言处理(影印版)》中,你将学会编写Python程序处理大量非结构化文本。你还将通过使用综合语言数据结构访问含有丰富注释的数据集,理解用于分析书面通信内容和结构的主要算法。 《Python自然语言处理》准备了充足的示例和练习,......一起来看看 《Python自然语言处理》 这本书的介绍吧!