gRPC是什么

栏目: 服务器 · 发布时间: 7年前

内容简介:gRPC是什么

GRPC是google开源的一个高性能、跨语言的RPC框架,基于HTTP2协议,基于protobuf 3.x,基于Netty 4.x +。GRPC与thrift、avro-rpc等其实在总体原理上并没有太大的区别,简而言之GRPC并没有太多突破性的创新。(如下描述,均基于 JAVA 语言的实现)

对于开发者而言:

1)需要使用protobuf定义接口,即.proto文件

2)然后使用compile工具生成特定语言的执行代码,比如JAVA、C/C++、 Python 等。类似于thrift,为了解决跨语言问题。

3)启动一个Server端,server端通过侦听指定的port,来等待Client链接请求,通常使用Netty来构建,GRPC内置了Netty的支持。

4)启动一个或者多个Client端,Client也是基于Netty,Client通过与Server建立TCP长链接,并发送请求;Request与Response均被封装成HTTP2的stream Frame,通过Netty Channel进行交互。

对于GRPC的“鼓吹”,本文不多表述,截止到今日,GRPC仍然处于开发阶段,尚没有release版本,而且特性也很多需要补充;GRPC基于protobuf 3.x,但是protobuf 3.x也没有release版本;虽然HTTP2协议已成定局,但尚未被主流web容器包括代理服务器支持,这意味着GRPC在HTTP负载均衡方面尚有欠缺;最终,在短期内我们还不能在production环境中实施,可以做技术储备。不过GRPC的缺点,在将来将会成为它的优点,我们需要时间等待它的成熟。

1)GRPC尚未提供连接池

2)尚未提供“服务发现”、“负载均衡”机制

3)因为基于HTTP2,绝大部多数HTTP Server、Nginx都尚不支持,即Nginx不能将GRPC请求作为HTTP请求来负载均衡,而是作为普通的TCP请求。(nginx将会在1.9版本支持)

4)GRPC尚不成熟,易用性还不是很理想;就本人而言,我还是希望GRPC能够像hessian一样:无IDL文件,无需代码生成,接口通过HTTP表达。

5)Spring容器尚未提供整合。

在实际应用中,GRPC尚未完全提供连接池、服务自动发现、进程内负载均衡等高级特性,需要开发人员额外的封装;最大的问题,就是GRPC生成的接口,调用方式实在是不太便捷(JAVA),最起码与thrift相比还有差距,希望未来能够有所改进。

一、实例

1、proto文件

GRPC并没有创造新的序列化协议,而是使用已有的protobuf;基于protobuf来声明数据模型和RPC接口服务,当然protobuf是一个非常优秀的协议框架。关于protobuf 3.x的相关文档,请参见【protobuf 3】

接下来,我们设计一个sayHello接口,我们将数据模型和RPC接口分别保存在两个文件中。

1)TestModel.proto

syntax = “proto3″;
package com.test.grpc;
option java_package = “com.test.grpc.service.model”;
message TestRequest{
    string name  = 1;
    int32 id    = 2;
}
message TestResponse{
    string message = 1;
}

2)TestService.proto

syntax = “proto3″;
package com.test.grpc;
option java_package = “com.test.grpc.service”;
import “TestModel.proto”;
service TestRpcService{
    rpc sayHello(TestRequest) returns (TestResponse);
}

proto文件中需要注意加上“syntax”,表示使用protobuf 3的语法。

2、生成JAVA代码

生成代码,我们最好借助于maven插件,可以在pom文件中增加如下信息:

<pluginRepositories><!– 插件库 –>
    <pluginRepository>
        <id>protoc-plugin</id>
        <url>https://dl.bintray.com/sergei-ivanov/maven/</url>
    </pluginRepository>
</pluginRepositories>
<build>
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.4.0.Final</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>com.google.protobuf.tools</groupId>
            <artifactId>maven-protoc-plugin</artifactId>
            <version>0.4.4</version>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:3.0.0-beta-2:exe:${os.detected.classifier}</protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

然后只需要执行“mvn compile”指令即可,此后我们会在项目的target目录下看到生成的classes文件,当然最终我们还是需要将service打成jar包发布的。maven仍然可以帮助我们做这些工作,由.proto生成classes是在compile阶段,那么jar阶段仍然是可以将classes打成jar,只需要借助maven-jar-plugin插件即可。

3、开发Server端服务(简例)

//server端实现类,扩展原有接口
public class TestServiceImpl implements TestRpcServiceGrpc.TestRpcService {

    @Override
    public void sayHello(TestModel.TestRequest request, StreamObserver<TestModel.TestResponse> responseObserver) {
        String result = request.getName() + request.getId();
        TestModel.TestResponse response = TestModel.TestResponse.newBuilder().setMessage(result).build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}
public class TestServer {

    public static void main(String[] args) throws Exception{

        ServerImpl server = NettyServerBuilder.forPort(50010).addService(TestRpcServiceGrpc.bindService(new TestServiceImpl())).build();
        server.start();
        server.awaitTermination();//阻塞直到退出
    }
}

稍后启动TestServer即可。

4、开发Client端(简例)

public class TestClient {

    private final TestRpcServiceGrpc.TestRpcServiceBlockingStub client;
    public TestClient(String host,int port) {
        ManagedChannel channel =  NettyChannelBuilder.forAddress(host, port).usePlaintext(true).build();
        client = TestRpcServiceGrpc.newBlockingStub(channel).withDeadlineAfter(60000, TimeUnit.MILLISECONDS);
    }

    public String sayHello(String name,Integer id) {
        TestModel.TestRequest request = TestModel.TestRequest.newBuilder().setId(id).setName(name).build();
        TestModel.TestResponse response = client.sayHello(request);
        return response.getMessage();
    }
}

然后我们运行即可,代码非常简单,当然无论是Client还是Server端,我们还有其他额外的参数可以配置,我们稍后详细介绍。

二、原理解析

GRPC的Client与Server,均通过Netty Channel作为数据通信,序列化、反序列化则使用Protobuf,每个请求都将被封装成HTTP2的Stream,在整个生命周期中,客户端Channel应该保持长连接,而不是每次调用重新创建Channel、响应结束后关闭Channel(即短连接、交互式的RPC),目的就是达到链接的复用,进而提高交互效率。

1、Server端

我们通常使用NettyServerBuilder,即IO处理模型基于Netty,将来可能会支持其他的IO模型。Netty Server的IO模型简析:

1)创建ServerBootstrap,设定BossGroup与workerGroup线程池

2)注册childHandler,用来处理客户端链接中的请求成帧

3)bind到指定的port,即内部初始化ServerSocketChannel等,开始侦听和接受客户端链接。

4)BossGroup中的线程用于accept客户端链接,并转发(轮训)给workerGroup中的线程。

5)workerGroup中的特定线程用于初始化客户端链接,初始化pipeline和handler,并将其注册到worker线程的selector上(每个worker线程持有一个selector,不共享)

6)selector上发生读写事件后,获取事件所属的链接句柄,然后执行handler(inbound),同时进行拆封package,handler执行完毕后,数据写入通过,由outbound handler处理(封包)通过链接发出。    注意每个worker线程上的数据请求是队列化的。

