内容简介:RPC——远程过程调用。简单的理解,调用远程服务,就像是调用本地方法一样。如果服务仅是对同一工程内使用,例如定义一个服务,接口UserService,实现UserServiceImpl,仅需创建一个对象,就可以轻松的调用。但如果把这样的服务暴露给其它工程调用,特别是目前流行的微服务框架,抽象出的模块化服务间可能要相互调用,这就需要“目前主流的RPC框架有很多,例如阿里的Dubbo,谷歌的gRPC等等。
前言
RPC——远程过程调用。简单的理解,调用远程服务,就像是调用本地方法一样。
如果服务仅是对同一工程内使用,例如定义一个服务,接口UserService,实现UserServiceImpl,仅需创建一个对象,就可以轻松的调用。但如果把这样的服务暴露给其它工程调用,特别是目前流行的微服务框架,抽象出的模块化服务间可能要相互调用,这就需要“ RPC框架 ”来实现了。
目前主流的RPC框架有很多,例如阿里的Dubbo,谷歌的gRPC等等。
各种实现的框架虽不尽相同,但主流的思想是相似的:都离不开 中心化的服务治理 (服务的发现与注册)。对此可选用的方案有多种,例如ZooKeeper、Eureka、 Redis 等等。本篇将采用 ZooKeeper 作为注册中心的方式,用代码讲解RPC调用的几个关键点。
源码地址: https://gitee.com/marvelcode/marvelcode-rpc.git
架构分析
接下来,将会对我所采用的架构进行 代入式的分析 ,如果对RPC框架有所了解的可以忽略此部分。
那就从RPC调用的源头方开始吧,来思考下一次调用的大致流程是怎样的:
-
首先,调用方发起服务调用。无论是通过服务的接口层,还是仅仅依靠服务对应的名称。
- 但无论是哪一种,必然要先知道该服务部署机器的IP、以及开放的端口号。这就是服务治理重要的一环, 服务发现。 即通过要调用的服务信息,来获取到这些服务所在的机器信息。
-
第二步,假使我们已经获取到了机器信息,接下来便是传入参数、调用服务、得到响应结果。
- 和普通HTTP接口不同的是,服务间调用传递的对象有可能是复杂的 Java 对象,仅靠Json格式的数据可能无法支撑(Ps:比如要穿一个Class对象)。这就引出了对象传输的 序列化与反序列化 。
-
第三步,假使服务提供方已经接收到服务调用的请求,最后只需根据请求的数据定位到提供服务的实例,进行反射调用即可。
- 既然能通过服务发现找到该服务,前提必然有 服务注册 这一步。当然,这一环节涉及到的服务定位,根据哪些数据能唯一标识一个服务呢?
- 分析完主要问题后,我们来看源码。
源码
服务提供方
根据上面的分析,我们来看下服务注册这一步的代码实现。
首先来看下用于标识对外暴露的服务的注解定义:
package com.menghao.rpc.provider.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * <p>服务提供方注解.<br> * <p>用于标识在接口上,只提供出接口jar</p> * * @author MarvelCode. */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface Provider { }
该注解起的作用就是个标记作用,在合适的时间将标识的服务注册到 ZooKeeper 上。来看下它是何时被解析的:
package com.menghao.rpc.spring; import com.menghao.rpc.RpcConstants; import com.menghao.rpc.provider.annotation.Provider; import com.menghao.rpc.provider.regisiter.ProviderRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import java.util.HashMap; import java.util.Map; /** * <p>内部系统服务提供方注册.</br> * 获取内部所有的 @Provider 接口并注册到服务提供者仓库 * * @author MarvelCode * @see ProviderRepository */ public class ProviderPostProcessor implements BeanPostProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ProviderPostProcessor.class); private Map<String, Class> candidates = new HashMap<>(8); private ProviderRepository providerRepository; public ProviderPostProcessor(ProviderRepository providerRepository) { this.providerRepository = providerRepository; } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { for (Class sourceInterface : bean.getClass().getInterfaces()) { // 递归遍历所有父接口 recursiveInterface(sourceInterface, beanName, candidates); } return bean; } private void recursiveInterface(Class sourceInterface, String beanName, Map<String, Class> candidates) { // 接口被@Provider标识 if (sourceInterface.getAnnotation(Provider.class) != null) { LOGGER.info(RpcConstants.LOG_RPC_PREFIX + "find @Provider-" + sourceInterface.getName()); candidates.put(beanName, sourceInterface); } // 直到无父接口,递归结束 if (sourceInterface.getInterfaces().length == 0) { return; } // 否则递归遍历父接口 for (Class superInterface : sourceInterface.getInterfaces()) { recursiveInterface(superInterface, beanName, candidates); } } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // 将符合条件的服务进行注册 if (candidates.containsKey(beanName)) { Class sourceInterface = candidates.get(beanName); providerRepository.register(sourceInterface, beanName, bean); } return bean; } }
借助 Spring提供的 bean 生命周期扩展接口,在 bean 的初始化前,遍历获取该实例的所有父接口,将被 @Provider 注解 标识的接口筛选出来,放入候选者列表(candidates :key-beanName,value-Interface)等待处理。在 bean 的初始化后,将之前筛选出的服务进行注册。
接着来看注册逻辑:
package com.menghao.rpc.provider.regisiter; import com.menghao.rpc.RpcConstants; import com.menghao.rpc.provider.exception.InitializationException; import com.menghao.rpc.provider.model.ProviderKey; import com.menghao.rpc.spring.ProviderPostProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.MessageFormat; import java.util.Collections; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * <p>本地服务仓库.</br> * 存放了契约与调用bean的映射关系,用于rpc调用时取出,从而反射调用 * * @author MarvelCode * @see ProviderRegister */ public class ProviderRepository { private static final Logger LOGGER = LoggerFactory.getLogger(ProviderPostProcessor.class); private ConcurrentMap<ProviderKey, Object> providerMapping = new ConcurrentHashMap<>(); private Set<ProviderKey> providers = Collections.unmodifiableSet(providerMapping.keySet()); /** * 注册服务,维护契约与单例的映射关系 * * @param sourceInterface @Provider标识的接口 * @param implCode 服务实现的beanName * @param bean 服务单例 */ public void register(Class sourceInterface, String implCode, Object bean) { ProviderKey providerKey = new ProviderKey(sourceInterface.getName(), implCode); LOGGER.info(RpcConstants.LOG_RPC_PREFIX + "register provider-" + providerKey); if (providerMapping.containsKey(providerKey)) { throw new InitializationException(MessageFormat.format("contract: {} ,implCode: {} confilic", sourceInterface.getName(), implCode)); } providerMapping.putIfAbsent(providerKey, bean); } /** * 发现服务单例,根据契约与单例的映射关系查到单例,以便反射调用 * * @param providerKey 服务单例的键(契约) * @return 服务单例 */ public Object getProvider(ProviderKey providerKey) { return providerMapping.get(providerKey); } /** * 获取所有满足条件的(@Provider)的契约 * * @return 服务键集合 */ Set<ProviderKey> getProviders() { return providers; } }
该类作为服务实例的仓库,存放了 ProviderKey 与 服务实例的对应关系。这也是服务定位的逻辑所在,通过接口的全限定名,beanName作为服务的坐标( ProviderKey ),唯一确定一个服务。这样服务调用方将这些信息告诉服务提供方时,就可以知道反射调用哪个服务实例了。
服务信息收集完毕后,接下来就会进行 服务注册 :
package com.menghao.rpc.provider.regisiter; import com.menghao.rpc.provider.exception.InitializationException; import com.menghao.rpc.provider.model.ProviderKey; import com.menghao.rpc.util.ProviderHostUtils; import com.menghao.rpc.zookeeper.CuratorClient; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.util.Assert; import java.text.MessageFormat; /** * <p>服务提供方注册.</br> * 需要的信息:契约、实现、IP、端口 * * @author MarvelCode */ public class ProviderRegister implements ApplicationListener<ContextRefreshedEvent> { private CuratorClient curatorClient; private Integer port; public ProviderRegister(CuratorClient curatorClient, Integer port) { this.curatorClient = curatorClient; this.port = port; } @Override public void onApplicationEvent(ContextRefreshedEvent event) { ApplicationContext currentContext = event.getApplicationContext(); // 忽略子容器 if (currentContext.getParent() != null) { return; } ProviderRepository apiRepository = currentContext.getBean(ProviderRepository.class); Assert.notNull(apiRepository, "初始化失败"); String ip = ProviderHostUtils.getLocalHost(); String nodeValue = ip + ":" + port; for (ProviderKey providerKey : apiRepository.getProviders()) { String path = getNodePath(providerKey) + "/" + nodeValue; try { curatorClient.createEphemeralNode(path, null); } catch (Exception e) { e.printStackTrace(); throw new InitializationException(MessageFormat.format("create node: {} value: {} excecption", path, nodeValue)); } } } private String getNodePath(ProviderKey providerKey) { return "/" + providerKey.getContract() + ":" + providerKey.getImplCode(); } }
这一步利用了 Spring的事件机制 : 在容器启动后,将服务信息与机器信息注册到ZooKeeper上。
其中服务所在机器的IP是通过 ProviderHostUtils 工具类获取的,端口号是依赖外部配置文件指定的。
ZooKeeper 客户端使用了 Curator框架 ,该框架对ZooKeeper Api进行了封装,像重连机制、Watch的多次触发等都无需考虑。
注册路径的规则可自行设计,我的实现为:统一前置路径,例如 /marvel/rpc/provider,下一级路径为“接口全限定名 :beanName”,在下一级为“ip:端口号”。其实到这里已经很明显了,服务消费方只要根据服务的“契约”,就可获取指定节点下的所有子节点,也就是该服务对应的所有机器信息了。
服务消费方
介绍完服务提供方的启动流程,接下来看下服务消费方是如何进行服务调用的。
首先依然是一个注解,该注解用于将服务进行注入,类似Spring注入Bean一样:
package com.menghao.rpc.consumer.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * <p>Rpc服务消费方注解.<br> * * @author MarvelCode. */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) public @interface Reference { /** * 用于指定implCode */ String value(); }
接下来是 @Reference 注解的处理逻辑,类比于 Spring 的@Autowired注解来看:
package com.menghao.rpc.spring; import com.menghao.rpc.consumer.annotation.Reference; import com.menghao.rpc.consumer.discovery.ReferenceRepository; import com.menghao.rpc.consumer.model.ReferenceKey; import org.springframework.beans.BeansException; import org.springframework.beans.PropertyValues; import org.springframework.beans.factory.BeanCreationException; import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor; import java.beans.PropertyDescriptor; import java.lang.reflect.Field; /** * <p>@Reference注解增强:注入代理对象.</br> * <p>时机:属性填充时,代理对象由JDK动态代理实现</p> * * @author MarvelCode * @see ReferenceRepository */ public class ReferenceAutowired implements InstantiationAwareBeanPostProcessor { private ReferenceRepository referenceRepository; public ReferenceAutowired(ReferenceRepository referenceRepository) { this.referenceRepository = referenceRepository; } @Override public Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException { return null; } @Override public boolean postProcessAfterInstantiation(Object bean, String beanName) throws BeansException { return true; } @Override public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException { Class sourceClass = bean.getClass(); do { // 遍历成员变量找出所有被 @Reference注解的成员进行注入 for (Field field : sourceClass.getDeclaredFields()) { Reference reference = field.getAnnotation(Reference.class); if (reference != null) { // 找到对应的代理对象并赋值 Object candidate = findReference(field.getType(), reference); field.setAccessible(true); try { field.set(bean, candidate); } catch (IllegalAccessException e) { throw new BeanCreationException(beanName, "@Reference dependencies autowired failed", e); } } } sourceClass = sourceClass.getSuperclass(); } while (sourceClass != null && sourceClass != Object.class); return pvs; } private Object findReference(Class sourceInterface, Reference reference) { // 根据注入类型和指定实现 ReferenceKey referenceKey = new ReferenceKey(sourceInterface, reference.value()); return referenceRepository.getReference(referenceKey); } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } }
同样借助了 Bean 生命周期扩展接口,不过这次处理是在“ 属性填充 ”的阶段,将 @Reference 标识的服务接口进行代理赋值。即,通过代理拦截该接口的所有方法调用,转而以Http请求的方式请求服务提供方(本篇为Http方式,以后会提供Tcp方式的讲解,主要逻辑大致相似)。
同提供方相似的,消费方同样有仓库 ReferenceRepository ,通过接口的全限定名,implCode(对应服务端的beanName)构成 ProviderKey :
package com.menghao.rpc.consumer.discovery; import com.menghao.rpc.consumer.JdkProxyFactory; import com.menghao.rpc.consumer.RpcAgent; import com.menghao.rpc.consumer.model.HttpReferenceAgent; import com.menghao.rpc.consumer.model.ReferenceKey; import org.springframework.web.client.RestTemplate; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * <p>Http方式代理对象仓库.</br> * * @author MarvelCode */ public class HttpReferenceRepository implements ReferenceRepository { private JdkProxyFactory proxyFactory; private ProviderDiscovery providerDiscovery; private RestTemplate restTemplate; private final ConcurrentMap<ReferenceKey, RpcAgent> referenceCache = new ConcurrentHashMap<>(8); public HttpReferenceRepository(JdkProxyFactory proxyFactory, ProviderDiscovery providerDiscovery, RestTemplate restTemplate) { this.proxyFactory = proxyFactory; this.providerDiscovery = providerDiscovery; this.restTemplate = restTemplate; } @Override public Object getReference(ReferenceKey referenceKey) { if (referenceCache.get(referenceKey) != null) { return proxyFactory.getProxy(referenceCache.get(referenceKey)); } RpcAgent referenceAgent = new HttpReferenceAgent(referenceKey, restTemplate); // Http方式 @Reference 代理初始化 providerDiscovery.initReferenceAgent(referenceAgent); referenceCache.putIfAbsent(referenceKey, referenceAgent); return proxyFactory.getProxy(referenceAgent); } }
这里有一层缓存,意图是将已经初始化的ReferenceAgent直接代理返回,否则初始化过程需要从ZooKeeper获取一次机器信息,缓存后就可以避免同一个服务的多次ZK请求。 ReferenceAgent 就是真正发送Http请求代理类,用来与服务通信以获取响应结果:
package com.menghao.rpc.consumer.model; import com.menghao.rpc.RpcConstants; import com.menghao.rpc.consumer.ReferenceAgent; import com.menghao.rpc.consumer.balance.LoadBalancer; import com.menghao.rpc.consumer.balance.RandomLoadBalancer; import com.menghao.rpc.provider.exception.InvokeException; import com.menghao.rpc.provider.model.RpcResponse; import lombok.Getter; import lombok.Setter; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.client.RestTemplate; import java.lang.reflect.Method; import java.text.MessageFormat; import java.util.List; /** * <p>@Reference代理对象.</br> * <p>调用原始接口的任意方法会被该类的invoke方法代理:使用RestTemplate发送请求</p> * <p>sourceInterface/implCode:唯一标识一个服务</p> * * @author MarvelCode */ public class HttpReferenceAgent implements ReferenceAgent { @Getter private Class sourceInterface; @Getter private String implCode; @Setter private List<String> providerHosts; private RestTemplate restTemplate; private LoadBalancer defaultBalancer = new RandomLoadBalancer(); public HttpReferenceAgent(ReferenceKey referenceKey, RestTemplate restTemplate) { this.sourceInterface = referenceKey.getSourceInterface(); this.implCode = referenceKey.getName(); this.restTemplate = restTemplate; } @Override public Object invoke(Method method, Object[] args) { // 构造请求参数 RpcRequest apiParam = makeParam(method, args); if (providerHosts == null || providerHosts.size() == 0) { throw new InvokeException("There are currently no service providers available"); } String url = select(providerHosts); // 发送Http请求 ResponseEntity<RpcResponse> responseEntity = restTemplate.postForEntity(url, apiParam, RpcResponse.class); if (responseEntity.getStatusCode() == HttpStatus.OK) { if (responseEntity.getBody().getResult() != null) { return responseEntity.getBody().getResult(); } if (responseEntity.getBody().getThrowable() != null) { throw new InvokeException(responseEntity.getBody().getThrowable()); } } return new InvokeException(MessageFormat.format(RpcConstants.LOG_RPC_PREFIX + "invoke {} response status code {}", sourceInterface.getName() + ":" + method.getName(), responseEntity.getStatusCode())); } private RpcRequest makeParam(Method method, Object[] args) { return RpcRequest.builder() .method(method.getName()) .contract(sourceInterface.getName()) .implCode(implCode) .args(args) .argsType(method.getParameterTypes()) .build(); } private String select(List<String> providerHosts) { // 负载均衡 String ip = defaultBalancer.select(providerHosts); return "http://" + ip + "/marvel/rpc/entrance"; } }
这个类封装了具体的调用过程,包含了请求实体的封装、负载均衡(随机访问),最后通过 Spring RestTemplate 发送请求(其中序列化与反序列化后面再展开讲解)。接下来看下代理对象是如何跟服务接口建立关系的,使用到了 JDK动态代理 :
package com.menghao.rpc.consumer; import com.menghao.rpc.consumer.model.ReferenceAgent; import com.menghao.rpc.provider.exception.InitializationException; import com.menghao.rpc.provider.exception.InvokeException; import lombok.Getter; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.text.MessageFormat; /** * <p>JDK动态代理工厂</br> * <p>对@Reference标识的成员变量进行动态代理,最终进行http请求</p> * * @author MarvelCode * @see ReferenceAgent */ public class JdkProxyFactory { /** * 根据@Reference创建代理对象 * * @param referenceAgent 实际处理对象 * @return 代理后的对象 */ public <T> T getProxy(ReferenceAgent referenceAgent) { return new RpcProxy<T>(referenceAgent).getProxy(); } private static class RpcProxy<T> implements InvocationHandler { private ReferenceAgent referenceAgent; @Getter private T proxy; private static Method hashcodeMethod; private static Method toStringMethod; private static Method equalsMethod; static { try { hashcodeMethod = Object.class.getMethod("hashCode"); toStringMethod = Object.class.getMethod("toString"); equalsMethod = Object.class.getMethod("equals", Object.class); } catch (NoSuchMethodException e) { throw new InitializationException(""); } } @SuppressWarnings("unchecked") RpcProxy(ReferenceAgent referenceAgent) { this.referenceAgent = referenceAgent; proxy = (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{referenceAgent.getSourceInterface()}, this); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { if (hashcodeMethod == method) { return System.identityHashCode(proxy); } if (toStringMethod == method) { return referenceAgent.toString(); } if (equalsMethod == method) { return proxy == args[0]; } return new InvokeException(MessageFormat.format("method {} not support", method.getName())); } return referenceAgent.invoke(method, args); } } }
可以看到,除了 toString、hashCode、equals方法外,其余方法调用都转而调用 ReferenceAgent.invoke 。
到目前为之,大体的调用逻辑已经逐渐清晰,但最重要的一步,要向谁去请求呢?这时需要依赖 ZooKeeper 进行 服务发现 :
package com.menghao.rpc.consumer.discovery; import com.menghao.rpc.consumer.model.ReferenceAgent; import com.menghao.rpc.zookeeper.ChildChangeListener; import com.menghao.rpc.zookeeper.CuratorClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * <p>对于 @Reference代理所需信息的初始化.</br> * <p>需要根据契约及实现找到服务提供方机器信息</p> * * @author MarvelCode */ public class ProviderDiscovery { private final static Logger LOG = LoggerFactory.getLogger(ProviderDiscovery.class); private CuratorClient curatorClient; public ProviderDiscovery(CuratorClient curatorClient) { this.curatorClient = curatorClient; } public void initReferenceAgent(ReferenceAgent referenceAgent) { detect(referenceAgent); } private void detect(final ReferenceAgent referenceAgent) { // 服务信息节点路径 final String nodePath = "/" + referenceAgent.getContract() + ":" + referenceAgent.getImplCode(); LOG.info("look up zookeeper node {}", nodePath); try { // 监听 ZK节点 curatorClient.addPathListener(nodePath, new ChildChangeListener() { @Override public void onChange(String path, String data) { try { referenceAgent.setProviderHosts(curatorClient.getChildren(nodePath)); } catch (Exception e) { e.printStackTrace(); } } }); // 设置服务机器信息列表 referenceAgent.setProviderHosts(curatorClient.getChildren(nodePath)); } catch (Exception e) { LOG.error("zookeeper getNode child {} exception", nodePath, e); } } }
可以看到,通过服务“契约”,借助 Curator 我们很容易可以获取到指定服务的服务信息列表。当然我们还需要 设置对指定节点的监听 ,当服务所在的机器宕机时,临时服务节点就会下线,我们需要重新获取机器列表,以保证请求的有效性。
序列化与反序列化
最后就是请求的发送与响应了,来看下请求实体类定义:
package com.menghao.rpc.consumer.model; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; /** * <p>Rpc统一请求封装.</br> * <p>包含了服务契约、调用方法名、入参类型、入参数组</p> * * @author MarvelCode */ @Data @Builder @AllArgsConstructor @NoArgsConstructor public class RpcRequest implements Serializable { private String contract; private String implCode; private String method; private Class[] argsType; private Object[] args; }
包含了:接口的全限定名、implCode,这两者用于从容器中定位到服务。除此之外就是反射调用需要的三个参数了,方法、入参类型、以及具体的参数。
这些数据将会被序列化后传输到服务提供方,这里将使用 Hessian 序列化框架。考虑到本次采用的 Http 请求的方式,为了将调用代码对序列化无感知,借助了 Spring 的 HttpMessageConverter ,将Http请求和响应自动的序列化与反序列化。
package com.menghao.rpc.spring; import com.menghao.rpc.consumer.model.RpcRequest; import com.menghao.rpc.serialize.hessian.HessianObjectInput; import com.menghao.rpc.serialize.hessian.HessianObjectOutput; import org.springframework.http.HttpInputMessage; import org.springframework.http.HttpOutputMessage; import org.springframework.http.MediaType; import org.springframework.http.converter.AbstractHttpMessageConverter; import org.springframework.http.converter.HttpMessageNotReadableException; import org.springframework.http.converter.HttpMessageNotWritableException; import java.io.IOException; /** * <p>RpcRequest消息转换器.<br> * * @author MarvelCode. */ public class RpcRequestConvert extends AbstractHttpMessageConverter<RpcRequest> { public RpcRequestConvert(MediaType supportedMediaType) { super(supportedMediaType); } @Override protected boolean supports(Class<?> clazz) { return RpcRequest.class.isAssignableFrom(clazz); } @Override protected RpcRequest readInternal(Class<? extends RpcRequest> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException { HessianObjectInput input = new HessianObjectInput(inputMessage.getBody()); return (RpcRequest) input.readObject(RpcRequest.class); } @Override protected void writeInternal(RpcRequest rpcRequest, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException { HessianObjectOutput output = new HessianObjectOutput(outputMessage.getBody()); output.writeObject(rpcRequest); output.flush(); } }
package com.menghao.rpc.spring; import com.menghao.rpc.provider.model.RpcResponse; import com.menghao.rpc.serialize.hessian.HessianObjectInput; import com.menghao.rpc.serialize.hessian.HessianObjectOutput; import org.springframework.http.HttpInputMessage; import org.springframework.http.HttpOutputMessage; import org.springframework.http.MediaType; import org.springframework.http.converter.AbstractHttpMessageConverter; import org.springframework.http.converter.HttpMessageNotReadableException; import org.springframework.http.converter.HttpMessageNotWritableException; import java.io.IOException; /** * <p>RpcResponse消息转换器.<br> * * @author MarvelCode. */ public class RpcResponseConvert extends AbstractHttpMessageConverter<RpcResponse> { public RpcResponseConvert(MediaType supportedMediaType) { super(supportedMediaType); } @Override protected boolean supports(Class<?> clazz) { return RpcResponse.class.isAssignableFrom(clazz); } @Override protected RpcResponse readInternal(Class<? extends RpcResponse> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException { HessianObjectInput input = new HessianObjectInput(inputMessage.getBody()); return (RpcResponse) input.readObject(RpcResponse.class); } @Override protected void writeInternal(RpcResponse rpcRequest, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException { HessianObjectOutput output = new HessianObjectOutput(outputMessage.getBody()); output.writeObject(rpcRequest); output.flush(); } }
以上这两个转换器,装配到 RestTemplate、以及 mvc后,就能实现自动序列化的功能。接下来看下服务提供方接到请求后是如何处理的:
package com.menghao.rpc.provider; import com.menghao.rpc.consumer.model.RpcRequest; import com.menghao.rpc.provider.exception.InvokeException; import com.menghao.rpc.provider.model.ProviderKey; import com.menghao.rpc.provider.model.RpcResponse; import com.menghao.rpc.provider.regisiter.ProviderRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.ReflectionUtils; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; import java.lang.reflect.Method; /** * <p>服务提供方统一入口.</br> * * @author MarvelCode */ @RequestMapping("/marvel/rpc/entrance") public class HttpProviderEntrance { private static final Logger LOGGER = LoggerFactory.getLogger(HttpProviderEntrance.class); private ProviderRepository providerRepository; public HttpProviderEntrance(ProviderRepository providerRepository) { this.providerRepository = providerRepository; } @ResponseBody @RequestMapping(method = RequestMethod.POST) public RpcResponse api(@RequestBody RpcRequest rpcRequest) { // 通过 contract、implCode定位服务bean ProviderKey providerKey = new ProviderKey(rpcRequest.getContract(), rpcRequest.getImplCode()); Object bean = providerRepository.getProvider(providerKey); if (bean == null) { return new RpcResponse(new InvokeException("")); } try { // 反射调用服务方法 Method method = ReflectionUtils.findMethod(bean.getClass(), rpcRequest.getMethod(), rpcRequest.getArgsType()); Object result = ReflectionUtils.invokeMethod(method, bean, rpcRequest.getArgs()); return new RpcResponse(result); } catch (Throwable e) { LOGGER.error("invoke exception,", e); return new RpcResponse(e); } } }
响应实体:
package com.menghao.rpc.provider.model; import lombok.Data; import java.io.Serializable; /** * <p>Rpc调用响应实体封装.<br> * * @author MarvelCode. * @version 2018/8/1. */ @Data public class RpcResponse implements Serializable { private Object result; private Throwable throwable; public RpcResponse(Object result) { this.result = result; } public RpcResponse(Throwable throwable) { this.throwable = throwable; } }
至此,基于Http的远程服务调用的大致流程就结束了。
配置
该工程基于 Spring Boot开发,配置采用自配置方式。
依赖
<dependency> <groupId>com.menghao</groupId> <artifactId>marvelcode-rpc</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
服务提供方
# 自定义端口 server.port= # ZooKeeper所在机器 ip:port marvel.rpc.zkServerHost = marvel.rpc.consumerEnable=false marvel.rpc.providerEnable=true marvel.rpc.type=http
服务消费方
# ZooKeeper所在机器 ip:port marvel.rpc.zkServerHost= marvel.rpc.providerEnable=false marvel.rpc.consumerEnable=true marvel.rpc.type=http
总结
代码均已测试,这里测试结果就不赘述了。各位可以本地先简写一个服务接口层,在服务提供方实现,并部署等待消费方调用。消费方引入服务接口层,编写调用代码,同样部署。工程运行依赖Spring Boot框架。
本篇目的在于讲解RPC调用实现的一种思路,很多功能尚不完备,例如负载均衡采用了简单的随机访问方式。完整的Rpc框架需要包括权重负载、幂等、重试等额外功能。另外,采用 Http 方式的远程调用性能上,不如 Tcp 方式的。
有时间我会整理基于 Tcp 方式远程服务调用,敬请期待。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。