redigo中PubSub的一点小坑

栏目: 数据库 · Redis · 发布时间: 6年前

内容简介:最近在用 golang 做一些 redis 相关的操作,选用了 redigo 这个第三方库。然后在使用 Pub/Sub 的时候,却发现了一个小坑……首先,我们来初始化一个带连接池的 Redis Client:然后我们可以简单的实现一个 publish 方法:

最近在用 golang 做一些 redis 相关的操作,选用了 redigo 这个第三方库。然后在使用 Pub/Sub 的时候,却发现了一个小坑……

Redis Client

首先,我们来初始化一个带连接池的 Redis Client:

import (
	"github.com/gomodule/redigo/redis"
)
 
type RedisClient struct {
	pool *redis.Pool
}
 
func NewRedisClient(addr string, db int, passwd string) *RedisClient {
	pool := &redis.Pool{
		MaxIdle:     10,
		IdleTimeout: 300 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))
			if err != nil {
				return nil, err
			}
			return c, nil
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}
	log.Printf("new redis pool at %s", addr)
	client := &RedisClient{
		pool: pool,
	}
	return client
}

Publish

然后我们可以简单的实现一个 publish 方法:

func (r *RedisClient) Publish(channel, message string) (int, error) {
	c := r.pool.Get()
	defer c.Close()
	n, err := redis.Int(c.Do("PUBLISH", channel, message))
	if err != nil {
		return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)
	}
	return n, nil
}

Subscribe

接下来就是一个稍微复杂点的带有心跳的 subscribe 方法:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done <- nil
					return
				}
			}
		}
	}()
 
	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := <-done:
			return err
		case <-tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}
 
	return nil
}

最后,我们写一个简单地 main 函数来调用 publish & subscribe:

func main(){
	r := NewRedisClient("172.16.8.128:6379", 0, "")
	consume := func(msg redis.Message) error {
		log.Printf("recv msg: %s", msg.Data)
		return nil
	}
	for i := 0; i <10; i++{
		log.Printf("-------------- %d -----------------", i)
		ctx, cancel := context.WithCancel(context.Background())
		go func(){
			if err := r.Subscribe(ctx, consume, "channel"); err != nil {
				log.Println("subscribe err: %v", err)
			}
		}()
		time.Sleep(time.Second)
		_, err:= r.Publish("channel", "hello, " + strconv.Itoa(i))
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(time.Second)
		cancel()
	}
	forever := make(chan struct{})
	<-forever
}

咋一看之下,好像并没有什么异常?然而,如果我们这时候去看 redis 的 tcp 连接,就可以发现一些猫腻:

$sudo netstat -antp | grep redis
tcp        0      0 0.0.0.0:6379            0.0.0.0:*               LISTEN      940/redis-server 0. 
tcp        0      0 172.16.8.128:6379       172.16.8.1:55010        ESTABLISHED 940/redis-server 0. 
tcp        0      0 172.16.8.128:6379       172.16.8.1:55015        ESTABLISHED 940/redis-server 0. 
tcp        0      0 172.16.8.128:6379       172.16.8.1:55009        ESTABLISHED 940/redis-server 0. 
tcp        0      0 172.16.8.128:6379       172.16.8.1:55005        ESTABLISHED 940/redis-server 0. 
tcp        0      0 172.16.8.128:6379       172.16.8.1:55012        ESTABLISHED 940/redis-server 0. 
tcp        0      0 172.16.8.128:6379       172.16.8.1:55011        ESTABLISHED 940/redis-server 0. 
tcp        0      0 172.16.8.128:6379       172.16.8.1:55013        ESTABLISHED 940/redis-server 0. 
tcp        0      0 172.16.8.128:6379       172.16.8.1:55007        ESTABLISHED 940/redis-server 0. 
tcp        0      0 172.16.8.128:6379       172.16.8.1:55006        ESTABLISHED 940/redis-server 0. 
tcp        0      0 172.16.8.128:6379       172.16.8.1:55014        ESTABLISHED 940/redis-server 0. 
tcp        0      0 172.16.8.128:6379       172.16.8.1:54972        ESTABLISHED 940/redis-server 0. 

竟然是每一次 subscribe 就新建了一个连接,而 connection pool 似乎没有什么作用。

更进一步地调试,我们发现在 defer psc.Close() 的时候就卡住了,也就是上面的 10 个 goroutine 其实并没有正常退出。

Concurrent

排查许久之后,终于定位到了问题!引用 redigo 的 说明

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.

For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

也就是说,虽然一个连接可以在不同的 goroutine 并发调用 Receive() 和 Subscribe() (subscribe调用了send和flush) ,但是却不能再有其他并发操作 (比如 Close())

其他相似的问题还可以参考 issue

Fix

知道了上面的原因之后,我们稍微修改一下 defer psc.Close() 的位置即可解决问题:

	// start a new goroutine to receive message
	go func() {
		// IMPORTANT!
		defer psc.Close()
		for {
			switch msg := psc.Receive().(type) {
			case error:

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

逆袭大学

逆袭大学

贺利坚 / 人民邮电出版社 / 2014-3 / 49.00

《逆袭大学——传给IT学子的正能量》以作者近二十年的从教经历和义务为IT学子解答咨询的工作为基础,以认识专业为起点,以编程能力的提高为关键,帮助计算机类专业的大学生更新学习观念、重塑学习品质、培养学习方法,找到自己的大学之路。书中直接解答了学无用处论、专业兴趣、考研、职场等诸多大学生面临的典型困惑。 本书主要面向在校计算机类(包括软件工程、网络工程等)专业高校学生,也能让非计算机类专业的高校......一起来看看 《逆袭大学》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具