参见源码:SingleThreadEventLoop、NioEventLoop。(请求队列化)

GRPC而言,只是对Netty Server的简单封装,底层使用了PlaintextHandler、Http2ConnectionHandler的相关封装等。具体Framer、Stream方式请参考Http2相关文档。

1)bossEventLoopGroup:如果没指定,默认为一个static共享的对象,即JVM内所有的NettyServer都使用同一个Group,默认线程池大小为1。

2)workerEventLoopGroup:如果没指定,默认为一个static共享的对象,线程池大小为coreSize * 2。这两个对象采用默认值并不会带来问题;通常情况下,即使你的application中有多个GRPC Server,默认值也一样能够带来收益。不合适的线程池大小,有可能会是性能受限。

3)channelType:默认为NioServerSocketChannel,通常我们采用默认值;当然你也可以开发自己的类。如果此值为NioServerSocketChannel,则开启keepalive,同时设定SO_BACKLOG为128;BACKLOG就是系统底层已经建立引入链接但是尚未被accept的Socket队列的大小,在链接密集型(特别是短连接)时,如果队列超过此值,新的创建链接请求将会被拒绝(有可能你在压力测试时,会遇到这样的问题),keepalive和BACKLOG特性目前无法直接修改。

[root@sh149 ~]# sysctl -a|grep tcp_keepalive
net.ipv4.tcp_keepalive_time = 60  ##单位:秒
net.ipv4.tcp_keepalive_probes = 9
net.ipv4.tcp_keepalive_intvl = 75 ##单位:秒
##可以在/etc/sysctl.conf查看和修改相关值
##tcp_keepalive_time:最后一个实际数据包发送完毕后,首个keepalive探测包发送的时间。
##如果首个keepalive包探测成功,那么链接会被标记为keepalive(首先TCP开启了keepalive)
##此后此参数将不再生效,而是使用下述的2个参数继续探测
##tcp_keepalive_intvl:此后,无论通道上是否发生数据交换,keepalive探测包发送的时间间隔
##tcp_keepalive_probes:在断定链接失效之前,尝试发送探测包的次数;
##如果都失败,则断定链接已关闭。

对于Server端,我们需要关注上述keepalive的一些设置;如果Netty Client在空闲一段时间后,Server端会主动关闭链接,有可能Client仍然保持链接的句柄,将会导致RPC调用时发生异常。这也会导致GRPC客户端调用时偶尔发生错误的原因之一。

4)followControlWindow:流量控制的窗口大小,单位:字节,默认值为1M,HTTP2中的“Flow Control”特性;连接上,已经发送尚未ACK的数据帧大小,比如window大小为100K,且winow已满,每次向Client发送消息时,如果客户端反馈ACK(携带此次ACK数据的大小),window将会减掉此大小;每次向window中添加亟待发送的数据时,window增加;如果window中的数据已达到限定值,它将不能继续添加数据,只能等待Client端ACK。

5)maxConcurrentCallPerConnection:每个connection允许的最大并发请求数,默认值为Integer.MAX_VALUE;如果此连接上已经接受但尚未响应的streams个数达到此值,新的请求将会被拒绝。为了避免TCP通道的过度拥堵,我们可以适度调整此值,以便Server端平稳处理,毕竟buffer太多的streams会对server的内存造成巨大压力。

6)maxMessageSize:每次调用允许发送的最大数据量,默认为100M。

7)maxHeaderListSize:每次调用允许发送的header的最大条数,GRPC中默认为8192。

对于其他的比如SSL/TSL等,可以参考其他文档。

GRPC Server端,还有一个最终要的方法:addService。【如下文service代理模式】

在此之前,我们需要介绍一下bindService方法,每个GRPC生成的service代码中都有此方法,它以硬编码的方式遍历此service的方法列表,将每个方法的调用过程都与“被代理实例”绑定,这个模式有点类似于静态代理,比如调用sayHello方法时,其实内部直接调用“被代理实例”的sayHello方法(参见MethodHandler.invoke方法,每个方法都有一个唯一的index,通过硬编码方式执行);bindService方法的最终目的是创建一个ServerServiceDefinition对象,这个对象内部位置一个map,key为此Service的方法的全名(fullname,{package}.{service}.{method}),value就是此方法的GRPC封装类(ServerMethodDefinition)。

源码分析:

private static final int METHODID_SAY_HELLO = 0;
private static class MethodHandlers<Req, Resp> implements
      … {
    private final TestRpcService serviceImpl;//实际被代理实例
    private final int methodId;

    public MethodHandlers(TestRpcService serviceImpl, int methodId) {
      this.serviceImpl = serviceImpl;
      this.methodId = methodId;
    }

    @java.lang.SuppressWarnings(“unchecked”)
    public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
      switch (methodId) {
        case METHODID_SAY_HELLO:        //通过方法的index来判定具体需要代理那个方法
          serviceImpl.sayHello((com.test.grpc.service.model.TestModel.TestRequest) request,
              (io.grpc.stub.StreamObserver<com.test.grpc.service.model.TestModel.TestResponse>) responseObserver);
          break;
        default:
          throw new AssertionError();
      }
    }
    ….
  }

  public static io.grpc.ServerServiceDefinition bindService(
      final TestRpcService serviceImpl) {
    return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
        .addMethod(
          METHOD_SAY_HELLO,
          asyncUnaryCall(
            new MethodHandlers<
              com.test.grpc.service.model.TestModel.TestRequest,
              com.test.grpc.service.model.TestModel.TestResponse>(
                serviceImpl, METHODID_SAY_HELLO)))
        .build();
  }

addService方法可以添加多个Service,即一个Netty Server可以为多个service服务,这并不违背 设计模式 和架构模式。addService方法将会把service保存在内部的一个map中,key为serviceName(即{package}.{service}),value就是上述bindService生成的对象。

那么究竟Server端是如何解析RPC过程的?Client在调用时会将调用的service名称 + method信息保存在一个GRPC“保留”的header中,那么Server端即可通过获取这个特定的header信息,就可以得知此stream需要请求的service、以及其method,那么接下来只需要从上述提到的map中找到service,然后找到此method,直接代理调用即可。执行结果在Encoder之后发送给Client。(参见:NettyServerHandler)

因为是map存储,所以我们需要在定义.proto文件时,尽可能的指定package信息,以避免因为service过多导致名称可能重复的问题。

2、Client端

我们使用ManagedChannelBuilder来创建客户端channel,ManagedChannelBuilder使用了provider机制,具体是创建了哪种channel有provider决定,可以参看META-INF下同类名的文件中的注册信息。当前Channel有2种:NettyChannelBuilder与OkHttpChannelBuilder。本人的当前版本中为NettyChannelBuilder;我们可以直接使用NettyChannelBuilder来构建channel。如下描述则针对NettyChannelBuilder:

配置参数与NettyServerBuilder基本类似,再次不再赘言。默认情况下,Client端默认的eventLoopGroup线程池也是static的,全局共享的,默认线程个数为coreSize * 2。合理的线程池个数可以提高客户端的吞吐能力。

