前言
最近在用 golang 做一些 redis 相关的操作,选用了 redigo 这个第三方库。然后在使用 Pub/Sub 的时候,却发现了一个小坑……
Redis Client
首先,我们来初始化一个带连接池的 Redis Client:
import ( "github.com/gomodule/redigo/redis" ) type RedisClient struct { pool *redis.Pool } func NewRedisClient(addr string, db int, passwd string) *RedisClient { pool := &redis.Pool{ MaxIdle: 10, IdleTimeout: 300 * time.Second, Dial: func() (redis.Conn, error) { c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db)) if err != nil { return nil, err } return c, nil }, TestOnBorrow: func(c redis.Conn, t time.Time) error { if time.Since(t) < time.Minute { return nil } _, err := c.Do("PING") return err }, } log.Printf("new redis pool at %s", addr) client := &RedisClient{ pool: pool, } return client }
Publish
然后我们可以简单的实现一个 publish 方法:
func (r *RedisClient) Publish(channel, message string) (int, error) { c := r.pool.Get() defer c.Close() n, err := redis.Int(c.Do("PUBLISH", channel, message)) if err != nil { return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err) } return n, nil }
Subscribe
接下来就是一个稍微复杂点的带有心跳的 subscribe 方法:
func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error { psc := redis.PubSubConn{Conn: r.pool.Get()} defer psc.Close() log.Printf("redis pubsub subscribe channel: %v", channel) if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil { return err } done := make(chan error, 1) // start a new goroutine to receive message go func() { for { switch msg := psc.Receive().(type) { case error: done <- fmt.Errorf("redis pubsub receive err: %v", msg) return case redis.Message: if err := consume(msg); err != nil { done <- err return } case redis.Subscription: if msg.Count == 0 { // all channels are unsubscribed done <- nil return } } } }() // health check tick := time.NewTicker(time.Minute) defer tick.Stop() for { select { case <-ctx.Done(): if err := psc.Unsubscribe(); err != nil { return fmt.Errorf("redis pubsub unsubscribe err: %v", err) } return nil case err := <-done: return err case <-tick.C: if err := psc.Ping(""); err != nil { return err } } } return nil }
最后,我们写一个简单地 main 函数来调用 publish & subscribe:
func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error { psc := redis.PubSubConn{Conn: r.pool.Get()} defer psc.Close() log.Printf("redis pubsub subscribe channel: %v", channel) if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil { return err } done := make(chan error, 1) // start a new goroutine to receive message go func() { for { switch msg := psc.Receive().(type) { case error: done <- fmt.Errorf("redis pubsub receive err: %v", msg) return case redis.Message: if err := consume(msg); err != nil { done <- err return } case redis.Subscription: if msg.Count == 0 { // all channels are unsubscribed done <- nil return } } } }() // health check tick := time.NewTicker(time.Minute) defer tick.Stop() for { select { case <-ctx.Done(): if err := psc.Unsubscribe(); err != nil { return fmt.Errorf("redis pubsub unsubscribe err: %v", err) } return nil case err := <-done: return err case <-tick.C: if err := psc.Ping(""); err != nil { return err } } } return nil }
坑
咋一看之下,好像并没有什么异常?然而,如果我们这时候去看 redis 的 tcp 连接,就可以发现一些猫腻:
$sudo netstat -antp | grep redis tcp 0 0 0.0.0.0:6379 0.0.0.0:* LISTEN 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55010 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55015 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55009 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55005 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55012 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55011 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55013 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55007 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55006 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55014 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:54972 ESTABLISHED 940/redis-server 0.
竟然是每一次 subscribe 就新建了一个连接,而 connection pool 似乎没有什么作用。
更进一步地调试,我们发现在 defer psc.Close() 的时候就卡住了,也就是上面的 10 个 goroutine 其实并没有正常退出。
Concurrent
排查许久之后,终于定位到了问题!引用 redigo 的说明:
Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.
For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.
也就是说,虽然一个连接可以在不同的 goroutine 并发调用 Receive() 和 Subscribe()(subscribe调用了send和flush) ,但是却不能再有其他并发操作(比如 Close())。
其他相似的问题还可以参考 issue
Fix
知道了上面的原因之后,我们稍微修改一下 defer psc.Close() 的位置即可解决问题:
// start a new goroutine to receive message go func() { // IMPORTANT! defer psc.Close() for { switch msg := psc.Receive().(type) { case error:
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新日志
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]