手把手教你写一个RPC

栏目: 服务器 · 发布时间: 6年前

内容简介:定义:RPC(Remote Procedure Call Protocol)——下面我们来看一下一个RPC调用的流程涉及哪些通信细节:RPC的目标就是要2~8这些步骤都封装起来,让用户对这些细节透明。

定义:RPC(Remote Procedure Call Protocol)—— 远程过程调用协议 ,RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了 传输层应用层RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

我的理解:与其说把RPC 看作是一种协议,倒不如把 它看作是一种 客户机/服务器交互的模式,但是 RPC一定是基于 TCP 或者 其他 通信协议的

下面我们来看一下一个RPC调用的流程涉及哪些通信细节:

手把手教你写一个RPC
  • 服务消费方(client)调用以本地调用方式调用服务;(1)
  • client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;(2)
  • client stub找到服务地址,并将消息发送到服务端;(3)
  • server stub收到消息后进行解码;(4)
  • server stub根据解码结果调用本地的服务;(5)
  • 本地服务执行并将结果返回给server stub;(6)
  • server stub将返回结果打包成消息并发送至消费方;(7)
  • client stub接收到消息,并进行解码;(8)
  • 服务消费方得到最终结果。(9)

RPC的目标就是要2~8这些步骤都封装起来,让用户对这些细节透明。

1.2 手动实现

1.2.1 先做一个空接口实现序列化接口

public interface IRpcService extends Serializable{

}
复制代码

1.2.2 做一个需要被远程调用的接口 以及对应的接口实现类

public interface IHelloService extends IRpcService{
    String sayHi(String name,String message); 
}
复制代码
public class HelloServiceImpl implements IHelloService{

    private static final long serialVersionUID = 146468468464364698L;

    @Override
    public String sayHi(String name, String message) {
        return new StringBuilder().append("hi~!").append(",").append(message).toString();
    }

}
复制代码

1.2.3 需要写一个服务端,主要的作用 是进行服务注册(接口注册) 以及 接收客户端的调用参数 执行调用请求 返回结果

注:这个地方 我没有采用dom4j 解析配置文件的形式 进行接口注册 有时间的朋友可以多加一层

public interface Server {
    
    //Socket端口
    int PORT = 8080;
    
    //启动服务端
    void start() throws IOException;
    
    //停止服务端
    void stop();
    
    /**
     * 服务注册
     * -- serviceInterface 对外暴露接口
     * -- 内部实现类
     */
    void regist(Class<? extends IRpcService> serviceInterface,Class<? extends IRpcService> impl);
    
}
复制代码
public class ServerCenter implements Server{
    
    /**线程池 接收客户端调用**/
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20, 200, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
    /**服务注册缓存**/
    public static final Map<String,Class<?>> serviceRegistry = new HashMap<>();
    
    /**
     * 启动服务
     */
    @Override
    public void start() throws IOException {
        ServerSocket server = new ServerSocket();
        server.bind(new InetSocketAddress(PORT));
        try {
            while(true){
                executor.execute(new ServiceTask(server.accept()));
            }
        } finally {
            server.close();
        }
    }
    
    /**
     * 停止服务
     */
    @Override
    public void stop() {
        executor.shutdown();
    }
    
    /**
     * 注册服务
     */
    @Override
    public void regist(Class<? extends IRpcService> serviceInterface, Class<? extends IRpcService> impl) {
        serviceRegistry.put(serviceInterface.getName(), impl);
    }
    
    private static class ServiceTask implements Runnable{
        
        Socket client = null;
        
        public ServiceTask(Socket client) {
            this.client = client;
        }
        
        @Override
        public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
            
            try {
                
                input = new ObjectInputStream(client.getInputStream());
                String serviceName = input.readUTF();
                String methodName = input.readUTF();
                
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                Class<?> serviceClass = serviceRegistry.get(serviceName);
                if(serviceClass == null){
                    throw new ClassNotFoundException(serviceName + "not found");
                }
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);
                
                //将执行结果反序列化 通过socket返给客户端
                output = new ObjectOutputStream(client.getOutputStream());
                output.writeObject(result);
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                
                if(input != null){
                    try {
                        input.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                
                if(output != null){
                    try {
                        output.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                
                if(client != null){
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                
            }
        }
        
    }
    
    public static void main(String[] args) throws Exception {
        ServerCenter center = new ServerCenter();
        center.regist(IHelloService.class,new HelloServiceImpl().getClass());
        center.start();
    }
}
复制代码

1.2.4 写一个客户端,用动态代理 获取被代理接口的 各种参数 传输给 服务端,接收返回结果,打印到控制台

public class Client {
    
    
    @SuppressWarnings("unchecked")
    public static <T extends IRpcService>T getRemoteProxyObj(final Class<? extends IRpcService> serviceInterface,final InetSocketAddress addr){
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new InvocationHandler() {
            
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket socket = null;
                ObjectOutputStream output = null;
                ObjectInputStream input = null;
                try {
                    //1.创建Socket客户端,根据指定地址连接远程服务提供者
                    socket = new Socket();
                    socket.connect(addr);
                    
                    //2.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者
                    output = new ObjectOutputStream(socket.getOutputStream());
                    output.writeUTF(serviceInterface.getName());
                    output.writeUTF(method.getName());
                    output.writeObject(method.getParameterTypes());
                    output.writeObject(args);
                    
                    //3.同步阻塞等待服务器返回应答 获取应答后返回
                    input = new ObjectInputStream(socket.getInputStream());
                    return input.readObject();
                } finally{
                    if(socket != null){
                        socket.close();
                    }
                    
                    if(output != null){
                        output.close();
                    }
                    
                    if(input != null){
                        input.close();
                    }
                }
            }
        });
    }
    
}
复制代码

1.2.5 测试

注:测试之前 需要开启服务端

public class RpcTest {
    public static void main(String[] args) throws IOException {
        IHelloService service  = Client.getRemoteProxyObj(IHelloService.class, new InetSocketAddress(8080));
        System.out.println(service.sayHi("张三", "新年快乐!"));
    }
}
复制代码

就这样我们实现了一个简陋的RPC

本文意在通过实现简单的RPC,去真正意义上对RPC框架的实现原理有初步的了解,而不是人云亦云。

此RPC实现有诸多缺点,但是 我们只要明白RPC的基座 其他的RPC框架只是完善基座以及扩展而已 。


以上所述就是小编给大家介绍的《手把手教你写一个RPC》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

ANSI Common Lisp

ANSI Common Lisp

Paul Graham / Prentice Hall / 1995-11-12 / USD 116.40

For use as a core text supplement in any course covering common LISP such as Artificial Intelligence or Concepts of Programming Languages. Teaching students new and more powerful ways of thinking abo......一起来看看 《ANSI Common Lisp》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具