原 荐 基于HTTP的远程服务调用

栏目: 服务器 · 发布时间: 6年前

内容简介:RPC——远程过程调用。简单的理解,调用远程服务,就像是调用本地方法一样。如果服务仅是对同一工程内使用,例如定义一个服务,接口UserService,实现UserServiceImpl,仅需创建一个对象,就可以轻松的调用。但如果把这样的服务暴露给其它工程调用,特别是目前流行的微服务框架,抽象出的模块化服务间可能要相互调用,这就需要“目前主流的RPC框架有很多,例如阿里的Dubbo,谷歌的gRPC等等。

前言

RPC——远程过程调用。简单的理解,调用远程服务,就像是调用本地方法一样。

如果服务仅是对同一工程内使用,例如定义一个服务,接口UserService,实现UserServiceImpl,仅需创建一个对象,就可以轻松的调用。但如果把这样的服务暴露给其它工程调用,特别是目前流行的微服务框架,抽象出的模块化服务间可能要相互调用,这就需要“ RPC框架 ”来实现了。

目前主流的RPC框架有很多,例如阿里的Dubbo,谷歌的gRPC等等。

各种实现的框架虽不尽相同,但主流的思想是相似的:都离不开 中心化的服务治理 (服务的发现与注册)。对此可选用的方案有多种,例如ZooKeeper、Eureka、 Redis 等等。本篇将采用 ZooKeeper 作为注册中心的方式,用代码讲解RPC调用的几个关键点。

源码地址: https://gitee.com/marvelcode/marvelcode-rpc.git

架构分析

接下来,将会对我所采用的架构进行 代入式的分析 ,如果对RPC框架有所了解的可以忽略此部分。

那就从RPC调用的源头方开始吧,来思考下一次调用的大致流程是怎样的:

  • 首先,调用方发起服务调用。无论是通过服务的接口层,还是仅仅依靠服务对应的名称。
    • 但无论是哪一种,必然要先知道该服务部署机器的IP、以及开放的端口号。这就是服务治理重要的一环, 服务发现。 即通过要调用的服务信息,来获取到这些服务所在的机器信息。
  • 第二步,假使我们已经获取到了机器信息,接下来便是传入参数、调用服务、得到响应结果。
    • 和普通HTTP接口不同的是,服务间调用传递的对象有可能是复杂的 Java 对象,仅靠Json格式的数据可能无法支撑(Ps:比如要穿一个Class对象)。这就引出了对象传输的 序列化与反序列化
  • 第三步,假使服务提供方已经接收到服务调用的请求,最后只需根据请求的数据定位到提供服务的实例,进行反射调用即可。
    • 既然能通过服务发现找到该服务,前提必然有 服务注册 这一步。当然,这一环节涉及到的服务定位,根据哪些数据能唯一标识一个服务呢?
  • 分析完主要问题后,我们来看源码。

源码

服务提供方

根据上面的分析,我们来看下服务注册这一步的代码实现。

首先来看下用于标识对外暴露的服务的注解定义:

package com.menghao.rpc.provider.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * <p>服务提供方注解.<br>
 * <p>用于标识在接口上,只提供出接口jar</p>
 *
 * @author MarvelCode.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Provider {
}

该注解起的作用就是个标记作用,在合适的时间将标识的服务注册到 ZooKeeper 上。来看下它是何时被解析的:

package com.menghao.rpc.spring;

import com.menghao.rpc.RpcConstants;
import com.menghao.rpc.provider.annotation.Provider;
import com.menghao.rpc.provider.regisiter.ProviderRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

import java.util.HashMap;
import java.util.Map;

/**
 * <p>内部系统服务提供方注册.</br>
 * 获取内部所有的 @Provider 接口并注册到服务提供者仓库
 *
 * @author MarvelCode
 * @see ProviderRepository
 */