ManagedChannel是客户端最核心的类,它表示逻辑上的一个channel;底层持有一个物理的transport(TCP通道,参见NettyClientTransport),并负责维护此transport的活性;即在RPC调用的任何时机,如果检测到底层transport处于关闭状态(terminated),将会尝试重建transport。(参见TransportSet.obtainActiveTransport())

通常情况下,我们不需要在RPC调用结束后就关闭Channel,Channel可以被一直重用,直到Client不再需要请求位置或者Channel无法真的异常中断而无法继续使用。当然,为了提高Client端application的整体并发能力,我们可以使用连接池模式,即创建多个ManagedChannel,然后使用轮训、随机等算法,在每次RPC请求时选择一个Channel即可。(备注,连接池特性,目前GRPC尚未提供,需要额外的开发)

每个Service客户端,都生成了2种stub:BlockingStub和FutureStub;这两个Stub内部调用过程几乎一样,唯一不同的是BlockingStub的方法直接返回Response Model,而FutureStub返回一个Future对象。BlockingStub内部也是基于Future机制,只是封装了阻塞等待的过程:

try {
        //也是基于Future
      ListenableFuture<RespT> responseFuture = futureUnaryCall(call, param);
      //阻塞过程
      while (!responseFuture.isDone()) {
        try {
          executor.waitAndDrain();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw Status.CANCELLED.withCause(e).asRuntimeException();
        }
      }
      return getUnchecked(responseFuture);
    } catch (Throwable t) {
      call.cancel();
      throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
}

创建一个Stub的成本是非常低的,我们可以在每次请求时都通过channel创建新的stub,这并不会带来任何问题(只不过是创建了大量对象);其实更好的方式是,我们应该使用一个Stub发送多次请求,即Stub也是可以重用的;直到Stub上的状态异常而无法使用。最常见的异常,就是“io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED”,即表示DEADLINE时间过期,我们可以为每个Stub配置deadline时间,那么如果此stub被使用的时长超过此值(不是空闲的时间),将不能再发送请求,此时我们应该创建新的Stub。很多人想尽办法来使用“withDeadlineAfter”方法来实现一些奇怪的事情,此参数的主要目的就是表明:此stub只能被使用X时长,此后将不能再进行请求,应该被释放。所以,它并不能实现类似于“keepAlive”的语义,即使我们需要keepAlive,也应该在Channel级别,而不是在一个Stub上。

如果你使用了连接池,那么其实连接池不应该关注DEADLINE的错误,只要Channel本身没有terminated即可;就把这个问题交给调用者处理。如果你也对Stub使用了对象池,那么你就可能需要关注这个情况了,你不应该向调用者返回一个“DEADLINE”的stub,或者如果调用者发现了DEADLINE,你的对象池应该能够移除它。

1)实例化ManagedChannel,此channel可以被任意多个Stub实例引用;如上文说述,我们可以通过创建Channel池,来提高application整体的吞吐能力。此Channel实例,不应该被shutdown,直到Client端停止服务;在任何时候,特别是创建Stub时,我们应该判定Channel的状态。

synchronized (this) {
    if (channel.isShutdown() || channel.isTerminated()) {
        channel = ManagedChannelBuilder.forAddress(poolConfig.host, poolConfig.port).usePlaintext(true).build();
    }
    //new Stub
}

//或者
ManagedChannel channel = (ManagedChannel)client.getChannel();
if(channel.isShutdown() || channel.isTerminated()) {
    client = createBlockStub();
}
client.sayHello(…)

因为Channel是可以多路复用,所以我们用Pool机制(比如commons-pool)也可以实现连接池,只是这种池并非完全符合GRPC/HTTP2的设计语义,因为GRPC允许一个Channel上连续发送对个Requests(然后一次性接收多个Responses),而不是“交互式”的Request-Response模式,当然这么使用并不会有任何问题。

2)对于批量调用的场景,我们可以使用FutureStub,对于普通的业务类型RPC,我们应该使用BlockingStub。

3)每个RPC方法的调用,比如sayHello,调用开始后,将会为每个调用请求创建一个ClientCall实例,其内部封装了调用的方法、配置选项(headers)等。此后将会创建Stream对象,每个Stream都持有唯一的streamId,它是Transport用于分拣Response的凭证。最终调用的所有参数都会被封装在Stream中。

4)检测DEADLINE,是否已经过期,如果过期,将使用FailingClientStream对象来模拟整个RPC过程,当然请求不会通过通道发出,直接经过异常流处理过程。

5)然后获取transport,如果此时检测到transport已经中断,则重建transport。(自动重练机制,ClientCallImpl.start()方法)

6)发送请求参数,即我们Request实例。一次RPC调用,数据是分多次发送,但是ClientCall在创建时已经绑定到了指定的线程上,所以数据发送总是通过一个线程进行(不会乱序)。

7)将ClientCall实例置为halfClose,即半关闭,并不是将底层Channel或者Transport半关闭,只是逻辑上限定此ClientCall实例上将不能继续发送任何stream信息,而是等待Response。

8)Netty底层IO将会对reponse数据流进行解包(Http2ConnectionDecoder),并根据streamId分拣Response,同时唤醒响应的ClientCalls阻塞。(参见ClientCalls,GrpcFuture)

9)如果是BlockingStub,则请求返回,如果响应中包含应用异常,则封装后抛出;如果是网络异常,则可能触发Channel重建、Stream重置等。

到此为止,已经把GRPC的基本原理描述完毕,此后如果有其他问题,则继续补充。

———————–

gRPC基础:C++

本教程提供了C++程序员如何使用gRPC的指南。

通过学习教程中例子,你可以学会如何:

  • 在一个 .proto 文件内定义服务.
  • 用 protocol buffer 编译器生成服务器和客户端代码.
  • 使用 gRPC 的 C++ API 为你的服务实现一个简单的客户端和服务器.

假设你已经阅读了概览并且熟悉protocol buffers. 注意,教程中的例子使用的是 protocol buffers 语言的 proto3 版本,它目前只是 alpha 版:可以在proto3 语言指南和 protocol buffers 的 Github 仓库的版本注释发现更多关于新版本的内容.

这算不上是一个在 C++ 中使用 gRPC 的综合指南:以后会有更多的参考文档.

为什么使用 gRPC?

我们的例子是一个简单的路由映射的应用,它允许客户端获取路由特性的信息,生成路由的总结,以及交互路由信息,如服务器和其他客户端的流量更新。

有了 gRPC, 我们可以一次性的在一个 .proto 文件中定义服务并使用任何支持它的语言去实现客户端和服务器,反过来,它们可以在各种环境中,从Google的服务器到你自己的平板电脑- gRPC 帮你解决了不同语言间通信的复杂性以及环境的不同.使用 protocol buffers 还能获得其他好处,包括高效的序列号,简单的 IDL 以及容易进行接口更新。

例子代码和设置

教程的代码在这里 grpc/grpc/examples/cpp/route_guide. 要下载例子,通过运行下面的命令去克隆 grpc 代码库:

$ git clone https://github.com/grpc/grpc.git

改变当前的目录到 examples/cpp/route_guide

$ cd examples/cpp/route_guide

你还需要安装生成服务器和客户端的接口代码相关工具-如果你还没有安装的话,查看下面的设置指南 C++快速开始指南。

