使用Redis作为消息队列实现生产消费与发布订阅

栏目: 数据库 · 发布时间: 5年前

内容简介:日常的工作中,经常会用到队列(Queue),在python中有原生的队列,但是由于原生的队列是存在于内存当中,当系统重启后队列里的消息就没有,且无法进行分步式,Redis中的List数据有四种原语,LPUSH,LPOP,RPUSH,RPOP,配合使用可以实现简单的生产消费模型。push 是向列表中添加信息,pop是从列表中读取信息,l与r 则是左和右或者说列表头和列表尾,lpush将消息放到列表头,rpush将消息放到列表尾。可以看出每次调用lpush时,数据都添加到列表的最前面.

日常的工作中,经常会用到队列(Queue),在 python 中有原生的队列,但是由于原生的队列是存在于内存当中,当系统重启后队列里的消息就没有,且无法进行分步式,Redis中的List数据有四种原语,LPUSH,LPOP,RPUSH,RPOP,配合使用可以实现简单的生产消费模型。

原语说明

push 是向列表中添加信息,pop是从列表中读取信息,l与r 则是左和右或者说列表头和列表尾,lpush将消息放到列表头,rpush将消息放到列表尾。

>>> import redis
>>> pool = redis.ConnectionPool(host='192.168.99.100', port=6739, db=0)
>>> r=redis.Redis(connection_pool=pool)
>>> r.lpush('task',"task1")
1L
>>> r.lpush('task',"task2")
2L
>>> r.lpush('task',"task10")
3L
>>> r.lindex('task',0)
'task10'
>>> r.lindex('task',1)
'task2'
>>> r.lindex('task',2)
'task1'
>>> r.lindex('task',3)
>>>

可以看出每次调用lpush时,数据都添加到列表的最前面.

lpop是从列表的头开始出数据,现在task中有三条数据,[‘task10’,’task2’,’task1’],现在调用三次lpop

>>> r.lpop('task')
'task10'
>>> r.lpop('task')
'task2'
>>> r.lpop('task')
'task1'
>>> r.lpop('task')
>>> r.lpop('task')

可以看到lpop是从列表头中开始弹出数据的,当列表中没有数据的时候,则返回空。

rpush与rpop与其相反,从列表尾部进行操作

>>> r.rpush('task',1)
1L
>>> r.rpush('task',2)
2L
>>> r.rpush('task',3)
3L
>>> r.rpush('task',10)
4L
>>> for i in range(4):
...     print r.lindex('task',i)
...
1
2
3
10
>>> r.rpop('task')
'10'
>>> r.rpop('task')
'3'
>>> r.rpop('task')
'2'
>>> r.rpop('task')
'1'
>>> r.rpop('task')

实现生产消费模型

那么利用列表的push与pop命令就可以实现简单的生产消费了

#coding:gbk
import redis
import time,threading

pool = redis.ConnectionPool(host='192.168.99.100', port=6739, db=0)
r=redis.Redis(connection_pool=pool)


def ping(r):
    # 一直执行ping命令,防止连接丢失
    while 1:
        try:
            r.ping()
        except:
            pass
        finally:
            time.sleep(1)


def producer(r,l):
        times = 10
        while times >0 :
            l.acquire()
            try:
                r.lpush('task','%s produce data'%threading.current_thread().name)
            finally:
                l.release()
                time.sleep(2)
                times -=1

def consumer(r,l):
    while 1:
        l.acquire()
        task = None
        try:
            task = r.lpop('task')
            if task:
                print "%s get a task %s "%(threading.current_thread().name,task)
        finally:
            l.release()
            time.sleep(1)

if __name__ == '__main__':
    thlist = []
    t1 = threading.Thread(target=ping,args=(r,))
    t1.setDaemon(True)
    thlist.append(t1)
    lock_c = threading.Lock()
    lock_p = threading.Lock()

    for i in range(10): #10个生产者与消费者
        t_p = threading.Thread(target=producer,args=(r,lock_p),name="Producer:%s"%i)
        t_c = threading.Thread(target=consumer, args=(r, lock_c), name="Consumer:%s" % i)
        thlist.append(t_p)
        thlist.append(t_c)
        t_p.setDaemon(True)
        t_c.setDaemon(True)
    for th in thlist:
        th.start()
        # th.join()

    print 2222
    time.sleep(10)

阻塞pop

lpop与rpop还有一个阻塞的版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。有时候,为了等待一个新元素到达数据中,需要使用轮询的方式对数据进行探查。

另一种更好的方式是,使用系统提供的阻塞原语,在新元素到达时立即进行处理,而新元素还没到达时,就一直阻塞住,避免轮询占用资源。

发布订阅

可以配合使用 publishpubsub 来实现发布订阅

发布

pool = redis.ConnectionPool(host='192.168.99.100', port=6739, db=0)
r=redis.Redis(connection_pool=pool)
r.publish('mychanel','hello everyone')

该方法返回的是订阅者的数量

订阅

pool = redis.ConnectionPool(host='192.168.99.100', port=6739, db=0)
r=redis.Redis(connection_pool=pool)
p = r.pubsub() #打开订阅功能
p.subscribe(['mychanel']) # 订阅关注的频道,可以是多个
p.parse_response()

订阅者能收到的信息只能是自它开始订阅之后的消息,之前已经发布的就不能收到了。

parse_response() 是阻塞的,只有当收到消息时才结束,可以写一个while 循环,但还有一个更好的方法,是调用它的 listen() 方法

>>> for i in p.listen():
...     print i

redis可以作为简单的消息队列来用,但是它毕竟不是专业的消息队列,如果对于有很大的消息队列需求的系统还是考虑使用专业的MQ如kafka等。


以上所述就是小编给大家介绍的《使用Redis作为消息队列实现生产消费与发布订阅》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

C++ Primer Plus

C++ Primer Plus

Stephen Prata / 张海龙、袁国忠 / 人民邮电出版社 / 2012-6-19 / 99.00元

C++是在C语言基础上开发的一种集面向对象编程、通用编程和传统的过程化编程于一体的编程语言,是C语言的超集。本书是根据2003年的ISO/ANSI C++标准编写的。通过大量短小精悍的程序详细而全面地阐述了C++的基本概念和技术。全书分为18章和10个附录,分别介绍了C++程序的运行方式、基本数据类型、复合数据类型、循环和关系表达式、分支语句和逻辑操作符、函数重载和函数模板、内存模型和名称空间、类......一起来看看 《C++ Primer Plus》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

SHA 加密
SHA 加密

SHA 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换