public class ProviderPostProcessor implements BeanPostProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProviderPostProcessor.class);

    private Map<String, Class> candidates = new HashMap<>(8);

    private ProviderRepository providerRepository;

    public ProviderPostProcessor(ProviderRepository providerRepository) {
        this.providerRepository = providerRepository;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        for (Class sourceInterface : bean.getClass().getInterfaces()) {
            // 递归遍历所有父接口
            recursiveInterface(sourceInterface, beanName, candidates);
        }
        return bean;
    }

    private void recursiveInterface(Class sourceInterface, String beanName, Map<String, Class> candidates) {
        // 接口被@Provider标识
        if (sourceInterface.getAnnotation(Provider.class) != null) {
            LOGGER.info(RpcConstants.LOG_RPC_PREFIX + "find @Provider-" + sourceInterface.getName());
            candidates.put(beanName, sourceInterface);
        }
        // 直到无父接口,递归结束
        if (sourceInterface.getInterfaces().length == 0) {
            return;
        }
        // 否则递归遍历父接口
        for (Class superInterface : sourceInterface.getInterfaces()) {
            recursiveInterface(superInterface, beanName, candidates);
        }
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 将符合条件的服务进行注册
        if (candidates.containsKey(beanName)) {
            Class sourceInterface = candidates.get(beanName);
            providerRepository.register(sourceInterface, beanName, bean);
        }
        return bean;
    }
}

借助 Spring提供的 bean 生命周期扩展接口,在 bean 的初始化前,遍历获取该实例的所有父接口,将被 @Provider 注解 标识的接口筛选出来,放入候选者列表(candidates :key-beanName,value-Interface)等待处理。在 bean 的初始化后,将之前筛选出的服务进行注册。

接着来看注册逻辑:

package com.menghao.rpc.provider.regisiter;

import com.menghao.rpc.RpcConstants;
import com.menghao.rpc.provider.exception.InitializationException;
import com.menghao.rpc.provider.model.ProviderKey;
import com.menghao.rpc.spring.ProviderPostProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.MessageFormat;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * <p>本地服务仓库.</br>
 * 存放了契约与调用bean的映射关系,用于rpc调用时取出,从而反射调用
 *
 * @author MarvelCode
 * @see ProviderRegister
 */
public class ProviderRepository {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProviderPostProcessor.class);

    private ConcurrentMap<ProviderKey, Object> providerMapping = new ConcurrentHashMap<>();

    private Set<ProviderKey> providers = Collections.unmodifiableSet(providerMapping.keySet());

    /**
     * 注册服务,维护契约与单例的映射关系
     *
     * @param sourceInterface @Provider标识的接口
     * @param implCode        服务实现的beanName
     * @param bean            服务单例
     */
    public void register(Class sourceInterface, String implCode, Object bean) {
        ProviderKey providerKey = new ProviderKey(sourceInterface.getName(), implCode);
        LOGGER.info(RpcConstants.LOG_RPC_PREFIX + "register provider-" + providerKey);
        if (providerMapping.containsKey(providerKey)) {
            throw new InitializationException(MessageFormat.format("contract: {} ,implCode: {} confilic",
                    sourceInterface.getName(), implCode));
        }
        providerMapping.putIfAbsent(providerKey, bean);
    }

    /**
     * 发现服务单例,根据契约与单例的映射关系查到单例,以便反射调用
     *
     * @param providerKey 服务单例的键(契约)
     * @return 服务单例
     */
    public Object getProvider(ProviderKey providerKey) {
        return providerMapping.get(providerKey);
    }

    /**
     * 获取所有满足条件的(@Provider)的契约
     *
     * @return 服务键集合
     */
    Set<ProviderKey> getProviders() {
        return providers;
    }
}

该类作为服务实例的仓库,存放了 ProviderKey 与 服务实例的对应关系。这也是服务定位的逻辑所在,通过接口的全限定名,beanName作为服务的坐标( ProviderKey ),唯一确定一个服务。这样服务调用方将这些信息告诉服务提供方时,就可以知道反射调用哪个服务实例了。

服务信息收集完毕后,接下来就会进行 服务注册

package com.menghao.rpc.provider.regisiter;

import com.menghao.rpc.provider.exception.InitializationException;
import com.menghao.rpc.provider.model.ProviderKey;
import com.menghao.rpc.util.ProviderHostUtils;
import com.menghao.rpc.zookeeper.CuratorClient;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.util.Assert;

import java.text.MessageFormat;

/**
 * <p>服务提供方注册.</br>
 * 需要的信息:契约、实现、IP、端口
 *
 * @author MarvelCode
 */
public class ProviderRegister implements ApplicationListener<ContextRefreshedEvent> {

    private CuratorClient curatorClient;

    private Integer port;

