从goim定制, 浅谈 golang 的 interface 解耦合

栏目: Go · 发布时间: 6年前

内容简介:继上一篇文章goim 架构与定制 , 再谈github 上的 issue 在这里简要说明一下 golang 的 interface: 在吴德宝AllenWu 文章

继上一篇文章goim 架构与定制 , 再谈 goim 的定制扩展, 这一次谈一弹 goim 从 kafka 转到 nats

github 上的 issue 在这里 github.com/Terry-Mao/g…

简要说明一下 golang 的 interface: 在吴德宝AllenWu 文章 Golang interface接口深入理解 中这样写到:

为什么要用接口呢?在Gopher China 上的分享中,有大神给出了下面的理由:

writing generic algorithm (类似泛型编程)
hiding implementation detail (隐藏具体实现)
providing interception points (提供拦截点-----> 也可称叫提供 HOOKS , 一个插入其他业务逻辑的钩子)

换个方式说, interface 就是 de-couple 解耦合在 golang 中的实施, 这是现代编程中比较重要的"分层, 解耦合" 架构设计方法

在QQ群"golang中国" 中, 有关于 de-couple 解耦合的话题中, 闪侠这样说到:

从goim定制, 浅谈 golang 的 interface 解耦合
从goim定制, 浅谈 golang 的 interface 解耦合

这里, 就来看看 interface 如何实现 goim 从kafka 转到NATS

1. goim 中的 kafka

看图, 不说话, 哈哈

从goim定制, 浅谈 golang 的 interface 解耦合

上图中,

  1. 在 logic 这个网元中, 有 logic 向 kafka 的消息发布
  2. 在 job 网元中, job 从 kafka 订阅消息, 再赂 comet 网元分发

那我们的目标很简单了, 换了!!! ----------> 等等 .......能保留原有 kafka 实现不? 在必要时, 可以使用开关项, 切换 nats 或 kafka ??

当然......可以!

_

_

2. Don't talk, show me the code!!

下面就比较简单, 看码

2.1 发布接口第一步, 阅读原代码

先看源代码( 注意下面代码中的注释 )

代码在 github.com/Terry-Mao/g… 大约第33行

// PushMids push a message by mid.
func (l *Logic) PushMids(c context.Context, op int32, mids []int64, msg []byte) (err error) {
	keyServers, _, err := l.dao.KeysByMids(c, mids)
	if err != nil {
		return
	}
	keys := make(map[string][]string)
	for key, server := range keyServers {
		if key == "" || server == "" {
			log.Warningf("push key:%s server:%s is empty", key, server)
			continue
		}
		keys[server] = append(keys[server], key)
	}
	for server, keys := range keys {
	    // 
	    //  主要向 kafka 发送消息, 是下面这一行
	    //  l.dao.PushMsg(c, op, server, keys, msg)
	    //  方法名是 PushMsg
	    //
		if err = l.dao.PushMsg(c, op, server, keys, msg); err != nil {
			return
		}
	}
	return
}
复制代码

再看一下 dao 是什么:

代码在 github.com/Terry-Mao/g… 大约第20行

// Logic struct
type Logic struct {
	c   *conf.Config
	dis *naming.Discovery
	//
	//
	// 下面这个 dao.Dao 提供了 PushMsg 方法
	// 带个星, 这是个引用
	//
	//
	dao *dao.Dao
	// online
	totalIPs   int64
	totalConns int64
	roomCount  map[string]int32
	// load balancer
	nodes        []*naming.Instance
	loadBalancer *LoadBalancer
	regions      map[string]string // province -> region
}
复制代码

最后, 重点来了 , 查到 dao 源头实现

下面是我们需要扩展的地方, 在 github.com/Terry-Mao/g… 中 dao, 这名称很 java (DAO-------> Data Access Objects 数据存取对象), 这里也说明了 bilibili 们在代码纺织上, 挺规范

代码在 github.com/Terry-Mao/g… 大约第10行开始

// Dao dao.
type Dao struct {
	c           *conf.Config
	//
	// ******************************************************************
	// 下面这个 kafkaPub 很清楚, 是 kafka 的同步发布者 kafka.SyncProducer
	// 
	//  这个是我们要换成 interface 的地方
	//
	// ******************************************************************
	//
	kafkaPub    kafka.SyncProducer
	redis       *redis.Pool
	redisExpire int32
}

