内容简介:实现一个好用, 可靠的 thrift client 连接池最近, 在公司项目中需要用到 thrift rpc 调用, 以往的 thrift client 都是用 python 写的. 因此, 需要写一个新的 golang 版 thrift client, 根据 python 中的经验和以往的经验, 当然是采用连接池比较好, 这样可以减少 TCP 连接的频繁创建销毁带来的开销.首先, 我翻看了 thrift 的官网和网上的资料, 得出几点小结论:
目的
实现一个好用, 可靠的 thrift client 连接池
背景故事
最近, 在公司项目中需要用到 thrift rpc 调用, 以往的 thrift client 都是用 python 写的. 因此, 需要写一个新的 golang 版 thrift client, 根据 python 中的经验和以往的经验, 当然是采用连接池比较好, 这样可以减少 TCP 连接的频繁创建销毁带来的开销.
首先, 我翻看了 thrift 的官网和网上的资料, 得出几点小结论:
- 没有发现官方有支持 thrift client 的连接池
- 可以利用第三方连接池库包装 thrift client, 实现连接池
既然如此, 我选择了利用第三方库去实现连接池的方式, 很容易就搞定了. 做法和这篇文章差不多: 链接一 , 类似的还有这篇文章: 链接二 .
在简短运行了一段时间之后, 我敏感的发现了其中的问题. 程序日志中有几个 EOF, write broken pipe 的报错, 我意识到, 这并不是偶然, 很有可能是连接池的问题, 迅速通过 demo 验证, 确定是 thrift server 重启导致的.
回想一下这个场景, 当你通过 rpc 去调用的时候, server 需要更新代码重启, 这个时候 client 的连接都是失效的, 应该及时从连接池中清理掉, 然而第三方连接池似乎都没有这个机制, 也没有提供任何口子给用户. 在 [链接一] [链接二] 中, 两位同仁解决的都是超时失效的问题, 并没有处理重启导致的连接失效.
为了解决这个问题, 我思索了几种方案.
- 方案一 如果官方支持 ping, 可以在每次从连接池获取连接的时候判断一下, 无法 ping 通的连接直接丢弃, 重新获取或者创建新连接
- 方案二 在 server 提供一个 ping 的 rpc 接口, 专门用于判断连通性
- 方案三 继承 thrift 的 client 类, 重写 Call 方法, 通过 send 数据包是否报错来判断连通性, 报错的连接直接丢弃
查找了一圈, 发现 thrift 没有类似 ping 的方法检测连接的连通性, 于是否决方案一;
方案二需要专门提供一个 ping 的接口, 比较 low, 代价较大, 也否定了;
最终, 我选择了方案三, 在 rpc Call 的时候, 做连接池的相关动作, 以及连通性的检测.
这样子改造之后, 代码很简单, 甚至比没有连接池更加方便. 只需要两步:
- 初始化一次全局的连接池
- 调用的时候通过全局连接池操作
以往没有采用连接池的时候, 每次都要创建连接, 关闭连接, 现在就没必要了
附件文件 thrift.go 是基于 github.com/fatih/pool 这个第三方库, 重写了 Call 的相关代码. 最终实现了个人非常满意的 golang thrift client pool, 分享给大家.
后记
回顾以往接触的各种连接池, 都要考虑连接失效的问题. 通过什么方法判断, 如果失效是否重连, 是否重试. 如果想要更好的使用连接池, 通过举一反三就是最好的方式, 把遇到的连接池对比起来看看, 也许还有新的收获.
附件
附件: thrift.go
package util import ( "context" "git.apache.org/thrift.git/lib/go/thrift" "github.com/fatih/pool" "net" "time" ) type ThriftPoolClient struct { *thrift.TStandardClient seqId int32 timeout time.Duration iprotFactory, oprotFactory thrift.TProtocolFactory pool pool.Pool } func NewThriftPoolClient(host, port string, inputProtocol, outputProtocol thrift.TProtocolFactory, initialCap, maxCap int) (*ThriftPoolClient, error) { factory := func() (net.Conn, error) { conn, err := net.Dial("tcp", net.JoinHostPort(host, port)) if err != nil { return nil, err } return conn, err } p, err := pool.NewChannelPool(initialCap, maxCap, factory) if err != nil { return nil, err } return &ThriftPoolClient{ iprotFactory: inputProtocol, oprotFactory: outputProtocol, pool: p, }, nil } // Sets the socket timeout func (p *ThriftPoolClient) SetTimeout(timeout time.Duration) error { p.timeout = timeout return nil } func (p *ThriftPoolClient) Call(ctx context.Context, method string, args, result thrift.TStruct) error { p.seqId++ seqId := p.seqId // get conn from pool conn, err := p.pool.Get() if err != nil { return err } defer conn.Close() // wrap conn as thrift fd transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory()) trans := thrift.NewTSocketFromConnTimeout(conn, p.timeout) transport, err := transportFactory.GetTransport(trans) if err != nil { return err } inputProtocol := p.iprotFactory.GetProtocol(transport) outputProtocol := p.oprotFactory.GetProtocol(transport) if err := p.Send(outputProtocol, seqId, method, args); err != nil { // if err occurred, real close the connection. if pc, ok := conn.(*pool.PoolConn); ok { pc.MarkUnusable() pc.Close() } return err } // method is oneway if result == nil { return nil } if err = p.Recv(inputProtocol, seqId, method, result); err != nil { // if err occurred, real close the connection. if pc, ok := conn.(*pool.PoolConn); ok { pc.MarkUnusable() pc.Close() } return err } return nil }
附件 client.go
package util import ( "git.apache.org/thrift.git/lib/go/thrift" "services/articles" "services/comments" "services/users" "log" "time" ) func GetArticleClient(host, port string, initialCap, maxCap int, timeout time.Duration) *articles.ArticleServiceClient { protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() client, err := NewThriftPoolClient(host, port, protocolFactory, protocolFactory, initialCap, maxCap) if err != nil { log.Panicln("GetArticleClient error: ", err) } client.SetTimeout(timeout) return articles.NewArticleServiceClient(client) } func GetCommentClient(host, port string, initialCap, maxCap int, timeout time.Duration) *comments.CommentServiceClient { protocolFactory := thrift.NewTCompactProtocolFactory() client, err := NewThriftPoolClient(host, port, protocolFactory, protocolFactory, initialCap, maxCap) if err != nil { log.Panicln("GetCommentClient error: ", err) } client.SetTimeout(timeout) return comments.NewCommentServiceClient(client) } func GetUserClient(host, port string, initialCap, maxCap int, timeout time.Duration) *users.UserServiceClient { protocolFactory := thrift.NewTCompactProtocolFactory() client, err := NewThriftPoolClient(host, port, protocolFactory, protocolFactory, initialCap, maxCap) if err != nil { log.Panicln("GetUserClient error: ", err) } client.SetTimeout(timeout) return users.NewUserServiceClient(client) }
以上所述就是小编给大家介绍的《golang thrift client pool 解决 server 重启连接失效》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
工程问题C++语言求解
Delores M.Etter、Jeanine A.Ingber / 冯力、周凯 / 机械工业出版社 / 2014-8 / 79元
本书介绍了如何利用ANSIC++编程语言以基于对象的编程方式来解决工程问题。书中引用了大量来自于不同工程、科学和计算机科学领域的示例,是一本理论和实践结合紧密的教材。针对C++基本语法的各个部分,由浅入深地进行讲解。每讲解一部分基础知识,同时会结合多个相关实例,实例内容详实,紧贴所讲内容,使读者能够立刻对所学知识进行练习,实战性强。一起来看看 《工程问题C++语言求解》 这本书的介绍吧!