定义服务

我们的第一步(可以从概览中得知)是使用 protocol buffers去定义 gRPC service 和方法 request 以及 response 的类型。你可以在 examples/protos/route_guide.proto 看到完整的 .proto 文件。

要定义一个服务,你必须在你的 .proto 文件中指定 service

service RouteGuide {
   ...
}

然后在你的服务中定义 rpc 方法,指定请求的和响应类型。gRPC允 许你定义4种类型的 service 方法,在 RouteGuide 服务中都有使用:

  • 一个 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
// Obtains the feature at a given position.
   rpc GetFeature(Point) returns (Feature) {}
  • 一个 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
// Obtains the Features available within the given Rectangle.  Results are
  // streamed rather than returned at once (e.g. in a response message with a
  // repeated field), as the rectangle may cover a large area and contain a
  // huge number of features.
  rpc ListFeatures(Rectangle) returns (stream Feature) {}
  • 一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
// Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  rpc RecordRoute(stream Point) returns (RouteSummary) {}
  • 一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
// Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

我们的 .proto 文件也包含了所有请求的 protocol buffer 消息类型定义以及在服务方法中使用的响应类型-比如,下面的 Point 消息类型:

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务器端代码

接下来我们需要从 .proto 的服务定义中生成 gRPC 客户端和服务器端的接口。我们通过 protocol buffer 的编译器 protoc 以及一个特殊的 gRPC C++ 插件来完成。

简单起见,我们提供一个 makefile 帮您用合适的插件,输入,输出去运行 protoc (如果你想自己去运行,确保你已经安装了 protoc,并且请遵循下面的 gRPC 代码安装指南)来操作:

$ make route_guide.grpc.pb.cc route_guide.pb.cc

实际上运行的是:

$ protoc -I ../../protos --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` ../../protos/route_guide.proto
$ protoc -I ../../protos --cpp_out=. ../../protos/route_guide.proto

运行这个命令可以在当前目录中生成下面的文件:

  • route_guide.pb.h , 声明生成的消息类的头文件
  • route_guide.pb.cc , 包含消息类的实现
  • route_guide.grpc.pb.h , 声明你生成的服务类的头文件
  • route_guide.grpc.pb.cc , 包含服务类的实现

这些包括:

  • 所有的填充,序列化和获取我们请求和响应消息类型的 protocol buffer 代码
  • 名为 RouteGuide 的类,包含
    • 为了客户端去调用定义在 RouteGuide 服务的远程接口类型(或者 存根 )
    • 让服务器去实现的两个抽象接口,同时包括定义在 RouteGuide 中的方法。

创建服务器

首先来看看我们如何创建一个 RouteGuide 服务器。如果你只对创建 gRPC 客户端感兴趣,你可以跳过这个部分,直接到创建客户端 (当然你也可能发现它也很有意思)。

RouteGuide 服务工作有两个部分:

  • 实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”。
  • 运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。

你可以从examples/cpp/route_guide/route_guide_server.cc看到我们的 RouteGuide 服务器的实现代码。现在让我们近距离研究它是如何工作的。

实现RouteGuide

我们可以看出,服务器有一个实现了生成的 RouteGuide::Service 接口的 RouteGuideImpl 类:

class RouteGuideImpl final : public RouteGuide::Service {
...
}

在这个场景下,我们正在实现 同步 版本的 RouteGuide ,它提供了 gRPC 服务器缺省的行为。同时,也有可能去实现一个异步的接口 RouteGuide::AsyncService ,它允许你进一步定制服务器线程的行为,虽然在本教程中我们并不关注这点。

RouteGuideImpl 实现了所有的服务方法。让我们先来看看最简单的类型 GetFeature ,它从客户端拿到一个 Point 然后将对应的特性返回给数据库中的 Feature

Status GetFeature(ServerContext* context, const Point* point,
                    Feature* feature) override {
    feature->set_name(GetFeatureName(*point, feature_list_));
    feature->mutable_location()——>CopyFrom(*point);
    return Status::OK;
  }

这个方法为 RPC 传递了一个上下文对象,包含了客户端的 Point protocol buffer 请求以及一个填充响应信息的 Feature protocol buffer。在这个方法中,我们用适当的信息填充 Feature ,然后返回 OK 的状态,告诉 gRPC 我们已经处理完 RPC,并且 Feature 可以返回给客户端。

现在让我们看看更加复杂点的情况——流式RPC。 ListFeatures 是一个服务器端的流式 RPC,因此我们需要给客户端返回多个 Feature

Status ListFeatures(ServerContext* context, const Rectangle* rectangle,
                      ServerWriter<Feature>* writer) override {
    auto lo = rectangle->lo();
    auto hi = rectangle->hi();
    long left = std::min(lo.longitude(), hi.longitude());
    long right = std::max(lo.longitude(), hi.longitude());
    long top = std::max(lo.latitude(), hi.latitude());
    long bottom = std::min(lo.latitude(), hi.latitude());
    for (const Feature& f : feature_list_) {
      if (f.location().longitude() >= left &&
          f.location().longitude() <= right &&
          f.location().latitude() >= bottom &&
          f.location().latitude() <= top) {
        writer->Write(f);
      }
    }
    return Status::OK;
  }

如你所见,这次我们拿到了一个请求对象(客户端期望在 Rectangle 中找到的 Feature )以及一个特殊的 ServerWriter 对象,而不是在我们的方法参数中获取简单的请求和响应对象。在方法中,根据返回的需要填充足够多的 Feature 对象,用 ServerWriterWrite() 方法写入。最后,和我们简单的 RPC 例子相同,我们返回 Status::OK 去告知gRPC我们已经完成了响应的写入。

如果你看过客户端流方法 RecordRoute ,你会发现它很类似,除了这次我们拿到的是一个 ServerReader 而不是请求对象和单一的响应。我们使用 ServerReaderRead() 方法去重复的往请求对象(在这个场景下是一个 Point )读取客户端的请求直到没有更多的消息:在每次调用后,服务器需要检查 Read() 的返回值。如果返回值为 true ,流仍然存在,它就可以继续读取;如果返回值为 false ,则表明消息流已经停止。

while (stream->Read(&point)) {
  ...//process client input
}

最后,让我们看看双向流RPC RouteChat()

Status RouteChat(ServerContext* context,
                   ServerReaderWriter<RouteNote, RouteNote>* stream) override {
    std::vector<RouteNote> received_notes;
    RouteNote note;
    while (stream->Read(&note)) {
      for (const RouteNote& n : received_notes) {
        if (n.location().latitude() == note.location().latitude() &&
            n.location().longitude() == note.location().longitude()) {
          stream->Write(n);
        }
      }
      received_notes.push_back(note);
    }

    return Status::OK;
  }

这次我们得到的 ServerReaderWriter 对象可以用来读 写消息。这里读写的语法和我们客户端流以及服务器流方法是一样的。虽然每一端获取对方信息的顺序和写入的顺序一致,客户端和服务器都可以以任意顺序读写——流的操作是完全独立的。

启动服务器

一旦我们实现了所有的方法,我们还需要启动一个gRPC服务器,这样客户端才可以使用服务。下面这段代码展示了在我们 RouteGuide 服务中实现的过程:

void RunServer(const std::string& db_path) {
  std::string server_address("0.0.0.0:50051");
  RouteGuideImpl service(db_path);

  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);
  std::unique_ptr<Server> server(builder.BuildAndStart());
  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}

如你所见,我们通过使用 ServerBuilder 去构建和启动服务器。为了做到这点,我们需要:

  1. 创建我们的服务实现类 RouteGuideImpl 的一个实例。
  2. 创建工厂类 ServerBuilder 的一个实例。
  3. 在生成器的 AddListeningPort() 方法中指定客户端请求时监听的地址和端口。
  4. 用生成器注册我们的服务实现。
  5. 调用生成器的 BuildAndStart() 方法为我们的服务创建和启动一个RPC服务器。
  6. 调用服务器的 Wait() 方法实现阻塞等待,直到进程被杀死或者 Shutdown() 被调用。

<a name=”client”></a>

创建客户端

在这部分,我们将尝试为 RouteGuide 服务创建一个C++的客户端。你可以从examples/cpp/route_guide/route_guide_client.cc看到我们完整的客户端例子代码.

创建一个存根

为了能调用服务的方法,我们得先创建一个 存根

首先需要为我们的存根创建一个gRPC channel ,指定我们想连接的服务器地址和端口,以及 channel 相关的参数——在本例中我们使用了缺省的 ChannelArguments 并且没有使用SSL:

grpc::CreateChannel("localhost:50051", grpc::InsecureCredentials(), ChannelArguments());

现在我们可以利用channel,使用从.proto中生成的 RouteGuide 类提供的 NewStub 方法去创建存根。

public:
  RouteGuideClient(std::shared_ptr<ChannelInterface> channel,
                   const std::string& db)
      : stub_(RouteGuide::NewStub(channel)) {
    ...
  }

调用服务的方法

现在我们来看看如何调用服务的方法。注意,在本教程中调用的方法,都是 阻塞/同步 的版本:这意味着 RPC 调用会等待服务器响应,要么返回响应,要么引起一个异常。

简单RPC

调用简单 RPC GetFeature 几乎是和调用一个本地方法一样直观。

Point point;
  Feature feature;
  point = MakePoint(409146138, -746188906);
  GetOneFeature(point, &feature);

...

  bool GetOneFeature(const Point& point, Feature* feature) {
    ClientContext context;
    Status status = stub_->GetFeature(&context, point, feature);
    ...
  }

如你所见,我们创建并且填充了一个请求的 protocol buffer 对象(例子中为 Point ),同时为了服务器填写创建了一个响应 protocol buffer 对象。为了调用我们还创建了一个 ClientContext 对象——你可以随意的设置该对象上的配置的值,比如期限,虽然现在我们会使用缺省的设置。注意,你不能在不同的调用间重复使用这个对象。最后,我们在存根上调用这个方法,将其传给上下文,请求以及响应。如果方法的返回是 OK ,那么我们就可以从服务器从我们的响应对象中读取响应信息。

std::cout << "Found feature called " << feature->name()  << " at "
                << feature->location().latitude()/kCoordFactor_ << ", "
                << feature->location().longitude()/kCoordFactor_ << std::endl;

流式RPC

现在来看看我们的流方法。如果你已经读过创建服务器,本节的一些内容看上去很熟悉——流式 RPC 是在客户端和服务器两端以一种类似的方式实现的。下面就是我们称作是服务器端的流方法 ListFeatures ,它会返回地理的 Feature

std::unique_ptr<ClientReader<Feature> > reader(
        stub_->ListFeatures(&context, rect));
    while (reader->Read(&feature)) {
      std::cout << "Found feature called "
                << feature.name() << " at "
                << feature.location().latitude()/kCoordFactor_ << ", "
                << feature.location().longitude()/kCoordFactor_ << std::endl;
    }
    Status status = reader->Finish();

我们将上下文传给方法并且请求,得到 ClientReader 返回对象,而不是将上下文,请求和响应传给方法。客户端可以使用 ClientReader 去读取服务器的响应。我们使用 ClientReaderRead() 反复读取服务器的响应到一个响应 protocol buffer 对象(在这个例子中是一个 Feature ),直到没有更多的消息:客户端需要去检查每次调用完 Read() 方法的返回值。如果返回值为 true ,流依然存在并且可以持续读取;如果是 false ,说明消息流已经结束。最后,我们在流上调用 Finish() 方法结束调用并获取我们 RPC 的状态。

客户端的流方法 RecordRoute 的使用很相似,除了我们将一个上下文和响应对象传给方法,拿到一个 ClientWriter 返回。

std::unique_ptr<ClientWriter<Point> > writer(
        stub_->RecordRoute(&context, &stats));
    for (int i = 0; i < kPoints; i++) {
      const Feature& f = feature_list_[feature_distribution(generator)];
      std::cout << "Visiting point "
                << f.location().latitude()/kCoordFactor_ << ", "
                << f.location().longitude()/kCoordFactor_ << std::endl;
      if (!writer->Write(f.location())) {
        // Broken stream.
        break;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(
          delay_distribution(generator)));
    }
    writer->WritesDone();
    Status status = writer->Finish();
    if (status.IsOk()) {
      std::cout << "Finished trip with " << stats.point_count() << " points\n"
                << "Passed " << stats.feature_count() << " features\n"
                << "Travelled " << stats.distance() << " meters\n"
                << "It took " << stats.elapsed_time() << " seconds"
                << std::endl;
    } else {
      std::cout << "RecordRoute rpc failed." << std::endl;
    }

一旦我们用 Write() 将客户端请求写入到流的动作完成,我们需要在流上调用 WritesDone() 通知 gRPC 我们已经完成写入,然后调用 Finish() 完成调用同时拿到 RPC 的状态。如果状态是 OK ,我们最初传给 RecordRoute() 的响应对象会跟着服务器的响应被填充。

最后,让我们看看双向流式 RPC RouteChat() 。在这种场景下,我们将上下文传给一个方法,拿到一个可以用来读写消息的 ClientReaderWriter 的返回。

std::shared_ptr<ClientReaderWriter<RouteNote, RouteNote> > stream(
        stub_->RouteChat(&context));

这里读写的语法和我们客户端流以及服务器端流方法没有任何区别。虽然每一方都能按照写入时的顺序拿到另一方的消息,客户端和服务器端都可以以任意顺序读写——流操作起来是完全独立的。

来试试吧!

构建客户端和服务器:

$ make

运行服务器,它会监听50051端口:

$ ./route_guide_server

在另外一个终端运行客户端:

$ ./route_guide_client


来源: http://doc.oschina.net/grpc?t=57966

———————————————————

本文转载自IPD-Chat,IPD-Chat为京东商城基础平台部门官方公众号,扫一扫二维码进行关注。

gRPC是什么

gRPC是Google 开发的基于HTTP/2和Protocol Buffer 3的RPC 框架。

gRPC是开源的,有 C、Java、 Go 等多种语言的实现,可以轻松实现跨语言调用。

声称是”一个高性能,开源,将移动和HTTP/2放在首位的通用的RPC框架”

当前版本1.1.1,主要技术栈:Netty-4.1.8,Protobuff-3.1.0,Guava-20.0

Multi-language, multi-platform framework

● Native implementations in C, Java, and Go

● C stack wrapped by C++, C#, Node, ObjC, Python, Ruby, PHP

● Platforms supported: Linux, Android, iOS, MacOS, Windows

gRPC是什么

gRPC设计动机和原则

最初由Louis Ryan在谷歌其他同事帮助下写成,如下文:

设计动机

十多年来,谷歌一直使用一个叫做Stubby的通用RPC基础框架,用它来连接在其数据中心内和跨数据中心运行的大量微服务。其内部系统早就接受了如今越来越流行的微服务架构。拥有一个统一的、跨平台的RPC的基础框架,使得服务的首次发行在效率、安全性、可靠性和行为分析上得到全面提升,这是支撑这一时期谷歌快速增长的关键。

Stubby有许多非常棒的特性,然而,它没有基于任何标准,而且与其内部的基础框架耦合得太紧密以至于被认为不适合公开发布。随着SPDY、HTTP/2和QUIC的到来,许多类似特性在公共标准中出现,并提供了Stubby不支持的其它功能。很明显,是时候利用这些标准来重写Stubby,并将其适用性扩展到移动、物联网和云场景。

设计原则

● 服务非对象、消息非引用 —— 促进微服务的系统间粗粒度消息交互设计理念,同时避免分布式对象的陷阱和分布式计算的谬误。

● 普遍并且简单 —— 该基础框架应该在任何流行的开发平台上适用,并且易于被个人在自己的平台上构建。它在CPU和内存有限的设备上也应该切实可行。

● 免费并且开源 —— 所有人可免费使用基本特性。以友好的许可协议开源方式发布所有交付件。

● 互通性 —— 该报文协议(Wire Protocol)必须遵循普通互联网基础框架。

● 通用并且高性能 —— 该框架应该适用于绝大多数用例场景,相比针对特定用例的框架,该框架只会牺牲一点性能。

● 分层的 —— 该框架的关键是必须能够独立演进。对报文格式(Wire Format)的修改不应该影响应用层。

● 负载无关的 —— 不同的服务需要使用不同的消息类型和编码,例如protocol buffers、JSON、XML和Thrift,协议上和实现上必须满足这样的诉求。类似地,对负载压缩的诉求也因应用场景和负载类型不同而不同,协议上应该支持可插拔的压缩机制。

● 流 —— 存储系统依赖于流和流控来传递大数据集。像语音转文本或股票代码等其它服务,依靠流表达时间相关的消息序列。

● 阻塞式和非阻塞式 —— 支持异步和同步处理在客户端和服务端间交互的消息序列。这是在某些平台上缩放和处理流的关键。

● 取消和超时 —— 有的操作可能会用时很长,客户端运行正常时,可以通过取消操作让服务端回收资源。当任务因果链被追踪时,取消可以级联。客户端可能会被告知调用超时,此时服务就可以根据客户端的需求来调整自己的行为。

● Lameducking —— 服务端必须支持优雅关闭,优雅关闭时拒绝新请求,但继续处理正在运行中的请求。

● 流控 —— 在客户端和服务端之间,计算能力和网络容量往往是不平衡的。流控可以更好的缓冲管理,以及保护系统免受来自异常活跃对端的拒绝服务(DOS)攻击。

● 可插拔的 —— 数据传输协议(Wire Protocol)只是功能完备API基础框架的一部分。大型分布式系统需要安全、健康检查、负载均衡和故障恢复、监控、跟踪、日志等。实 现上应该提供扩展点,以允许插入这些特性和默认实现。

● API扩展 —— 可能的话,在服务间协作的扩展应该最好使用接口扩展,而不是协议扩展。这种类型的扩展可以包括健康检查、服务内省、负载监测和负载均衡分配。

● 元数据交换 —— 常见的横切关注点,如认证或跟踪,依赖数据交换,但这不是服务公共接口中的一部分。部署依赖于他们将这些特性以不同速度演进到服务暴露的个别API的能力。

● 标准化状态码 —— 客户端通常以有限的方式响应API调用返回的错误。应该限制状态代码名字空间,使得这些错误处理决定更清晰。如果需要更丰富的特定域的状态,可以使用元数据交换机制来提供。

关于HTTP/2和Protocol Buffer 3简介

HTTP/2是什么

HTTP/2是下一代的HTTP协议。

起源于 GOOGLE 带头开发的 SPDY 协议,由 IETF 的 HTTPbis 工作组修改发布。

由两个RFC组成:

● RFC 7540 – Hypertext Transfer Protocol Version 2 (HTTP/2)

● RFC 7541 – HPACK: Header Compression for HTTP/2

这两个 RFC 目前的状态是 PROPOSED STANDARD

HTTP/1的主要问题

Head-of-line blocking,新请求的发起必须等待服务器对前一个请求的回应,无法同时发起多个请求,导致很难充分利用TCP连接。

gRPC是什么

● 头部冗余

HTTP头部包含大量重复数据,比如cookies,多个请求的cookie可能完全一样

HTTP/2改进

● 二进制协议、分帧(Frame)

● 双向流,多路复用

● 头部压缩

● 服务器推送(Server Push)

● 优先级

● 流量控制

● 流重置

gRPC是什么

HTTP/2-帧

●HTTP/2抛弃HTTP/1的文本协议改为二进制协议。HTTP/2的基本传输单元为帧。每个帧都从属于某个流。

● Length: Payload 长度

● Type: 帧类型

● Stream identifier:流ID

● Frame Payload: 依帧类型而不同

HTTP/2-帧的类型

● HEADERS 对应HTTP/1的 Headers

● DATA 对应HTTP/1的 Body

● CONTINUATION 头部太大,分多个帧传输(一个HEADERS+若干CONTINUATION)

● SETTINGS 连接设置

● WINDOW_UPDATE 流量控制

● PUSH_PROMISE 服务端推送

● PRIORITY 流优先级更改

● PING 心跳或计算RTT

● RST_STREAM 马上中止一个流

● GOAWAY 关闭连接并且发送错误信息

HTTP/2-流

HTTP/2连接上传输的每个帧都关联到一个流,一个连接上可以同时有多个流。同一个流的帧按序传输,不同流的帧交错混合传输。客户端、服务端双方都可以建立流,流也可以被任意一方关闭。客户端发起的流使用奇数流ID,服务端发起的使用偶数。

Protocol Buffers是什么

一个语言无关,平台无关,可扩展的结构化数据序列化方案,用于协议通讯,数据存储和其他更多用途。

一个灵活,高效,自动化的结构化数据序列化机制(想象xml),但是更小,更快并且更简单,一旦定义好数据如何构造, 就可以使用特殊的生成的源代码来轻易的读写你的结构化数据到和从不同的数据流,用不同的语言。你甚至可以更新你的数据结构而不打破已部署的使用”旧有”格式编译的程序。

为什么使用HTTP协议

将移动和HTTP/2放在首位的通用的RPC框架,

● 网络基础设施设计良好的支持HTTP,比如防火墙, 负载, 加密, 认证, 压缩, …

gRPC原理-从一个HelloWorld开始

第1步. 定义 hello-dto.proto 文件

syntax = “proto3″;

option java_package = “com.jd.jsf.grpc.dto”;

option java_multiple_files = true;

option java_outer_classname = “HelloServiceDTO”;

package grpc;

// The request message containing the user’s name.

message HelloRequest {

string name = 1;

}

// The response message containing the greetings

message HelloReply {

string message = 1;

}

第2步. 定义hello-service.proto文件(可以和第一步合并)

syntax = “proto3″;

import “hello-dto.proto”;

option java_package = “com.jd.jsf.grpc.service”;

option java_multiple_files = true;

option java_outer_classname = “IHelloService”;

package grpc;

// The greeting service definition.

service HelloService {

// Sends a greeting

rpc SayHello (grpc.HelloRequest) returns (grpc.HelloReply) {}

}

第3步. 生成 源代码 文件

#! /bin/bash

PROTOC3=”/grpc/protoc-3.1.0″

PROJECT_HOME=”./”

echo “gen dto”

${PROTOC3}/bin/protoc

-I=${PROJECT_HOME}/src/main/proto/

–java_out=${PROJECT_HOME}/src/main/java

${PROJECT_HOME}/src/main/proto/hello-dto.proto

echo “gen service”

${PROTOC3}/bin/protoc

-I=${PROJECT_HOME}/src/main/proto/

–java_out=${PROJECT_HOME}/src/main/java

${PROJECT_HOME}/src/main/proto/hello-service.proto

echo “gen grpc service”

${PROTOC3}/bin/protoc

–plugin=protoc-gen-grpc-java=${PROTOC3}/bin/protoc-gen-grpc-java-1.1.1-linux-x86_64.exe

–grpc-java_out=${PROJECT_HOME}/src/main/java

-I=${PROJECT_HOME}/src/main/proto/

${PROJECT_HOME}/src/main/proto/hello-service.proto

echo “over!”

第4步. 编写Server端

int port = 50051;

server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build() .start();

server.awaitTermination();

class GreeterImpl extends HelloServiceGrpc.HelloServiceImplBase {

@Override

public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {

HelloReply reply = HelloReply.newBuilder().setMessage(“Hello ” + req.getName()).build();

responseObserver.onNext(reply);

responseObserver.onCompleted();

}

}

第5步. 编写Client端

ManagedChannel

channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true);

HelloServiceGrpc.HelloServiceBlockingStub

blockingStub = HelloServiceGrpc.newBlockingStub(channel);

HelloServiceGrpc.HelloServiceFutureStub

futureStubStub = HelloServiceGrpc.newFutureStub(channel);

HelloRequest request = HelloRequest.newBuilder().setName(name).build();

HelloReply response = blockingStub.sayHello(request);

ListenableFuture<HelloReply> future = futureStubStub.sayHello(request);

HelloReply response = future.get();

gRPC是什么

gRPC原理- 概念

从HelloWorld中,映射出gRPC的基本概念

1. Channels

在创建客户端存根时,一个gRPC通道提供一个特定主机和端口服务端的连接。客户端可以通过指定通道参数来修改gRPC的默认行为,比如打开关闭消息压缩。一个通道具有状态,包含已连接和空闲 。

2. Stub

Proxy, Channel, Marshaller, MethodDeor

利用代码生成器生成client和server端stub代码,为了跨语言只能这么玩,这也体现了静态语言和动态语言的区别。Stub代码包含了客户端和服务端静态代理类,分别处理消息的加工和发送。还包括序列化方法,服务定义相关的方法描述。

3. Service Def

gRPC 基于如下思想:定义一个服务, 指定其可以被远程调用的方法及其参数和返回类型。gRPC 默认使用 protocol buffers 3 作为接口定义语言.

service HelloService {

rpc SayHello (HelloRequest) returns (HelloResponse);

}

message HelloRequest {

required string greeting = 1;

}

message HelloResponse {

required string reply = 1;

}

gRPC 允许你定义四类服务方法:

1). 单项 RPC,即客户端发送一个请求给服务端,从服务端获取一个应答,就像一次普通的函数调用

rpc SayHello(HelloRequest) returns (HelloResponse){}

2). 服务端流式 RPC,即客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。

rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse){}

3). 客户端流式 RPC,即客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。

rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse) {}

4). 双向流式 RPC,即两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。每个数据流里消息的顺序会被保持。

rpc BidiHello(stream HelloRequest) returns (stream HelloResponse){}

gRPC是什么

4. DEADLINE

gRPC 允许客户端在调用一个远程方法前指定一个最后期限值。这个值指定了在客户端可以等待服务端多长时间来应答,超过这个时间值 RPC 将结束并返回DEADLINE_EXCEEDED错误。在服务端可以查询这个期限值来看是否一个特定的方法已经过期,或者还剩多长时间来完成这个方法。

5. Metadata

Headers

RPC原理- 协议与实现

gRPC中,把HTTP2的Steam Identifier当作调用标识,每一次请求都发起一个新的流。

每个请求的调用哪个服务和方法、回应的调用结果状态码都在HEADER Frame中指定。

请求内容和回应内容由Protocol Buffer序列化后使用DATA Frame传输

以下是 gRPC 请求和应答消息流中一般的消息顺序:

请求 → 请求报头 * 有定界符的消息 EOS

应答 → 应答报头 * 有定界符的消息 EOS

应答 → (应答报头 * 有定界符的消息 跟踪信息) / 仅仅跟踪时

界定的消息的重复序列通过数据帧来进行传输。

界定的消息 → 压缩标志 消息长度 消息

压缩标志 → 0 / 1 # 编码为 1 byte 的无符号整数

消息长度 → {消息长度} # 编码为 4 byte 的无符号整数

消息 → *{二进制字节}

1. 请求

HEADERS (flags = END_HEADERS)

:method = POST

:scheme = http

:path = /google.pubsub.v2.PublisherService/CreateTopic

:authority = pubsub.googleapis.com

grpc-timeout = 1S

content-type = application/grpc+proto

grpc-encoding = gzip

authorization = Bearer y235.wef315yfh138vh31hv93hv8h3v

DATA (flags = END_STREAM)

<Delimited Message>

2. 应答

HEADERS (flags = END_HEADERS)

:status = 200

grpc-encoding = gzip

DATA

<Delimited Message>

HEADERS (flags = END_STREAM, END_HEADERS)

grpc-status = 0 # OK

trace-proto-bin = jher831yy13JHy3hc

gRPC原理- Server端

gRPC而言,只是对Netty Server的简单封装,底层使用了PlaintextHandler、Http2ConnectionHandler的相关封装等。具体Framer、Stream方式请参考Http2相关文档。

followControlWindow:

流量控制的窗口大小,单位:字节,默认值为1M,HTTP2中的“Flow Control”特性;连接上,已经发送尚未ACK的数据帧大小,比如window大小为100K,且winow已满,每次向Client发送消息时,如果客户端反馈ACK(携带此次ACK数据的大小),window将会减掉此大小;每次向window中添加亟待发送的数据时,window增加;如果window中的数据已达到限定值,它将不能继续添加数据,只能等待Client端ACK。

maxConcurrentCallPerConnection:

每个connection允许的最大并发请求数,默认值为Integer.MAX_VALUE;如果此连接上已经接受但尚未响应的streams个数达到此值,新的请求将会被拒绝。为了避免TCP通道的过度拥堵,我们可以适度调整此值,以便Server端平稳处理,毕竟buffer太多的streams会对server的内存造成巨大压力。

maxMessageSize:每次调用允许发送的最大数据量,默认为100M。

maxHeaderListSize:每次调用允许发送的header的最大条数,gRPC中默认为8192。

gRPC Server端,有个重要的方法:addService。【如下文service代理模式】

在此之前,我们需要介绍一下bindService方法,每个gRPC生成的service代码中都有此方法,它以硬编码的方式遍历此service的方法列表,将每个方法的调用过程都与“被代理实例”绑定,这个模式有点类似于静态代理,比如调用sayHello方法时,其实内部直接调用“被代理实例”的sayHello方法(参见MethodHandler.invoke方法,每个方法都有一个唯一的index,通过硬编码方式执行);bindService方法的最终目的是创建一个ServerServiceDefinition对象,这个对象内部位置一个map,key为此Service的方法的全名(fullname,{package}.{service}.{method}),value就是此方法的gRPC封装类(ServerMethodDefinition)

addService方法可以添加多个Service,即一个Netty Server可以为多个service服务,这并不违背设计模式和架构模式。addService方法将会把service保存在内部的一个map中,key为serviceName(即{package}.{service}),value就是上述bindService生成的对象。

如下是服务定义的类结构:

gRPC是什么

那么究竟Server端是如何解析RPC过程的?Client在调用时会将调用的service名称 + method信息保存在一个GRPC“保留”的header中,那么Server端即可通过获取这个特定的header信息,就可以得知此stream需要请求的service、以及其method,那么接下来只需要从上述提到的map中找到service,然后找到此method,直接代理调用即可。执行结果在Encoder之后发送给Client。

如下是Server端启动过程:

gRPC是什么

gRPC原理- Client端

ManagedChannelBuilder来创建客户端channel,ManagedChannelBuilder使用了provider机制,具体是创建了哪种channel有provider决定,可以参看META-INF下同类名的文件中的注册信息。当前Channel有2种:NettyChannelBuilder与OkHttpChannelBuilder。当前版本中为NettyChannelBuilder;可以直接使用NettyChannelBuilder来构建channel。

ManagedChannel是客户端最核心的类,它表示逻辑上的一个channel;底层持有一个物理的transport(TCP通道,参见NettyClientTransport),并负责维护此transport的活性;即在RPC调用的任何时机,如果检测到底层transport处于关闭状态(terminated),将会尝试重建transport。(参见TransportSet.obtainActiveTransport())

通常情况下,我们不需要在RPC调用结束后就关闭Channel,Channel可以被一直重用,直到Client不再需要请求为止或者Channel无法真的异常中断而无法继续使用。

每个Service客户端,都生成了2种stub:BlockingStub和FutureStub;这两个Stub内部调用过程几乎一样,唯一不同的是BlockingStub的方法直接返回Response, 而FutureStub返回一个Future对象。BlockingStub内部也是基于Future机制,只是封装了阻塞等待的过程。

如下是Client端关键组件:

gRPC是什么

如下是Client端的启动流程:

gRPC是什么

关于Client负载均衡

gRPC是什么

gRPC分层设计

gRPC是什么

JSF兼容gRPC

目前JSF支持Java和C++两种客户端,其他小众语言无法支持,为了解决跨语言问题,JSF系统增加了基于HTTP/1的网关服务。这可能是目前业内RPC框架解决跨语言问题的普遍解决方案。

gRPC是什么

针对gRPC的技术预言,就是为了解决JSF跨语言问题,如何解决?目前JSF框架发布的JSF协议服务,天然支持JSF、HTTP、Dubbo、Telnet协议。这都得益于Netty的伟大。就Netty而言,客户端与服务端建立TCP连接后,初始化Channel时,可以根据报文头的特征码进行协议匹配,进而针对当前连接设置相应协议的解码器。就gRPC而言,其报文头就是HTTP/2的报文头-棱镜。

gRPC是什么

针对JSF服务提供端,解析gRPC协议报文,获取接口、方法、参数,然后进行方法调用,最后模拟gRPC协议返回给客户端。

针对JSF服务调用端,模拟gRPC协议,发送gRPC协议报文。

JSF兼容gRPC如下图:

gRPC是什么

至此,JSF跨语言问题解决了。NO!目前gRPC各种语言客户端可以访问Java版的JSF服务,Java版的JSF客户端也可以访问gRPC各种语言服务端。我们要解决的问题是Java版的JSF服务,可以让其他gRPC各种语言客户端访问,目前仅解决了一小步。gRPC各种语言客户端不具备JSF的服务订阅功能,只能借道gRPC自身的负载策略DNS。

默认gRPC通过本地的域名解析,拉取服务列表,进而负载均衡。为了支持这种策略,Java版的JSF服务注册服务时,需要将信息同步注册到DNS服务,其他gRPC各种语言客户端访问DNS服务实现服务发现。这里需要按照服务申请域名,这是个弊端。这也违背了gRPC移动端为主、跨数据中心访问的初衷。

采用DNS服务发现的设计如下图:

gRPC是什么

为了解决DNS服务发现带来的系统复杂度,正能对gRPC进行动刀,由于gRPC的扩展性良好,而且只需要将C、Go语言的客户端进行扩展即可。gRPC服务发现的机制是通过NameResolver来解决的,而且是基于Plugin方式,故NameResolver的实现目标指向JSF系统现有的注册中心即可,同时为了更彻底的改变gRPC服务注册、订阅,又将C、Go语言的客户端增加了服务注册、订阅功能。至此,Java版的JSF服务与gRPC版的服务之间相互调用打通了。

gRPC是什么

gRPC总结

跨语言,针对移动端:省电、省流量、高性能、双向流、支持DNS负载。关于性能,肯定比HTTP/1好,比TCP差,网上好多性能对比,都是和TCP相关的RPC对比,没有可比性。

本文转载自IPD-Chat,IPD-Chat为京东商城基础平台部门官方公众号,扫一扫二维码进行关注。

来源: http://mt.sohu.com/20170222/n481448007.shtml


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

维多利亚时代的互联网

维多利亚时代的互联网

[英] 汤姆·斯丹迪奇 / 多绥婷 / 后浪丨江西人民出版社 / 2017-8 / 38.00元

人类历史上的第一次大连接 回顾互联网的前世 预言互联网的未来 ……………… ※编辑推荐※ ☆《财富》杂志推荐的75本商务人士必读书之一 ☆ 回顾互联网的前世,颠覆你的思维,升级你对互联网的认知 ☆ 人类历史上一次全球大连接是维多利亚时期的电报时代,那时候也有疯狂的资本、 巨大的泡沫、网络新型犯罪、网络亚文化崛起……现在的互联网时代就是电报时代的重演;回顾那......一起来看看 《维多利亚时代的互联网》 这本书的介绍吧!

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具