// New new a dao and return.
func New(c *conf.Config) *Dao {
	d := &Dao{
		c:           c,
		//
    	// ******************************************************************
	    // 下面这个 newKafkaPub(c.Kafka) 即是初始化 kafka
    	//  也就是连接上 kafka
    	//  下面, 我们先改写一下这个函数, 变通一下代码形式
    	//
    	// ******************************************************************
    	//
		kafkaPub:    newKafkaPub(c.Kafka),
		redis:       newRedis(c.Redis),
		redisExpire: int32(time.Duration(c.Redis.Expire) / time.Second),
	}
	return d
}

//  这是连接 kafka 的函数( function ) 
//  
func newKafkaPub(c *conf.Kafka) kafka.SyncProducer {
	kc := kafka.NewConfig()
	kc.Producer.RequiredAcks = kafka.WaitForAll // Wait for all in-sync replicas to ack the message
	kc.Producer.Retry.Max = 10                  // Retry up to 10 times to produce the message
	kc.Producer.Return.Successes = true
	pub, err := kafka.NewSyncProducer(c.Brokers, kc)
	if err != nil {
		panic(err)
	}
	return pub
}
复制代码

这里, 先小改一下 func New(c *conf.Config) *Dao 这个函数 改成如下代码形式

// New new a dao and return.
func New(c *conf.Config) *Dao {
	d := &Dao{
		c:           c,
		//
        // 注意, 下面这行被移出去
        //
		redis:       newRedis(c.Redis),
		redisExpire: int32(time.Duration(c.Redis.Expire) / time.Second),
	}
	//
    // 变成这样了, 功能没变化
    //
	d.kafkaPub = newKafkaPub(c.Kafka)
		
	return d
}
复制代码

2.2 发布接口第二步, 检查一下哪个方法( method )需要被 interface 实现

还是看源代码

代码在 github.com/Terry-Mao/g… 大约第13行开始

// PushMsg push a message to databus.
func (d *Dao) PushMsg(c context.Context, op int32, server string, keys []string, msg []byte) (err error) {
	pushMsg := &pb.PushMsg{
		Type:      pb.PushMsg_PUSH,
		Operation: op,
		Server:    server,
		Keys:      keys,
		Msg:       msg,
	}
	b, err := proto.Marshal(pushMsg)
	if err != nil {
		return
	}
	
	//
	// ********************************
	//
	// 实际发布消息, 就是下面这个几行语句
	// 1. 组织一下需要发送的信息, 以 kafka 的发布接口要求的形式
	// 2. 尝试发布信息, 处理发布信息可能的错误
	//
	// 重点注意下面这几行, 后面会改掉
	// 重点注意下面这几行, 后面会改掉
	// 重点注意下面这几行, 后面会改掉
	//
	// ********************************
	//
	m := &sarama.ProducerMessage{
		Key:   sarama.StringEncoder(keys[0]),
		Topic: d.c.Kafka.Topic,
		Value: sarama.ByteEncoder(b),
	}

	if _, _, err = d.kafkaPub.SendMessage(m); err != nil {
		log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)
	}
	return
}

// BroadcastRoomMsg push a message to databus.
func (d *Dao) BroadcastRoomMsg(c context.Context, op int32, room string, msg []byte) (err error) {
	pushMsg := &pb.PushMsg{
		Type:      pb.PushMsg_ROOM,
		Operation: op,
		Room:      room,
		Msg:       msg,
	}
	b, err := proto.Marshal(pushMsg)
	if err != nil {
		return
	}
	m := &sarama.ProducerMessage{
		Key:   sarama.StringEncoder(room),
		Topic: d.c.Kafka.Topic,
		Value: sarama.ByteEncoder(b),
	}
	//
	// ********************************
	// 实际发布消息, 就是下面这个语句
	// ********************************
	//
	if _, _, err = d.kafkaPub.SendMessage(m); err != nil {
		log.Errorf("PushMsg.send(broadcast_room pushMsg:%v) error(%v)", pushMsg, err)
	}
	return
}
复制代码

2.3 换用 interface 实现这个 SendMessage(m) 方法( method )

先上代码, 代码会说话( golang 简单就在这里, 代码会说话 ) , 后加说明

