内容简介:这是系列最后一篇文章了,最后我们来为我们的rpc框架实现一个http gateway。这个功能实际上受到了rpcx的启发,基于这种方式实现一个简单的类似service mesh中的sidecar。代码地址:http gateway可以接收来自客户端的http请求并将其转换为rpc请求然后交给服务端处理,再将服务端处理过后的结果通过http响应返回给客户端。http gateway的大致原理就是将我们的RPC协议中header部分放到http header中,然后RPC协议中的body部分放到http bod
这是系列最后一篇文章了,最后我们来为我们的rpc框架实现一个http gateway。这个功能实际上受到了rpcx的启发,基于这种方式实现一个简单的类似service mesh中的sidecar。代码地址: github
原理
http gateway可以接收来自客户端的http请求并将其转换为rpc请求然后交给服务端处理,再将服务端处理过后的结果通过http响应返回给客户端。
http gateway的大致原理就是将我们的RPC协议中header部分放到http header中,然后RPC协议中的body部分放到http body即可。
实现
首先我们需要定义http header中各个字段的名称:
const ( HEADER_SEQ = "rpc-header-seq" //序号, 用来唯一标识请求或响应 HEADER_MESSAGE_TYPE = "rpc-header-message_type" //消息类型,用来标识一个消息是请求还是响应 HEADER_COMPRESS_TYPE = "rpc-header-compress_type" //压缩类型,用来标识一个消息的压缩方式 HEADER_SERIALIZE_TYPE = "rpc-header-serialize_type" //序列化类型,用来标识消息体采用的编码方式 HEADER_STATUS_CODE = "rpc-header-status_code" //状态类型,用来标识一个请求是正常还是异常 HEADER_SERVICE_NAME = "rpc-header-service_name" //服务名 HEADER_METHOD_NAME = "rpc-header-method_name" //方法名 HEADER_ERROR = "rpc-header-error" //方法调用发生的异常 HEADER_META_DATA = "rpc-header-meta_data" //其他元数据 ) 复制代码
然后我们需要启动一个http server,用来接收http请求。这里我们使用 go 自带的api,默认使用5080端口,如果发现端口已经被占用了,就递增端口。
func (s *SGServer) startGateway() { port := 5080 ln, err := net.Listen("tcp", ":" + strconv.Itoa(port)) for err != nil && strings.Contains(err.Error(), "address already in use") { port++ ln, err = net.Listen("tcp", ":" + strconv.Itoa(port)) } if err != nil { log.Printf("error listening gateway: %s", err.Error()) } log.Printf("gateway listenning on " + strconv.Itoa(port)) //避免阻塞,使用新的goroutine来执行http server go func() { err := http.Serve(ln, s) if err != nil { log.Printf("error serving http %s", err.Error()) } }() } 复制代码
接下来我们需要实现ServeHTTP函数:
func (s *SGServer) ServeHTTP(rw http.ResponseWriter, r *http.Request) { //如果url不对则直接返回 if r.URL.Path != "/invoke" { rw.WriteHeader(404) return } //如果method不对则直接返回 if r.Method != "POST" { rw.WriteHeader(405) return } //构造新的请求 request := protocol.NewMessage(s.Option.ProtocolType) //根据http header填充request的header request, err := parseHeader(request, r) if err != nil { rw.WriteHeader(400) } //根据http body填充request的data request, err = parseBody(request, r) if err != nil { rw.WriteHeader(400) } //构造context ctx := metadata.WithMeta(context.Background(), request.MetaData) response := request.Clone() response.MessageType = protocol.MessageTypeResponse //处理请求 response = s.process(ctx, request, response) //返回相应 s.writeHttpResponse(response, rw, r) } func parseBody(message *protocol.Message, request *http.Request) (*protocol.Message, error) { data, err := ioutil.ReadAll(request.Body) if err != nil { return nil, err } message.Data = data return message, nil } func parseHeader(message *protocol.Message, request *http.Request) (*protocol.Message, error) { headerSeq := request.Header.Get(HEADER_SEQ) seq, err := strconv.ParseUint(headerSeq, 10, 64) if err != nil { return nil, err } message.Seq = seq headerMsgType := request.Header.Get(HEADER_MESSAGE_TYPE) msgType, err := protocol.ParseMessageType(headerMsgType) if err != nil { return nil, err } message.MessageType = msgType headerCompressType := request.Header.Get(HEADER_COMPRESS_TYPE) compressType, err := protocol.ParseCompressType(headerCompressType) if err != nil { return nil, err } message.CompressType = compressType headerSerializeType := request.Header.Get(HEADER_SERIALIZE_TYPE) serializeType, err := codec.ParseSerializeType(headerSerializeType) if err != nil { return nil, err } message.SerializeType = serializeType headerStatusCode := request.Header.Get(HEADER_STATUS_CODE) statusCode, err := protocol.ParseStatusCode(headerStatusCode) if err != nil { return nil, err } message.StatusCode = statusCode serviceName := request.Header.Get(HEADER_SERVICE_NAME) message.ServiceName = serviceName methodName := request.Header.Get(HEADER_METHOD_NAME) message.MethodName = methodName errorMsg := request.Header.Get(HEADER_ERROR) message.Error = errorMsg headerMeta := request.Header.Get(HEADER_META_DATA) meta := make(map[string]interface{}) err = json.Unmarshal([]byte(headerMeta), &meta) if err != nil { return nil, err } message.MetaData = meta return message, nil } func (s *SGServer) writeHttpResponse(message *protocol.Message, rw http.ResponseWriter, r *http.Request) { header := rw.Header() header.Set(HEADER_SEQ, string(message.Seq)) header.Set(HEADER_MESSAGE_TYPE, message.MessageType.String()) header.Set(HEADER_COMPRESS_TYPE, message.CompressType.String()) header.Set(HEADER_SERIALIZE_TYPE, message.SerializeType.String()) header.Set(HEADER_STATUS_CODE, message.StatusCode.String()) header.Set(HEADER_SERVICE_NAME, message.ServiceName) header.Set(HEADER_METHOD_NAME, message.MethodName) header.Set(HEADER_ERROR, message.Error) metaDataJson, _ := json.Marshal(message.MetaData) header.Set(HEADER_META_DATA, string(metaDataJson)) _, _ = rw.Write(message.Data) } 复制代码
最后我们只需要在wrapper中启动http server即可。
func (w *DefaultServerWrapper) WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc { return func(network string, addr string, meta map[string]interface{}) error { //省略前面的部分 ... //启动gateway s.startGateway() return serveFunc(network, addr, meta) } } 复制代码
客户端测试代码:
func MakeHttpCall() { //声明参数并序列化,放到http请求的body中 arg := service.Args{A: rand.Intn(200), B: rand.Intn(100)} data, _ := msgpack.Marshal(arg) body := bytes.NewBuffer(data) req, err := http.NewRequest("POST", "http://localhost:5080/invoke", body) if err != nil { log.Println(err) return } req.Header.Set(server.HEADER_SEQ, "1") req.Header.Set(server.HEADER_MESSAGE_TYPE, protocol.MessageTypeRequest.String()) req.Header.Set(server.HEADER_COMPRESS_TYPE,protocol.CompressTypeNone.String()) req.Header.Set(server.HEADER_SERIALIZE_TYPE,codec.MessagePack.String()) req.Header.Set(server.HEADER_STATUS_CODE,protocol.StatusOK.String()) req.Header.Set(server.HEADER_SERVICE_NAME,"Arith") req.Header.Set(server.HEADER_METHOD_NAME,"Add") req.Header.Set(server.HEADER_ERROR,"") meta := map[string]interface{}{"key":"value"} metaJson, _ := json.Marshal(meta) req.Header.Set(server.HEADER_META_DATA,string(metaJson)) response, err := http.DefaultClient.Do(req) if err != nil { log.Println(err) return } if response.StatusCode != 200 { log.Println(response) } else if response.Header.Get(server.HEADER_ERROR) != "" { log.Println(response.Header.Get(server.HEADER_ERROR)) } else { data, err = ioutil.ReadAll(response.Body) result := service.Reply{} msgpack.Unmarshal(data, &result) fmt.Println(result.C) } } 复制代码
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 自己实现集合框架(十):顺序栈的实现
- Golang实现简单爬虫框架(4)——队列实现并发任务调度
- 简易RPC框架实现
- 优秀开源框架的扩展机制实现
- Go 实现简易 RPC 框架
- 如何实现一个Python爬虫框架
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。