    public ProviderRegister(CuratorClient curatorClient, Integer port) {
        this.curatorClient = curatorClient;
        this.port = port;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        ApplicationContext currentContext = event.getApplicationContext();
        // 忽略子容器
        if (currentContext.getParent() != null) {
            return;
        }
        ProviderRepository apiRepository = currentContext.getBean(ProviderRepository.class);
        Assert.notNull(apiRepository, "初始化失败");
        String ip = ProviderHostUtils.getLocalHost();
        String nodeValue = ip + ":" + port;
        for (ProviderKey providerKey : apiRepository.getProviders()) {
            String path = getNodePath(providerKey) + "/" + nodeValue;
            try {
                curatorClient.createEphemeralNode(path, null);
            } catch (Exception e) {
                e.printStackTrace();
                throw new InitializationException(MessageFormat.format("create node: {} value: {} excecption", path, nodeValue));
            }
        }
    }

    private String getNodePath(ProviderKey providerKey) {
        return "/" + providerKey.getContract() + ":" + providerKey.getImplCode();
    }
}

这一步利用了 Spring的事件机制 在容器启动后,将服务信息与机器信息注册到ZooKeeper上。

其中服务所在机器的IP是通过 ProviderHostUtils 工具类获取的,端口号是依赖外部配置文件指定的。

ZooKeeper 客户端使用了 Curator框架 ,该框架对ZooKeeper Api进行了封装,像重连机制、Watch的多次触发等都无需考虑。

注册路径的规则可自行设计,我的实现为:统一前置路径,例如 /marvel/rpc/provider,下一级路径为“接口全限定名 :beanName”,在下一级为“ip:端口号”。其实到这里已经很明显了,服务消费方只要根据服务的“契约”,就可获取指定节点下的所有子节点,也就是该服务对应的所有机器信息了。

服务消费方

介绍完服务提供方的启动流程,接下来看下服务消费方是如何进行服务调用的。

首先依然是一个注解,该注解用于将服务进行注入,类似Spring注入Bean一样:

package com.menghao.rpc.consumer.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * <p>Rpc服务消费方注解.<br>
 *
 * @author MarvelCode.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface Reference {
    /**
     * 用于指定implCode
     */
    String value();
}

接下来是 @Reference 注解的处理逻辑,类比于 Spring 的@Autowired注解来看:

package com.menghao.rpc.spring;

import com.menghao.rpc.consumer.annotation.Reference;
import com.menghao.rpc.consumer.discovery.ReferenceRepository;
import com.menghao.rpc.consumer.model.ReferenceKey;
import org.springframework.beans.BeansException;
import org.springframework.beans.PropertyValues;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor;

import java.beans.PropertyDescriptor;
import java.lang.reflect.Field;

/**
 * <p>@Reference注解增强:注入代理对象.</br>
 * <p>时机:属性填充时,代理对象由JDK动态代理实现</p>
 *
 * @author MarvelCode
 * @see ReferenceRepository
 */
public class ReferenceAutowired implements InstantiationAwareBeanPostProcessor {

    private ReferenceRepository referenceRepository;

    public ReferenceAutowired(ReferenceRepository referenceRepository) {
        this.referenceRepository = referenceRepository;
    }

    @Override
    public Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException {
        return null;
    }

    @Override
    public boolean postProcessAfterInstantiation(Object bean, String beanName) throws BeansException {
        return true;
    }

    @Override
    public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException {
        Class sourceClass = bean.getClass();
        do {
            // 遍历成员变量找出所有被 @Reference注解的成员进行注入
            for (Field field : sourceClass.getDeclaredFields()) {
                Reference reference = field.getAnnotation(Reference.class);
                if (reference != null) {
                    // 找到对应的代理对象并赋值
                    Object candidate = findReference(field.getType(), reference);
                    field.setAccessible(true);
                    try {
                        field.set(bean, candidate);
                    } catch (IllegalAccessException e) {
                        throw new BeanCreationException(beanName, "@Reference dependencies autowired failed", e);
                    }
                }
            }
            sourceClass = sourceClass.getSuperclass();
        } while (sourceClass != null && sourceClass != Object.class);
        return pvs;
    }

    private Object findReference(Class sourceInterface, Reference reference) {
        // 根据注入类型和指定实现
        ReferenceKey referenceKey = new ReferenceKey(sourceInterface, reference.value());
        return referenceRepository.getReference(referenceKey);
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }
}