// PushMsg  interface for kafka / nats 
// ******************** 这里是新加的 interface 定义 *****************
type PushMsg interface {
	PublishMessage(topic, ackInbox string, key string, msg []byte) error  // ****** 这里小改了个方法名!!! 注意
	Close() error
}

// Dao dao.
type Dao struct {
	c           *conf.Config
	push        PushMsg   // ******************** 看这里 *****************
	redis       *redis.Pool
	redisExpire int32
}

// New new a dao and return.
func New(c *conf.Config) *Dao {

	d := &Dao{
		c:           c,
		redis:       newRedis(c.Redis),
		redisExpire: int32(time.Duration(c.Redis.Expire) / time.Second),
	}

	if c.UseNats {   // ******************** 在配置中加一个 bool 布尔值的开关项 *****************
		d.push = NewNats(c) // ******************** 这里支持 nats  *****************
	} else {
		d.push = NewKafka(c) //// ******************** 这里是原来的 kafka *****************
	}
	return d
}
复制代码

kafka 实现 interface 接口的代码

// Dao dao.
type kafkaDao struct {
	c    *conf.Config
	push kafka.SyncProducer
}

// New new a dao and return.
func NewKafka(c *conf.Config) *kafkaDao {
	d := &kafkaDao{
		c:    c,
		push: newKafkaPub(c.Kafka),
	}
	return d
}

// PublishMessage  push message to kafka
func (d *kafkaDao) PublishMessage(topic, ackInbox string, key string, value []byte) error {

	m := &kafka.ProducerMessage{
		Key:   sarama.StringEncoder(key),
		Topic: d.c.Kafka.Topic,
		Value: sarama.ByteEncoder(value),
	}
	_, _, err := d.push.SendMessage(m)

	return err
}

复制代码

nats 对 interface 的实现

// natsDao dao for nats
type natsDao struct {
	c    *conf.Config
	push *nats.Conn
}

// New new a dao and return.
func NewNats(c *conf.Config) *natsDao {

	conn, err := newNatsClient(c.Nats.Brokers, c.Nats.Topic, c.Nats.TopicID)
	if err != nil {
		return nil
	}
	d := &natsDao{
		c:    c,
		push: conn,
	}
	return d
}

// PublishMessage  push message to nats
func (d *natsDao) PublishMessage(topic, ackInbox string, key string, value []byte) error {
	if d.push == nil {
		return errors.New("nats error")
	}
	msg := &nats.Msg{Subject: topic, Reply: ackInbox, Data: value}
	return d.push.PublishMsg(msg)

}
复制代码

最后, 调用 interface 的变更

// PushMsg push a message to databus.
func (d *Dao) PushMsg(c context.Context, op int32, server string, keys []string, msg []byte) (err error) {
	pushMsg := &pb.PushMsg{
		Type:      pb.PushMsg_PUSH,
		Operation: op,
		Server:    server,
		Keys:      keys,
		Msg:       msg,
	}
	b, err := proto.Marshal(pushMsg)
	if err != nil {
		return
	}
	//
	// ********************************
	//
	// 实际发布消息, 就是下面这个几行语句
	// 1. 组织一下需要发送的信息, 以 kafka 的发布接口要求的形式
	// 2. 尝试发布信息, 处理发布信息可能的错误
	//
	// 重点注意下面这几行, 实际更改
	// 重点注意下面这几行, 实际更改
	// 重点注意下面这几行, 实际更改
	//
	// ********************************
	if err = d.push.PublishMessage(d.c.Kafka.Topic, d.c.Nats.AckInbox, keys[0], b); err != nil {
		log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)
	}
	return
}
复制代码

OK, 修改完成

2.4 小结

// PushMsg  interface for kafka / nats 
// ******************** 这里是新加的 interface 定义 *****************
type PushMsg interface {
	PublishMessage(topic, ackInbox string, key string, msg []byte) error  // ****** 这里小改了个方法名!!! 注意
	Close() error
}

// Dao dao.
type Dao struct {
	c           *conf.Config
	push        PushMsg   // ******************** 看这里 *****************
	redis       *redis.Pool
	redisExpire int32
}
复制代码

上面 定义了 PushMsg 这个interface , 这是一个 方法( method)集合

