内容简介:日常的工作中,经常会用到队列(Queue),在python中有原生的队列,但是由于原生的队列是存在于内存当中,当系统重启后队列里的消息就没有,且无法进行分步式,Redis中的List数据有四种原语,LPUSH,LPOP,RPUSH,RPOP,配合使用可以实现简单的生产消费模型。push 是向列表中添加信息,pop是从列表中读取信息,l与r 则是左和右或者说列表头和列表尾,lpush将消息放到列表头,rpush将消息放到列表尾。可以看出每次调用lpush时,数据都添加到列表的最前面.
日常的工作中,经常会用到队列(Queue),在 python 中有原生的队列,但是由于原生的队列是存在于内存当中,当系统重启后队列里的消息就没有,且无法进行分步式,Redis中的List数据有四种原语,LPUSH,LPOP,RPUSH,RPOP,配合使用可以实现简单的生产消费模型。
原语说明
push 是向列表中添加信息,pop是从列表中读取信息,l与r 则是左和右或者说列表头和列表尾,lpush将消息放到列表头,rpush将消息放到列表尾。
>>> import redis >>> pool = redis.ConnectionPool(host='192.168.99.100', port=6739, db=0) >>> r=redis.Redis(connection_pool=pool) >>> r.lpush('task',"task1") 1L >>> r.lpush('task',"task2") 2L >>> r.lpush('task',"task10") 3L >>> r.lindex('task',0) 'task10' >>> r.lindex('task',1) 'task2' >>> r.lindex('task',2) 'task1' >>> r.lindex('task',3) >>>
可以看出每次调用lpush时,数据都添加到列表的最前面.
lpop是从列表的头开始出数据,现在task中有三条数据,[‘task10’,’task2’,’task1’],现在调用三次lpop
>>> r.lpop('task') 'task10' >>> r.lpop('task') 'task2' >>> r.lpop('task') 'task1' >>> r.lpop('task') >>> r.lpop('task')
可以看到lpop是从列表头中开始弹出数据的,当列表中没有数据的时候,则返回空。
rpush与rpop与其相反,从列表尾部进行操作
>>> r.rpush('task',1) 1L >>> r.rpush('task',2) 2L >>> r.rpush('task',3) 3L >>> r.rpush('task',10) 4L >>> for i in range(4): ... print r.lindex('task',i) ... 1 2 3 10 >>> r.rpop('task') '10' >>> r.rpop('task') '3' >>> r.rpop('task') '2' >>> r.rpop('task') '1' >>> r.rpop('task')
实现生产消费模型
那么利用列表的push与pop命令就可以实现简单的生产消费了
#coding:gbk import redis import time,threading pool = redis.ConnectionPool(host='192.168.99.100', port=6739, db=0) r=redis.Redis(connection_pool=pool) def ping(r): # 一直执行ping命令,防止连接丢失 while 1: try: r.ping() except: pass finally: time.sleep(1) def producer(r,l): times = 10 while times >0 : l.acquire() try: r.lpush('task','%s produce data'%threading.current_thread().name) finally: l.release() time.sleep(2) times -=1 def consumer(r,l): while 1: l.acquire() task = None try: task = r.lpop('task') if task: print "%s get a task %s "%(threading.current_thread().name,task) finally: l.release() time.sleep(1) if __name__ == '__main__': thlist = [] t1 = threading.Thread(target=ping,args=(r,)) t1.setDaemon(True) thlist.append(t1) lock_c = threading.Lock() lock_p = threading.Lock() for i in range(10): #10个生产者与消费者 t_p = threading.Thread(target=producer,args=(r,lock_p),name="Producer:%s"%i) t_c = threading.Thread(target=consumer, args=(r, lock_c), name="Consumer:%s" % i) thlist.append(t_p) thlist.append(t_c) t_p.setDaemon(True) t_c.setDaemon(True) for th in thlist: th.start() # th.join() print 2222 time.sleep(10)
阻塞pop
lpop与rpop还有一个阻塞的版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。有时候,为了等待一个新元素到达数据中,需要使用轮询的方式对数据进行探查。
另一种更好的方式是,使用系统提供的阻塞原语,在新元素到达时立即进行处理,而新元素还没到达时,就一直阻塞住,避免轮询占用资源。
发布订阅
可以配合使用 publish
和 pubsub
来实现发布订阅
发布
pool = redis.ConnectionPool(host='192.168.99.100', port=6739, db=0) r=redis.Redis(connection_pool=pool) r.publish('mychanel','hello everyone')
该方法返回的是订阅者的数量
订阅
pool = redis.ConnectionPool(host='192.168.99.100', port=6739, db=0) r=redis.Redis(connection_pool=pool) p = r.pubsub() #打开订阅功能 p.subscribe(['mychanel']) # 订阅关注的频道,可以是多个 p.parse_response()
订阅者能收到的信息只能是自它开始订阅之后的消息,之前已经发布的就不能收到了。
parse_response()
是阻塞的,只有当收到消息时才结束,可以写一个while 循环,但还有一个更好的方法,是调用它的 listen()
方法
>>> for i in p.listen(): ... print i
redis可以作为简单的消息队列来用,但是它毕竟不是专业的消息队列,如果对于有很大的消息队列需求的系统还是考虑使用专业的MQ如kafka等。
以上所述就是小编给大家介绍的《使用Redis作为消息队列实现生产消费与发布订阅》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 消息队列和发布订阅
- Lua Web快速开发指南(10) - 利用MQ实现异步任务、订阅/发布、消息队列
- 设计模式之发布订阅模式(2) Redis实现发布订阅模式
- 设计模式之发布订阅模式(1) 一文搞懂发布订阅模式
- 使用并解析 OPML 格式的订阅列表来转移自己的 RSS 订阅(解析篇)
- Redis订阅与发布
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
C++ Primer Plus
Stephen Prata / 张海龙、袁国忠 / 人民邮电出版社 / 2012-6-19 / 99.00元
C++是在C语言基础上开发的一种集面向对象编程、通用编程和传统的过程化编程于一体的编程语言,是C语言的超集。本书是根据2003年的ISO/ANSI C++标准编写的。通过大量短小精悍的程序详细而全面地阐述了C++的基本概念和技术。全书分为18章和10个附录,分别介绍了C++程序的运行方式、基本数据类型、复合数据类型、循环和关系表达式、分支语句和逻辑操作符、函数重载和函数模板、内存模型和名称空间、类......一起来看看 《C++ Primer Plus》 这本书的介绍吧!