同样借助了 Bean 生命周期扩展接口,不过这次处理是在“ 属性填充 ”的阶段,将 @Reference 标识的服务接口进行代理赋值。即,通过代理拦截该接口的所有方法调用,转而以Http请求的方式请求服务提供方(本篇为Http方式,以后会提供Tcp方式的讲解,主要逻辑大致相似)。

同提供方相似的,消费方同样有仓库 ReferenceRepository ,通过接口的全限定名,implCode(对应服务端的beanName)构成 ProviderKey

package com.menghao.rpc.consumer.discovery;

import com.menghao.rpc.consumer.JdkProxyFactory;
import com.menghao.rpc.consumer.RpcAgent;
import com.menghao.rpc.consumer.model.HttpReferenceAgent;
import com.menghao.rpc.consumer.model.ReferenceKey;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * <p>Http方式代理对象仓库.</br>
 *
 * @author MarvelCode
 */
public class HttpReferenceRepository implements ReferenceRepository {

    private JdkProxyFactory proxyFactory;

    private ProviderDiscovery providerDiscovery;

    private RestTemplate restTemplate;

    private final ConcurrentMap<ReferenceKey, RpcAgent> referenceCache = new ConcurrentHashMap<>(8);

    public HttpReferenceRepository(JdkProxyFactory proxyFactory, ProviderDiscovery providerDiscovery,
                                   RestTemplate restTemplate) {
        this.proxyFactory = proxyFactory;
        this.providerDiscovery = providerDiscovery;
        this.restTemplate = restTemplate;
    }

    @Override
    public Object getReference(ReferenceKey referenceKey) {
        if (referenceCache.get(referenceKey) != null) {
            return proxyFactory.getProxy(referenceCache.get(referenceKey));
        }
        RpcAgent referenceAgent = new HttpReferenceAgent(referenceKey, restTemplate);
        // Http方式 @Reference 代理初始化
        providerDiscovery.initReferenceAgent(referenceAgent);
        referenceCache.putIfAbsent(referenceKey, referenceAgent);
        return proxyFactory.getProxy(referenceAgent);
    }

}

这里有一层缓存,意图是将已经初始化的ReferenceAgent直接代理返回,否则初始化过程需要从ZooKeeper获取一次机器信息,缓存后就可以避免同一个服务的多次ZK请求。 ReferenceAgent  就是真正发送Http请求代理类,用来与服务通信以获取响应结果:

package com.menghao.rpc.consumer.model;

import com.menghao.rpc.RpcConstants;
import com.menghao.rpc.consumer.ReferenceAgent;
import com.menghao.rpc.consumer.balance.LoadBalancer;
import com.menghao.rpc.consumer.balance.RandomLoadBalancer;
import com.menghao.rpc.provider.exception.InvokeException;
import com.menghao.rpc.provider.model.RpcResponse;
import lombok.Getter;
import lombok.Setter;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.List;

/**
 * <p>@Reference代理对象.</br>
 * <p>调用原始接口的任意方法会被该类的invoke方法代理:使用RestTemplate发送请求</p>
 * <p>sourceInterface/implCode:唯一标识一个服务</p>
 *
 * @author MarvelCode
 */
public class HttpReferenceAgent implements ReferenceAgent {

    @Getter
    private Class sourceInterface;
    @Getter
    private String implCode;
    @Setter
    private List<String> providerHosts;
    private RestTemplate restTemplate;
    private LoadBalancer defaultBalancer = new RandomLoadBalancer();

    public HttpReferenceAgent(ReferenceKey referenceKey, RestTemplate restTemplate) {
        this.sourceInterface = referenceKey.getSourceInterface();
        this.implCode = referenceKey.getName();
        this.restTemplate = restTemplate;
    }

    @Override
    public Object invoke(Method method, Object[] args) {
        // 构造请求参数
        RpcRequest apiParam = makeParam(method, args);
        if (providerHosts == null || providerHosts.size() == 0) {
            throw new InvokeException("There are currently no service providers available");
        }
        String url = select(providerHosts);
        // 发送Http请求
        ResponseEntity<RpcResponse> responseEntity = restTemplate.postForEntity(url, apiParam, RpcResponse.class);
        if (responseEntity.getStatusCode() == HttpStatus.OK) {
            if (responseEntity.getBody().getResult() != null) {
                return responseEntity.getBody().getResult();
            }
            if (responseEntity.getBody().getThrowable() != null) {
                throw new InvokeException(responseEntity.getBody().getThrowable());
            }
        }
        return new InvokeException(MessageFormat.format(RpcConstants.LOG_RPC_PREFIX + "invoke {} response status code {}",
                sourceInterface.getName() + ":" + method.getName(), responseEntity.getStatusCode()));
    }