方法定义与实现

  1. 方法名 , 比如 PublishMessage
  2. input 数据, 就是这些 topic, ackInbox string, key string, msg []byte, 分别是
  1. topic 这是 kafka 或 nats 里的主题, 也就是 pub/sub 发布/订阅的频道
  2. ackInbox 这是 publish 发布的 confirm 确认频道
  3. key 消息体( payload ) 的键
  4. msg 这是消息体 payload
  1. ouput 数据, 这里是 error , 标示 PublishMessage 方法( method ) 的输出

这就是一个接口定义, 方法名/ 输入/ 输出, 至于方法的具体实现, 交由下面的实体去实现( 可以看 kafka / nats 中分别对应的 PublishMessage 的方法实现)

方法引用

很清楚, 方法是由具体实现来完成, 下面就是实例化方法

是用哪一个具体实现呢, 就看实例化哪一个了, interface 最终落地, 就在这里

if c.UseNats {   // ******************** 在配置中加一个 bool 布尔值的开关项 *****************
		d.push = NewNats(c) // ******************** 这里支持 nats  *****************
	} else {
		d.push = NewKafka(c) //// ******************** 这里是原来的 kafka *****************
	}
复制代码

而在 func (d *Dao) PushMsg(c context.Context, op int32, server string, keys []string, msg []byte) (err error) 中, 则简单调用 interface 定义的方法

方法调用

与其他方法 method 或函数 function 是一样的, 没什么特别的

// ********************************
	if err = d.push.PublishMessage(d.c.Kafka.Topic, d.c.Nats.AckInbox, keys[0], b); err != nil {
		log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)
	}
复制代码

3. 浅谈 golang 的 interface

再一次回看,

在吴德宝AllenWu 文章 Golang interface接口深入理解 中这样写到:

为什么要用接口呢?在Gopher China 上的分享中,有大神给出了下面的理由:

writing generic algorithm (类似泛型编程)
hiding implementation detail (隐藏具体实现)
providing interception points (提供拦截点-----> 也可称叫提供 HOOKS , 一个插入其他业务逻辑的钩子)

interface 确是 隐藏了具体实现 , 能让我们很容易的把 goim 对 kafka 的依赖, 切换到 nats , 并且通过一个开关项, 来确定使用哪一个具体实现

扩展一下, 这个 interface 也可以实现从 kafka 切换到 rabbitMQ / activeMQ / redis (pub/sub) .... 只要简单实现 PushMsg 这个 interface 就好啦

4. 看看代码

另有 goim 在 job 网元上的 subscribe 订阅接口, 支持 interface 代码是一路子方法, 直接看源码吧, 有交流讨论再另写.

注: job 代码中, 我把某个方法( method ) 拆解成了函数( function ), 有兴趣的朋友可以查一下, 有些小区别,但效果一样.

goim 源代码在 github.com/Terry-Mao/g… 示例代码在 github.com/tsingson/go…

感谢www.bilibili.com & 毛剑 及众多开源社区的朋友们

欢迎交流与批评.....

关于我

网名 tsingson (三明智, 江湖人称3爷)

原 ustarcom IPTV/OTT 事业部播控产品线技术架构湿/解决方案工程湿角色(8年), 自由职业者,

喜欢音乐(口琴,是第三/四/五届广东国际口琴嘉年华的主策划人之一), 摄影与越野,

喜欢 golang 语言 (商用项目中主要用 postgres + golang )

tsingson 写于中国深圳, 2019/04/22


以上所述就是小编给大家介绍的《从goim定制, 浅谈 golang 的 interface 解耦合》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

阿里巴巴

阿里巴巴

邓肯·克拉克 (Duncan Clark) / 中信出版社 / 2016-8-22 / CNY 58.00

阿里巴巴的故事在中国已是家喻户晓,马云的个人魅力和非凡的商业头脑也早已声名远扬。而一千个人眼中会有一千个不一样的马云, 一个外国投资人、咨询顾问眼中的马云和阿里巴巴会是什么样的?1994年就来到中国,阿里巴巴创业早期的咨询顾问克拉克先生将阿里巴巴帝国崛起过程中他的见闻、感触和思考结合深入的访谈、研究写成了这本书。 书中既可以读到阿里巴巴艰辛的创业历程、惊心动魄的商业对垒,也不乏有趣好玩儿的背......一起来看看 《阿里巴巴》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码