内容简介:RPC——远程过程调用。简单的理解,调用远程服务,就像是调用本地方法一样。如果服务仅是对同一工程内使用,例如定义一个服务,接口UserService,实现UserServiceImpl,仅需创建一个对象,就可以轻松的调用。但如果把这样的服务暴露给其它工程调用,特别是目前流行的微服务框架,抽象出的模块化服务间可能要相互调用,这就需要“目前主流的RPC框架有很多,例如阿里的Dubbo,谷歌的gRPC等等。
前言
RPC——远程过程调用。简单的理解,调用远程服务,就像是调用本地方法一样。
如果服务仅是对同一工程内使用,例如定义一个服务,接口UserService,实现UserServiceImpl,仅需创建一个对象,就可以轻松的调用。但如果把这样的服务暴露给其它工程调用,特别是目前流行的微服务框架,抽象出的模块化服务间可能要相互调用,这就需要“ RPC框架 ”来实现了。
目前主流的RPC框架有很多,例如阿里的Dubbo,谷歌的gRPC等等。
各种实现的框架虽不尽相同,但主流的思想是相似的:都离不开 中心化的服务治理 (服务的发现与注册)。对此可选用的方案有多种,例如ZooKeeper、Eureka、 Redis 等等。本篇将采用 ZooKeeper 作为注册中心的方式,用代码讲解RPC调用的几个关键点。
源码地址: https://gitee.com/marvelcode/marvelcode-rpc.git
架构分析
接下来,将会对我所采用的架构进行 代入式的分析 ,如果对RPC框架有所了解的可以忽略此部分。
那就从RPC调用的源头方开始吧,来思考下一次调用的大致流程是怎样的:
-
首先,调用方发起服务调用。无论是通过服务的接口层,还是仅仅依靠服务对应的名称。
- 但无论是哪一种,必然要先知道该服务部署机器的IP、以及开放的端口号。这就是服务治理重要的一环, 服务发现。 即通过要调用的服务信息,来获取到这些服务所在的机器信息。
-
第二步,假使我们已经获取到了机器信息,接下来便是传入参数、调用服务、得到响应结果。
- 和普通HTTP接口不同的是,服务间调用传递的对象有可能是复杂的 Java 对象,仅靠Json格式的数据可能无法支撑(Ps:比如要穿一个Class对象)。这就引出了对象传输的 序列化与反序列化 。
-
第三步,假使服务提供方已经接收到服务调用的请求,最后只需根据请求的数据定位到提供服务的实例,进行反射调用即可。
- 既然能通过服务发现找到该服务,前提必然有 服务注册 这一步。当然,这一环节涉及到的服务定位,根据哪些数据能唯一标识一个服务呢?
- 分析完主要问题后,我们来看源码。
源码
服务提供方
根据上面的分析,我们来看下服务注册这一步的代码实现。
首先来看下用于标识对外暴露的服务的注解定义:
package com.menghao.rpc.provider.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * <p>服务提供方注解.<br> * <p>用于标识在接口上,只提供出接口jar</p> * * @author MarvelCode. */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface Provider { }
该注解起的作用就是个标记作用,在合适的时间将标识的服务注册到 ZooKeeper 上。来看下它是何时被解析的:
package com.menghao.rpc.spring; import com.menghao.rpc.RpcConstants; import com.menghao.rpc.provider.annotation.Provider; import com.menghao.rpc.provider.regisiter.ProviderRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import java.util.HashMap; import java.util.Map; /** * <p>内部系统服务提供方注册.</br> * 获取内部所有的 @Provider 接口并注册到服务提供者仓库 * * @author MarvelCode * @see ProviderRepository */ public class ProviderPostProcessor implements BeanPostProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ProviderPostProcessor.class); private Map<String, Class> candidates = new HashMap<>(8); private ProviderRepository providerRepository; public ProviderPostProcessor(ProviderRepository providerRepository) { this.providerRepository = providerRepository; } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { for (Class sourceInterface : bean.getClass().getInterfaces()) { // 递归遍历所有父接口 recursiveInterface(sourceInterface, beanName, candidates); } return bean; } private void recursiveInterface(Class sourceInterface, String beanName, Map<String, Class> candidates) { // 接口被@Provider标识 if (sourceInterface.getAnnotation(Provider.class) != null) { LOGGER.info(RpcConstants.LOG_RPC_PREFIX + "find @Provider-" + sourceInterface.getName()); candidates.put(beanName, sourceInterface); } // 直到无父接口,递归结束 if (sourceInterface.getInterfaces().length == 0) { return; } // 否则递归遍历父接口 for (Class superInterface : sourceInterface.getInterfaces()) { recursiveInterface(superInterface, beanName, candidates); } } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // 将符合条件的服务进行注册 if (candidates.containsKey(beanName)) { Class sourceInterface = candidates.get(beanName); providerRepository.register(sourceInterface, beanName, bean); } return bean; } }
借助 Spring提供的 bean 生命周期扩展接口,在 bean 的初始化前,遍历获取该实例的所有父接口,将被 @Provider 注解 标识的接口筛选出来,放入候选者列表(candidates :key-beanName,value-Interface)等待处理。在 bean 的初始化后,将之前筛选出的服务进行注册。
接着来看注册逻辑:
package com.menghao.rpc.provider.regisiter; import com.menghao.rpc.RpcConstants; import com.menghao.rpc.provider.exception.InitializationException; import com.menghao.rpc.provider.model.ProviderKey; import com.menghao.rpc.spring.ProviderPostProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.MessageFormat; import java.util.Collections; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * <p>本地服务仓库.</br> * 存放了契约与调用bean的映射关系,用于rpc调用时取出,从而反射调用 * * @author MarvelCode * @see ProviderRegister */ public class ProviderRepository { private static final Logger LOGGER = LoggerFactory.getLogger(ProviderPostProcessor.class); private ConcurrentMap<ProviderKey, Object> providerMapping = new ConcurrentHashMap<>(); private Set<ProviderKey> providers = Collections.unmodifiableSet(providerMapping.keySet()); /** * 注册服务,维护契约与单例的映射关系 * * @param sourceInterface @Provider标识的接口 * @param implCode 服务实现的beanName * @param bean 服务单例 */ public void register(Class sourceInterface, String implCode, Object bean) { ProviderKey providerKey = new ProviderKey(sourceInterface.getName(), implCode); LOGGER.info(RpcConstants.LOG_RPC_PREFIX + "register provider-" + providerKey); if (providerMapping.containsKey(providerKey)) { throw new InitializationException(MessageFormat.format("contract: {} ,implCode: {} confilic", sourceInterface.getName(), implCode)); } providerMapping.putIfAbsent(providerKey, bean); } /** * 发现服务单例,根据契约与单例的映射关系查到单例,以便反射调用 * * @param providerKey 服务单例的键(契约) * @return 服务单例 */ public Object getProvider(ProviderKey providerKey) { return providerMapping.get(providerKey); } /** * 获取所有满足条件的(@Provider)的契约 * * @return 服务键集合 */ Set<ProviderKey> getProviders() { return providers; } }
该类作为服务实例的仓库,存放了 ProviderKey 与 服务实例的对应关系。这也是服务定位的逻辑所在,通过接口的全限定名,beanName作为服务的坐标( ProviderKey ),唯一确定一个服务。这样服务调用方将这些信息告诉服务提供方时,就可以知道反射调用哪个服务实例了。
服务信息收集完毕后,接下来就会进行 服务注册 :
package com.menghao.rpc.provider.regisiter; import com.menghao.rpc.provider.exception.InitializationException; import com.menghao.rpc.provider.model.ProviderKey; import com.menghao.rpc.util.ProviderHostUtils; import com.menghao.rpc.zookeeper.CuratorClient; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.util.Assert; import java.text.MessageFormat; /** * <p>服务提供方注册.</br> * 需要的信息:契约、实现、IP、端口 * * @author MarvelCode */ public class ProviderRegister implements ApplicationListener<ContextRefreshedEvent> { private CuratorClient curatorClient; private Integer port; public ProviderRegister(CuratorClient curatorClient, Integer port) { this.curatorClient = curatorClient; this.port = port; } @Override public void onApplicationEvent(ContextRefreshedEvent event) { ApplicationContext currentContext = event.getApplicationContext(); // 忽略子容器 if (currentContext.getParent() != null) { return; } ProviderRepository apiRepository = currentContext.getBean(ProviderRepository.class); Assert.notNull(apiRepository, "初始化失败"); String ip = ProviderHostUtils.getLocalHost(); String nodeValue = ip + ":" + port; for (ProviderKey providerKey : apiRepository.getProviders()) { String path = getNodePath(providerKey) + "/" + nodeValue; try { curatorClient.createEphemeralNode(path, null); } catch (Exception e) { e.printStackTrace(); throw new InitializationException(MessageFormat.format("create node: {} value: {} excecption", path, nodeValue)); } } } private String getNodePath(ProviderKey providerKey) { return "/" + providerKey.getContract() + ":" + providerKey.getImplCode(); } }
这一步利用了 Spring的事件机制 : 在容器启动后,将服务信息与机器信息注册到ZooKeeper上。
其中服务所在机器的IP是通过 ProviderHostUtils 工具类获取的,端口号是依赖外部配置文件指定的。
ZooKeeper 客户端使用了 Curator框架 ,该框架对ZooKeeper Api进行了封装,像重连机制、Watch的多次触发等都无需考虑。
注册路径的规则可自行设计,我的实现为:统一前置路径,例如 /marvel/rpc/provider,下一级路径为“接口全限定名 :beanName”,在下一级为“ip:端口号”。其实到这里已经很明显了,服务消费方只要根据服务的“契约”,就可获取指定节点下的所有子节点,也就是该服务对应的所有机器信息了。
服务消费方
介绍完服务提供方的启动流程,接下来看下服务消费方是如何进行服务调用的。
首先依然是一个注解,该注解用于将服务进行注入,类似Spring注入Bean一样:
package com.menghao.rpc.consumer.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * <p>Rpc服务消费方注解.<br> * * @author MarvelCode. */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) public @interface Reference { /** * 用于指定implCode */ String value(); }
接下来是 @Reference 注解的处理逻辑,类比于 Spring 的@Autowired注解来看:
package com.menghao.rpc.spring; import com.menghao.rpc.consumer.annotation.Reference; import com.menghao.rpc.consumer.discovery.ReferenceRepository; import com.menghao.rpc.consumer.model.ReferenceKey; import org.springframework.beans.BeansException; import org.springframework.beans.PropertyValues; import org.springframework.beans.factory.BeanCreationException; import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor; import java.beans.PropertyDescriptor; import java.lang.reflect.Field; /** * <p>@Reference注解增强:注入代理对象.</br> * <p>时机:属性填充时,代理对象由JDK动态代理实现</p> * * @author MarvelCode * @see ReferenceRepository */ public class ReferenceAutowired implements InstantiationAwareBeanPostProcessor { private ReferenceRepository referenceRepository; public ReferenceAutowired(ReferenceRepository referenceRepository) { this.referenceRepository = referenceRepository; } @Override public Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException { return null; } @Override public boolean postProcessAfterInstantiation(Object bean, String beanName) throws BeansException { return true; } @Override public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException { Class sourceClass = bean.getClass(); do { // 遍历成员变量找出所有被 @Reference注解的成员进行注入 for (Field field : sourceClass.getDeclaredFields()) { Reference reference = field.getAnnotation(Reference.class); if (reference != null) { // 找到对应的代理对象并赋值 Object candidate = findReference(field.getType(), reference); field.setAccessible(true); try { field.set(bean, candidate); } catch (IllegalAccessException e) { throw new BeanCreationException(beanName, "@Reference dependencies autowired failed", e); } } } sourceClass = sourceClass.getSuperclass(); } while (sourceClass != null && sourceClass != Object.class); return pvs; } private Object findReference(Class sourceInterface, Reference reference) { // 根据注入类型和指定实现 ReferenceKey referenceKey = new ReferenceKey(sourceInterface, reference.value()); return referenceRepository.getReference(referenceKey); } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } }
同样借助了 Bean 生命周期扩展接口,不过这次处理是在“ 属性填充 ”的阶段,将 @Reference 标识的服务接口进行代理赋值。即,通过代理拦截该接口的所有方法调用,转而以Http请求的方式请求服务提供方(本篇为Http方式,以后会提供Tcp方式的讲解,主要逻辑大致相似)。
同提供方相似的,消费方同样有仓库 ReferenceRepository ,通过接口的全限定名,implCode(对应服务端的beanName)构成 ProviderKey :
package com.menghao.rpc.consumer.discovery; import com.menghao.rpc.consumer.JdkProxyFactory; import com.menghao.rpc.consumer.RpcAgent; import com.menghao.rpc.consumer.model.HttpReferenceAgent; import com.menghao.rpc.consumer.model.ReferenceKey; import org.springframework.web.client.RestTemplate; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * <p>Http方式代理对象仓库.</br> * * @author MarvelCode */ public class HttpReferenceRepository implements ReferenceRepository { private JdkProxyFactory proxyFactory; private ProviderDiscovery providerDiscovery; private RestTemplate restTemplate; private final ConcurrentMap<ReferenceKey, RpcAgent> referenceCache = new ConcurrentHashMap<>(8); public HttpReferenceRepository(JdkProxyFactory proxyFactory, ProviderDiscovery providerDiscovery, RestTemplate restTemplate) { this.proxyFactory = proxyFactory; this.providerDiscovery = providerDiscovery; this.restTemplate = restTemplate; } @Override public Object getReference(ReferenceKey referenceKey) { if (referenceCache.get(referenceKey) != null) { return proxyFactory.getProxy(referenceCache.get(referenceKey)); } RpcAgent referenceAgent = new HttpReferenceAgent(referenceKey, restTemplate); // Http方式 @Reference 代理初始化 providerDiscovery.initReferenceAgent(referenceAgent); referenceCache.putIfAbsent(referenceKey, referenceAgent); return proxyFactory.getProxy(referenceAgent); } }
这里有一层缓存,意图是将已经初始化的ReferenceAgent直接代理返回,否则初始化过程需要从ZooKeeper获取一次机器信息,缓存后就可以避免同一个服务的多次ZK请求。 ReferenceAgent 就是真正发送Http请求代理类,用来与服务通信以获取响应结果:
package com.menghao.rpc.consumer.model; import com.menghao.rpc.RpcConstants; import com.menghao.rpc.consumer.ReferenceAgent; import com.menghao.rpc.consumer.balance.LoadBalancer; import com.menghao.rpc.consumer.balance.RandomLoadBalancer; import com.menghao.rpc.provider.exception.InvokeException; import com.menghao.rpc.provider.model.RpcResponse; import lombok.Getter; import lombok.Setter; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.client.RestTemplate; import java.lang.reflect.Method; import java.text.MessageFormat; import java.util.List; /** * <p>@Reference代理对象.</br> * <p>调用原始接口的任意方法会被该类的invoke方法代理:使用RestTemplate发送请求</p> * <p>sourceInterface/implCode:唯一标识一个服务</p> * * @author MarvelCode */ public class HttpReferenceAgent implements ReferenceAgent { @Getter private Class sourceInterface; @Getter private String implCode; @Setter private List<String> providerHosts; private RestTemplate restTemplate; private LoadBalancer defaultBalancer = new RandomLoadBalancer(); public HttpReferenceAgent(ReferenceKey referenceKey, RestTemplate restTemplate) { this.sourceInterface = referenceKey.getSourceInterface(); this.implCode = referenceKey.getName(); this.restTemplate = restTemplate; } @Override public Object invoke(Method method, Object[] args) { // 构造请求参数 RpcRequest apiParam = makeParam(method, args); if (providerHosts == null || providerHosts.size() == 0) { throw new InvokeException("There are currently no service providers available"); } String url = select(providerHosts); // 发送Http请求 ResponseEntity<RpcResponse> responseEntity = restTemplate.postForEntity(url, apiParam, RpcResponse.class); if (responseEntity.getStatusCode() == HttpStatus.OK) { if (responseEntity.getBody().getResult() != null) { return responseEntity.getBody().getResult(); } if (responseEntity.getBody().getThrowable() != null) { throw new InvokeException(responseEntity.getBody().getThrowable()); } } return new InvokeException(MessageFormat.format(RpcConstants.LOG_RPC_PREFIX + "invoke {} response status code {}", sourceInterface.getName() + ":" + method.getName(), responseEntity.getStatusCode())); } private RpcRequest makeParam(Method method, Object[] args) { return RpcRequest.builder() .method(method.getName()) .contract(sourceInterface.getName()) .implCode(implCode) .args(args) .argsType(method.getParameterTypes()) .build(); } private String select(List<String> providerHosts) { // 负载均衡 String ip = defaultBalancer.select(providerHosts); return "http://" + ip + "/marvel/rpc/entrance"; } }
这个类封装了具体的调用过程,包含了请求实体的封装、负载均衡(随机访问),最后通过 Spring RestTemplate 发送请求(其中序列化与反序列化后面再展开讲解)。接下来看下代理对象是如何跟服务接口建立关系的,使用到了 JDK动态代理 :
package com.menghao.rpc.consumer; import com.menghao.rpc.consumer.model.ReferenceAgent; import com.menghao.rpc.provider.exception.InitializationException; import com.menghao.rpc.provider.exception.InvokeException; import lombok.Getter; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.text.MessageFormat; /** * <p>JDK动态代理工厂</br> * <p>对@Reference标识的成员变量进行动态代理,最终进行http请求</p> * * @author MarvelCode * @see ReferenceAgent */ public class JdkProxyFactory { /** * 根据@Reference创建代理对象 * * @param referenceAgent 实际处理对象 * @return 代理后的对象 */ public <T> T getProxy(ReferenceAgent referenceAgent) { return new RpcProxy<T>(referenceAgent).getProxy(); } private static class RpcProxy<T> implements InvocationHandler { private ReferenceAgent referenceAgent; @Getter private T proxy; private static Method hashcodeMethod; private static Method toStringMethod; private static Method equalsMethod; static { try { hashcodeMethod = Object.class.getMethod("hashCode"); toStringMethod = Object.class.getMethod("toString"); equalsMethod = Object.class.getMethod("equals", Object.class); } catch (NoSuchMethodException e) { throw new InitializationException(""); } } @SuppressWarnings("unchecked") RpcProxy(ReferenceAgent referenceAgent) { this.referenceAgent = referenceAgent; proxy = (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{referenceAgent.getSourceInterface()}, this); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { if (hashcodeMethod == method) { return System.identityHashCode(proxy); } if (toStringMethod == method) { return referenceAgent.toString(); } if (equalsMethod == method) { return proxy == args[0]; } return new InvokeException(MessageFormat.format("method {} not support", method.getName())); } return referenceAgent.invoke(method, args); } } }
可以看到,除了 toString、hashCode、equals方法外,其余方法调用都转而调用 ReferenceAgent.invoke 。
到目前为之,大体的调用逻辑已经逐渐清晰,但最重要的一步,要向谁去请求呢?这时需要依赖 ZooKeeper 进行 服务发现 :
package com.menghao.rpc.consumer.discovery; import com.menghao.rpc.consumer.model.ReferenceAgent; import com.menghao.rpc.zookeeper.ChildChangeListener; import com.menghao.rpc.zookeeper.CuratorClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * <p>对于 @Reference代理所需信息的初始化.</br> * <p>需要根据契约及实现找到服务提供方机器信息</p> * * @author MarvelCode */ public class ProviderDiscovery { private final static Logger LOG = LoggerFactory.getLogger(ProviderDiscovery.class); private CuratorClient curatorClient; public ProviderDiscovery(CuratorClient curatorClient) { this.curatorClient = curatorClient; } public void initReferenceAgent(ReferenceAgent referenceAgent) { detect(referenceAgent); } private void detect(final ReferenceAgent referenceAgent) { // 服务信息节点路径 final String nodePath = "/" + referenceAgent.getContract() + ":" + referenceAgent.getImplCode(); LOG.info("look up zookeeper node {}", nodePath); try { // 监听 ZK节点 curatorClient.addPathListener(nodePath, new ChildChangeListener() { @Override public void onChange(String path, String data) { try { referenceAgent.setProviderHosts(curatorClient.getChildren(nodePath)); } catch (Exception e) { e.printStackTrace(); } } }); // 设置服务机器信息列表 referenceAgent.setProviderHosts(curatorClient.getChildren(nodePath)); } catch (Exception e) { LOG.error("zookeeper getNode child {} exception", nodePath, e); } } }
可以看到,通过服务“契约”,借助 Curator 我们很容易可以获取到指定服务的服务信息列表。当然我们还需要 设置对指定节点的监听 ,当服务所在的机器宕机时,临时服务节点就会下线,我们需要重新获取机器列表,以保证请求的有效性。
序列化与反序列化
最后就是请求的发送与响应了,来看下请求实体类定义:
package com.menghao.rpc.consumer.model; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; /** * <p>Rpc统一请求封装.</br> * <p>包含了服务契约、调用方法名、入参类型、入参数组</p> * * @author MarvelCode */ @Data @Builder @AllArgsConstructor @NoArgsConstructor public class RpcRequest implements Serializable { private String contract; private String implCode; private String method; private Class[] argsType; private Object[] args; }
包含了:接口的全限定名、implCode,这两者用于从容器中定位到服务。除此之外就是反射调用需要的三个参数了,方法、入参类型、以及具体的参数。
这些数据将会被序列化后传输到服务提供方,这里将使用 Hessian 序列化框架。考虑到本次采用的 Http 请求的方式,为了将调用代码对序列化无感知,借助了 Spring 的 HttpMessageConverter ,将Http请求和响应自动的序列化与反序列化。
package com.menghao.rpc.spring; import com.menghao.rpc.consumer.model.RpcRequest; import com.menghao.rpc.serialize.hessian.HessianObjectInput; import com.menghao.rpc.serialize.hessian.HessianObjectOutput; import org.springframework.http.HttpInputMessage; import org.springframework.http.HttpOutputMessage; import org.springframework.http.MediaType; import org.springframework.http.converter.AbstractHttpMessageConverter; import org.springframework.http.converter.HttpMessageNotReadableException; import org.springframework.http.converter.HttpMessageNotWritableException; import java.io.IOException; /** * <p>RpcRequest消息转换器.<br> * * @author MarvelCode. */ public class RpcRequestConvert extends AbstractHttpMessageConverter<RpcRequest> { public RpcRequestConvert(MediaType supportedMediaType) { super(supportedMediaType); } @Override protected boolean supports(Class<?> clazz) { return RpcRequest.class.isAssignableFrom(clazz); } @Override protected RpcRequest readInternal(Class<? extends RpcRequest> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException { HessianObjectInput input = new HessianObjectInput(inputMessage.getBody()); return (RpcRequest) input.readObject(RpcRequest.class); } @Override protected void writeInternal(RpcRequest rpcRequest, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException { HessianObjectOutput output = new HessianObjectOutput(outputMessage.getBody()); output.writeObject(rpcRequest); output.flush(); } }
package com.menghao.rpc.spring; import com.menghao.rpc.provider.model.RpcResponse; import com.menghao.rpc.serialize.hessian.HessianObjectInput; import com.menghao.rpc.serialize.hessian.HessianObjectOutput; import org.springframework.http.HttpInputMessage; import org.springframework.http.HttpOutputMessage; import org.springframework.http.MediaType; import org.springframework.http.converter.AbstractHttpMessageConverter; import org.springframework.http.converter.HttpMessageNotReadableException; import org.springframework.http.converter.HttpMessageNotWritableException; import java.io.IOException; /** * <p>RpcResponse消息转换器.<br> * * @author MarvelCode. */ public class RpcResponseConvert extends AbstractHttpMessageConverter<RpcResponse> { public RpcResponseConvert(MediaType supportedMediaType) { super(supportedMediaType); } @Override protected boolean supports(Class<?> clazz) { return RpcResponse.class.isAssignableFrom(clazz); } @Override protected RpcResponse readInternal(Class<? extends RpcResponse> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException { HessianObjectInput input = new HessianObjectInput(inputMessage.getBody()); return (RpcResponse) input.readObject(RpcResponse.class); } @Override protected void writeInternal(RpcResponse rpcRequest, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException { HessianObjectOutput output = new HessianObjectOutput(outputMessage.getBody()); output.writeObject(rpcRequest); output.flush(); } }
以上这两个转换器,装配到 RestTemplate、以及 mvc后,就能实现自动序列化的功能。接下来看下服务提供方接到请求后是如何处理的:
package com.menghao.rpc.provider; import com.menghao.rpc.consumer.model.RpcRequest; import com.menghao.rpc.provider.exception.InvokeException; import com.menghao.rpc.provider.model.ProviderKey; import com.menghao.rpc.provider.model.RpcResponse; import com.menghao.rpc.provider.regisiter.ProviderRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.ReflectionUtils; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; import java.lang.reflect.Method; /** * <p>服务提供方统一入口.</br> * * @author MarvelCode */ @RequestMapping("/marvel/rpc/entrance") public class HttpProviderEntrance { private static final Logger LOGGER = LoggerFactory.getLogger(HttpProviderEntrance.class); private ProviderRepository providerRepository; public HttpProviderEntrance(ProviderRepository providerRepository) { this.providerRepository = providerRepository; } @ResponseBody @RequestMapping(method = RequestMethod.POST) public RpcResponse api(@RequestBody RpcRequest rpcRequest) { // 通过 contract、implCode定位服务bean ProviderKey providerKey = new ProviderKey(rpcRequest.getContract(), rpcRequest.getImplCode()); Object bean = providerRepository.getProvider(providerKey); if (bean == null) { return new RpcResponse(new InvokeException("")); } try { // 反射调用服务方法 Method method = ReflectionUtils.findMethod(bean.getClass(), rpcRequest.getMethod(), rpcRequest.getArgsType()); Object result = ReflectionUtils.invokeMethod(method, bean, rpcRequest.getArgs()); return new RpcResponse(result); } catch (Throwable e) { LOGGER.error("invoke exception,", e); return new RpcResponse(e); } } }
响应实体:
package com.menghao.rpc.provider.model; import lombok.Data; import java.io.Serializable; /** * <p>Rpc调用响应实体封装.<br> * * @author MarvelCode. * @version 2018/8/1. */ @Data public class RpcResponse implements Serializable { private Object result; private Throwable throwable; public RpcResponse(Object result) { this.result = result; } public RpcResponse(Throwable throwable) { this.throwable = throwable; } }
至此,基于Http的远程服务调用的大致流程就结束了。
配置
该工程基于 Spring Boot开发,配置采用自配置方式。
依赖
<dependency> <groupId>com.menghao</groupId> <artifactId>marvelcode-rpc</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
服务提供方
# 自定义端口 server.port= # ZooKeeper所在机器 ip:port marvel.rpc.zkServerHost = marvel.rpc.consumerEnable=false marvel.rpc.providerEnable=true marvel.rpc.type=http
服务消费方
# ZooKeeper所在机器 ip:port marvel.rpc.zkServerHost= marvel.rpc.providerEnable=false marvel.rpc.consumerEnable=true marvel.rpc.type=http
总结
代码均已测试,这里测试结果就不赘述了。各位可以本地先简写一个服务接口层,在服务提供方实现,并部署等待消费方调用。消费方引入服务接口层,编写调用代码,同样部署。工程运行依赖Spring Boot框架。
本篇目的在于讲解RPC调用实现的一种思路,很多功能尚不完备,例如负载均衡采用了简单的随机访问方式。完整的Rpc框架需要包括权重负载、幂等、重试等额外功能。另外,采用 Http 方式的远程调用性能上,不如 Tcp 方式的。
有时间我会整理基于 Tcp 方式远程服务调用,敬请期待。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Music Recommendation and Discovery
Òscar Celma / Springer / 2010-9-7 / USD 49.95
With so much more music available these days, traditional ways of finding music have diminished. Today radio shows are often programmed by large corporations that create playlists drawn from a limited......一起来看看 《Music Recommendation and Discovery》 这本书的介绍吧!