    private RpcRequest makeParam(Method method, Object[] args) {
        return RpcRequest.builder()
                .method(method.getName())
                .contract(sourceInterface.getName())
                .implCode(implCode)
                .args(args)
                .argsType(method.getParameterTypes())
                .build();
    }

    private String select(List<String> providerHosts) {
        // 负载均衡
        String ip = defaultBalancer.select(providerHosts);
        return "http://" + ip + "/marvel/rpc/entrance";
    }
}

这个类封装了具体的调用过程,包含了请求实体的封装、负载均衡(随机访问),最后通过 Spring RestTemplate 发送请求(其中序列化与反序列化后面再展开讲解)。接下来看下代理对象是如何跟服务接口建立关系的,使用到了 JDK动态代理

package com.menghao.rpc.consumer;

import com.menghao.rpc.consumer.model.ReferenceAgent;
import com.menghao.rpc.provider.exception.InitializationException;
import com.menghao.rpc.provider.exception.InvokeException;
import lombok.Getter;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.text.MessageFormat;

/**
 * <p>JDK动态代理工厂</br>
 * <p>对@Reference标识的成员变量进行动态代理,最终进行http请求</p>
 *
 * @author MarvelCode
 * @see ReferenceAgent
 */
public class JdkProxyFactory {

    /**
     * 根据@Reference创建代理对象
     *
     * @param referenceAgent 实际处理对象
     * @return 代理后的对象
     */
    public <T> T getProxy(ReferenceAgent referenceAgent) {
        return new RpcProxy<T>(referenceAgent).getProxy();
    }

    private static class RpcProxy<T> implements InvocationHandler {

        private ReferenceAgent referenceAgent;

        @Getter
        private T proxy;

        private static Method hashcodeMethod;
        private static Method toStringMethod;
        private static Method equalsMethod;

        static {
            try {
                hashcodeMethod = Object.class.getMethod("hashCode");
                toStringMethod = Object.class.getMethod("toString");
                equalsMethod = Object.class.getMethod("equals", Object.class);
            } catch (NoSuchMethodException e) {
                throw new InitializationException("");
            }
        }

        @SuppressWarnings("unchecked")
        RpcProxy(ReferenceAgent referenceAgent) {
            this.referenceAgent = referenceAgent;
            proxy = (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                    new Class[]{referenceAgent.getSourceInterface()}, this);
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                if (hashcodeMethod == method) {
                    return System.identityHashCode(proxy);
                }
                if (toStringMethod == method) {
                    return referenceAgent.toString();
                }
                if (equalsMethod == method) {
                    return proxy == args[0];
                }
                return new InvokeException(MessageFormat.format("method {} not support", method.getName()));
            }
            return referenceAgent.invoke(method, args);
        }
    }
}

可以看到,除了 toString、hashCode、equals方法外,其余方法调用都转而调用 ReferenceAgent.invoke

到目前为之,大体的调用逻辑已经逐渐清晰,但最重要的一步,要向谁去请求呢?这时需要依赖 ZooKeeper 进行 服务发现

package com.menghao.rpc.consumer.discovery;

import com.menghao.rpc.consumer.model.ReferenceAgent;
import com.menghao.rpc.zookeeper.ChildChangeListener;
import com.menghao.rpc.zookeeper.CuratorClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>对于 @Reference代理所需信息的初始化.</br>
 * <p>需要根据契约及实现找到服务提供方机器信息</p>
 *
 * @author MarvelCode
 */
public class ProviderDiscovery {

    private final static Logger LOG = LoggerFactory.getLogger(ProviderDiscovery.class);

    private CuratorClient curatorClient;

    public ProviderDiscovery(CuratorClient curatorClient) {
        this.curatorClient = curatorClient;
    }

    public void initReferenceAgent(ReferenceAgent referenceAgent) {
        detect(referenceAgent);
    }

    private void detect(final ReferenceAgent referenceAgent) {
        // 服务信息节点路径
        final String nodePath = "/" + referenceAgent.getContract() + ":" + referenceAgent.getImplCode();
        LOG.info("look up zookeeper node {}", nodePath);
        try {
            // 监听 ZK节点
            curatorClient.addPathListener(nodePath, new ChildChangeListener() {
                @Override
                public void onChange(String path, String data) {
                    try {
                        referenceAgent.setProviderHosts(curatorClient.getChildren(nodePath));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            // 设置服务机器信息列表
            referenceAgent.setProviderHosts(curatorClient.getChildren(nodePath));
        } catch (Exception e) {
            LOG.error("zookeeper getNode child {} exception", nodePath, e);
        }
    }
}

可以看到,通过服务“契约”,借助 Curator 我们很容易可以获取到指定服务的服务信息列表。当然我们还需要 设置对指定节点的监听 ,当服务所在的机器宕机时,临时服务节点就会下线,我们需要重新获取机器列表,以保证请求的有效性。

序列化与反序列化

最后就是请求的发送与响应了,来看下请求实体类定义:

package com.menghao.rpc.consumer.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * <p>Rpc统一请求封装.</br>
 * <p>包含了服务契约、调用方法名、入参类型、入参数组</p>
 *
 * @author MarvelCode
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest implements Serializable {

    private String contract;

    private String implCode;

    private String method;

    private Class[] argsType;

    private Object[] args;
}

包含了:接口的全限定名、implCode,这两者用于从容器中定位到服务。除此之外就是反射调用需要的三个参数了,方法、入参类型、以及具体的参数。

这些数据将会被序列化后传输到服务提供方,这里将使用 Hessian 序列化框架。考虑到本次采用的 Http 请求的方式,为了将调用代码对序列化无感知,借助了 Spring 的  HttpMessageConverter ,将Http请求和响应自动的序列化与反序列化。

package com.menghao.rpc.spring;

import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.serialize.hessian.HessianObjectInput;
import com.menghao.rpc.serialize.hessian.HessianObjectOutput;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.AbstractHttpMessageConverter;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.http.converter.HttpMessageNotWritableException;

import java.io.IOException;

/**
 * <p>RpcRequest消息转换器.<br>
 *
 * @author MarvelCode.
 */
public class RpcRequestConvert extends AbstractHttpMessageConverter<RpcRequest> {

    public RpcRequestConvert(MediaType supportedMediaType) {
        super(supportedMediaType);
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return RpcRequest.class.isAssignableFrom(clazz);
    }

    @Override
    protected RpcRequest readInternal(Class<? extends RpcRequest> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException {
        HessianObjectInput input = new HessianObjectInput(inputMessage.getBody());
        return (RpcRequest) input.readObject(RpcRequest.class);
    }

    @Override
    protected void writeInternal(RpcRequest rpcRequest, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException {
        HessianObjectOutput output = new HessianObjectOutput(outputMessage.getBody());
        output.writeObject(rpcRequest);
        output.flush();
    }

}
package com.menghao.rpc.spring;

import com.menghao.rpc.provider.model.RpcResponse;
import com.menghao.rpc.serialize.hessian.HessianObjectInput;
import com.menghao.rpc.serialize.hessian.HessianObjectOutput;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.AbstractHttpMessageConverter;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.http.converter.HttpMessageNotWritableException;

import java.io.IOException;

/**
 * <p>RpcResponse消息转换器.<br>
 *
 * @author MarvelCode.
 */
public class RpcResponseConvert extends AbstractHttpMessageConverter<RpcResponse> {

    public RpcResponseConvert(MediaType supportedMediaType) {
        super(supportedMediaType);
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return RpcResponse.class.isAssignableFrom(clazz);
    }

    @Override
    protected RpcResponse readInternal(Class<? extends RpcResponse> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException {
        HessianObjectInput input = new HessianObjectInput(inputMessage.getBody());
        return (RpcResponse) input.readObject(RpcResponse.class);
    }

    @Override
    protected void writeInternal(RpcResponse rpcRequest, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException {
        HessianObjectOutput output = new HessianObjectOutput(outputMessage.getBody());
        output.writeObject(rpcRequest);
        output.flush();
    }
}

以上这两个转换器,装配到 RestTemplate、以及 mvc后,就能实现自动序列化的功能。接下来看下服务提供方接到请求后是如何处理的:

package com.menghao.rpc.provider;

import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.provider.exception.InvokeException;
import com.menghao.rpc.provider.model.ProviderKey;
import com.menghao.rpc.provider.model.RpcResponse;
import com.menghao.rpc.provider.regisiter.ProviderRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import java.lang.reflect.Method;

/**
 * <p>服务提供方统一入口.</br>
 *
 * @author MarvelCode
 */
@RequestMapping("/marvel/rpc/entrance")
public class HttpProviderEntrance {

    private static final Logger LOGGER = LoggerFactory.getLogger(HttpProviderEntrance.class);

    private ProviderRepository providerRepository;

    public HttpProviderEntrance(ProviderRepository providerRepository) {
        this.providerRepository = providerRepository;
    }

    @ResponseBody
    @RequestMapping(method = RequestMethod.POST)
    public RpcResponse api(@RequestBody RpcRequest rpcRequest) {
        // 通过 contract、implCode定位服务bean
        ProviderKey providerKey = new ProviderKey(rpcRequest.getContract(), rpcRequest.getImplCode());
        Object bean = providerRepository.getProvider(providerKey);
        if (bean == null) {
            return new RpcResponse(new InvokeException(""));
        }
        try {
            // 反射调用服务方法
            Method method = ReflectionUtils.findMethod(bean.getClass(), rpcRequest.getMethod(), rpcRequest.getArgsType());
            Object result = ReflectionUtils.invokeMethod(method, bean, rpcRequest.getArgs());
            return new RpcResponse(result);
        } catch (Throwable e) {
            LOGGER.error("invoke exception,", e);
            return new RpcResponse(e);
        }
    }
}

响应实体:

package com.menghao.rpc.provider.model;

import lombok.Data;

import java.io.Serializable;

/**
 * <p>Rpc调用响应实体封装.<br>
 *
 * @author MarvelCode.
 * @version 2018/8/1.
 */
@Data
public class RpcResponse implements Serializable {

    private Object result;

    private Throwable throwable;

    public RpcResponse(Object result) {
        this.result = result;
    }

    public RpcResponse(Throwable throwable) {
        this.throwable = throwable;
    }
}

至此,基于Http的远程服务调用的大致流程就结束了。

配置

该工程基于 Spring Boot开发,配置采用自配置方式。

依赖

<dependency>
	<groupId>com.menghao</groupId>
	<artifactId>marvelcode-rpc</artifactId>
	<version>1.0-SNAPSHOT</version>
</dependency>

服务提供方

# 自定义端口
server.port=
# ZooKeeper所在机器 ip:port
marvel.rpc.zkServerHost =
marvel.rpc.consumerEnable=false
marvel.rpc.providerEnable=true
marvel.rpc.type=http

服务消费方

# ZooKeeper所在机器 ip:port
marvel.rpc.zkServerHost=
marvel.rpc.providerEnable=false
marvel.rpc.consumerEnable=true
marvel.rpc.type=http

总结

代码均已测试,这里测试结果就不赘述了。各位可以本地先简写一个服务接口层,在服务提供方实现,并部署等待消费方调用。消费方引入服务接口层,编写调用代码,同样部署。工程运行依赖Spring Boot框架。

本篇目的在于讲解RPC调用实现的一种思路,很多功能尚不完备,例如负载均衡采用了简单的随机访问方式。完整的Rpc框架需要包括权重负载、幂等、重试等额外功能。另外,采用 Http 方式的远程调用性能上,不如 Tcp 方式的。

有时间我会整理基于 Tcp 方式远程服务调用,敬请期待。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

创业时代

创业时代

付遥 / 中信出版社 / 2015-7 / 39.8元

香港人郭鑫年酷爱赛车,在驾车穿越隧道的时候,因为收发短信发生意外,他从被撞得破烂的车里爬出来时,兴奋地高喊:我有一个伟大的想法,手机上的对讲机,将要改变世界!他随即辞职来到北京,开始艰难的创业历程。 移动技术迅猛发展,正在颠覆互联网行业,郭鑫年误打误撞,对讲机用户数量急增,竟成为移动互联网的明星,他也因此置身于风口浪尖。三大互联网巨头为了抢夺手机入口大打出手,无不希望争夺这张通往未来移动市场......一起来看看 《创业时代》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

MD5 加密
MD5 加密

MD5 加密工具

SHA 加密
SHA 加密

SHA 加密工具