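rq stands for Redis Queue. The project is tightly coupled to Redis and also depends heavily on pickle. That is one of the reasons the project is so simple.
Getting the source
First, fork the source code, for example https://github.com/jiajunhuang/rq, and clone it to your machine. Enter the source folder with $ cd rq/rq and look at the directory structure: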

    $ tree
    .
    ├── cli
    │   ├── cli.py
    │   ├── helpers.py
    │   └── __init__.py
    ├── compat
    │   ├── connections.py
    │   ├── dictconfig.py
    │   └── __init__.py
    ├── connections.py
    ├── contrib
    │   ├── __init__.py
    │   ├── legacy.py
    │   └── sentry.py
    ├── decorators.py
    ├── defaults.py
    ├── dummy.py
    ├── exceptions.py
    ├── handlers.py
    ├── __init__.py
    ├── job.py
    ├── local.py
    ├── logutils.py
    ├── queue.py
    ├── registry.py
    ├── suspension.py
    ├── timeouts.py
    ├── utils.py
    ├── version.py
    └── worker.py

    3 directories, 26 files

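Entry point
So how should we read the source? Where is the entry point? When we write code, the entry point is the main function, so what is the entry point when we read code? Start from how we launch an rq worker:

    $ rq worker

So let's see where the rq command comes from: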

    $ cat `which rq`
    #!/home/jiajun/.py3k/bin/python3
    # -*- coding: utf-8 -*-
    import re
    import sys

    from rq.cli import main

    if __name__ == '__main__':
        sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
        sys.exit(main())

This tells us the entry point is the main function in rq.cli. Next, let's see where rq.cli comes from.

    $ cat cli/__init__.py
    # flake8: noqa
    from .cli import main

    # TODO: the following imports can be removed when we drop the `rqinfo` and
    # `rqworkers` commands in favor of just shipping the `rq` command.
    from .cli import info, worker

Next, open cli/cli.py. It contains def worker, which is the entry point we are looking for. The place where work really begins is worker.work(burst=burst).
Defaults

    DEFAULT_JOB_CLASS = 'rq.job.Job'
    DEFAULT_QUEUE_CLASS = 'rq.Queue'
    DEFAULT_WORKER_CLASS = 'rq.Worker'
    DEFAULT_CONNECTION_CLASS = 'redis.StrictRedis'
    DEFAULT_WORKER_TTL = 420
    DEFAULT_RESULT_TTL = 500

Tracing back from the place where the worker is initialized:

    queues = [cli_config.queue_class(queue,
                                     connection=cli_config.connection,
                                     job_class=cli_config.job_class)
              for queue in queues]
    worker = cli_config.worker_class(queues,
                                     name=name,
                                     connection=cli_config.connection,
                                     default_worker_ttl=worker_ttl,
                                     default_result_ttl=results_ttl,
                                     job_class=cli_config.job_class,
                                     queue_class=cli_config.queue_class,
                                     exception_handlers=exception_handlers or None)
    worker.work(burst=burst)

leads us to the defaults listed above. We will see these values again later.
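The class defaults above are dotted-path strings rather than class objects, so something has to turn a string such as 'rq.job.Job' into the class it names. The following is a minimal sketch of that idea using only the standard library. It is not rq's actual implementation, and it is demonstrated on standard-library names so that it runs without rq installed.

```python
import importlib


def import_attribute(dotted_path):
    """Resolve a string such as 'package.module.Name' to the object it names."""
    module_name, _, attribute = dotted_path.rpartition('.')
    if not module_name:
        raise ValueError('expected a dotted path, got {0!r}'.format(dotted_path))
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


# Illustration only: resolve a standard-library class and function by name.
ordered_dict_class = import_attribute('collections.OrderedDict')
dumps = import_attribute('json.dumps')
```

Keeping the defaults as strings means a different job, queue, or worker class can be named in configuration without the defaults module importing it.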
Exploring work
Open worker.py and find def work:

    def work(self, burst=False, logging_level="INFO"):
        """Starts the work loop.

        Pops and performs all jobs on the current list of queues.  When all
        queues are empty, block and wait for new jobs to arrive on any of the
        queues, unless `burst` mode is enabled.

        The return value indicates whether any jobs were processed.
        """
        setup_loghandlers(logging_level)
        self._install_signal_handlers()

        did_perform_work = False
        self.register_birth()
        self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION))
        self.set_state(WorkerStatus.STARTED)

        try:
            while True:
                try:
                    self.check_for_suspension(burst)

                    if self.should_run_maintenance_tasks:
                        self.clean_registries()

                    if self._stop_requested:
                        self.log.info('Stopping on request')
                        break

                    timeout = None if burst else max(1, self.default_worker_ttl - 60)

                    result = self.dequeue_job_and_maintain_ttl(timeout)
                    if result is None:
                        if burst:
                            self.log.info("RQ worker {0!r} done, quitting".format(self.key))
                        break

                    job, queue = result
                    self.execute_job(job, queue)
                    self.heartbeat()

                    did_perform_work = True

                except StopRequested:
                    break
        finally:
            if not self.is_horse:
                self.register_death()
        return did_perform_work

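Here is what this code does:
- Configure logging
- Install the signal handlers
- Register the worker
- Set the state to started
- Enter the loop:
  - Check whether the current worker has been suspended
  - Pop a job
  - Execute the job
  - Send a heartbeat once the job has finished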
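The steps above can be condensed into a toy loop. This is only a sketch under stated assumptions: SketchWorker and all of its attributes are hypothetical names, jobs are plain callables held in an in-memory deque instead of Redis, and only burst mode (quit when the queue is empty) is modelled.

```python
from collections import deque


class SketchWorker:
    """Toy stand-in for a worker; every name here is hypothetical."""

    def __init__(self, jobs):
        self.jobs = deque(jobs)
        self.state = 'idle'
        self.stop_requested = False
        self.events = []  # records lifecycle events for inspection

    def dequeue(self):
        # Non-blocking pop; returns None when the queue is empty.
        return self.jobs.popleft() if self.jobs else None

    def work(self):
        """Run queued jobs until the queue is empty (burst mode only)."""
        did_perform_work = False
        self.events.append('birth')      # register the worker
        self.state = 'started'
        try:
            while True:
                if self.stop_requested:
                    break
                job = self.dequeue()
                if job is None:
                    break                # burst mode: nothing left, quit
                job()                    # execute the job
                self.events.append('heartbeat')
                did_perform_work = True
        finally:
            self.events.append('death')  # always deregister, even on error
        return did_perform_work


results = []
worker = SketchWorker([lambda: results.append(1), lambda: results.append(2)])
performed = worker.work()
```

The try/finally mirrors the real function: the worker is deregistered whether the loop ends normally or with an exception.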
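A job? That must be our task. So where does it come from, and how does the worker know where to get it?
Where the job comes from
We can see that the job comes from job, queue = result, and result comes from result = self.dequeue_job_and_maintain_ttl(timeout). Let's look at the latter: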

    def dequeue_job_and_maintain_ttl(self, timeout):
        result = None
        qnames = self.queue_names()

        self.set_state(WorkerStatus.IDLE)
        self.procline('Listening on {0}'.format(','.join(qnames)))
        self.log.info('')
        self.log.info('*** Listening on {0}...'.format(green(', '.join(qnames))))

        while True:
            self.heartbeat()

            try:
                result = self.queue_class.dequeue_any(self.queues, timeout,
                                                      connection=self.connection,
                                                      job_class=self.job_class)
                if result is not None:
                    job, queue = result
                    self.log.info('{0}: {1} ({2})'.format(green(queue.name),
                                                          blue(job.description), job.id))

                break
            except DequeueTimeout:
                pass

        self.heartbeat()
        return result

Tracing further, self.queue_class.dequeue_any is a class method of Queue in queue.py:

    @classmethod
    def dequeue_any(cls, queues, timeout, connection=None, job_class=None):
        """Class method returning the job_class instance at the front of the given
        set of Queues, where the order of the queues is important.

        When all of the Queues are empty, depending on the `timeout` argument,
        either blocks execution of this function for the duration of the
        timeout or until new messages arrive on any of the queues, or returns
        None.

        See the documentation of cls.lpop for the interpretation of timeout.
        """
        job_class = backend_class(cls, 'job_class', override=job_class)

        while True:
            queue_keys = [q.key for q in queues]
            result = cls.lpop(queue_keys, timeout, connection=connection)
            if result is None:
                return None
            queue_key, job_id = map(as_text, result)
            queue = cls.from_queue_key(queue_key,
                                       connection=connection,
                                       job_class=job_class)
            try:
                job = job_class.fetch(job_id, connection=connection)
            except NoSuchJobError:
                # Silently pass on jobs that don't exist (anymore),
                # and continue in the look
                continue
            except UnpickleError as e:
                # Attach queue information on the exception for improved error
                # reporting
                e.job_id = job_id
                e.queue = queue
                raise e
            return job, queue
        return None, None

There is result = cls.lpop, so let's keep digging:

    @classmethod
    def lpop(cls, queue_keys, timeout, connection=None):
        """Helper method.  Intermediate method to abstract away from some
        Redis API details, where LPOP accepts only a single key, whereas BLPOP
        accepts multiple.  So if we want the non-blocking LPOP, we need to
        iterate over all queues, do individual LPOPs, and return the result.

        Until Redis receives a specific method for this, we'll have to wrap it
        this way.

        The timeout parameter is interpreted as follows:
            None - non-blocking (return immediately)
             > 0 - maximum number of seconds to block
        """
        connection = resolve_connection(connection)
        if timeout is not None:  # blocking variant
            if timeout == 0:
                raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
            result = connection.blpop(queue_keys, timeout)
            if result is None:
                raise DequeueTimeout(timeout, queue_keys)
            queue_key, job_id = result
            return queue_key, job_id
        else:  # non-blocking variant
            for queue_key in queue_keys:
                blob = connection.lpop(queue_key)
                if blob is not None:
                    return queue_key, blob
            return None

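So the job id is simply popped from the given queues with lpop. Looking up the return value of blpop, it returns the name of the list that held the value, together with the value itself: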
Once new data is present on one of the lists, the client returns with the name of the key unblocking it and the popped value.
Now jump back to the previous function. The next step there is to instantiate a Queue and a Job from the queue_key and job_id that were returned.
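The non-blocking branch of lpop can be sketched without a Redis server. In the sketch below, FakeConnection is a hypothetical in-memory stand-in that only offers an lpop(key) method, and the key names are made up for illustration. The point is the order-sensitive scan that returns the first (queue key, value) pair it finds.

```python
class FakeConnection:
    """In-memory stand-in exposing only lpop(key); hypothetical, not redis-py."""

    def __init__(self, lists):
        self.lists = {key: list(values) for key, values in lists.items()}

    def lpop(self, key):
        values = self.lists.get(key)
        return values.pop(0) if values else None


def lpop_any(connection, queue_keys):
    """Return (queue_key, value) from the first non-empty queue, else None."""
    for queue_key in queue_keys:
        value = connection.lpop(queue_key)
        if value is not None:
            return queue_key, value
    return None


# Illustrative key names; earlier keys take priority over later ones.
conn = FakeConnection({'queue:high': [], 'queue:low': ['job-1', 'job-2']})
first = lpop_any(conn, ['queue:high', 'queue:low'])
```

Because the queues are scanned in the order given, listing a queue first gives it priority, which matches the docstring's remark that the order of the queues is important.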
So let's look at Job.fetch, which is called there:

    @classmethod
    def fetch(cls, id, connection=None):
        """Fetches a persisted job from its corresponding Redis key and
        instantiates it.
        """
        job = cls(id, connection=connection)
        job.refresh()
        return job

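job.refresh() looks suspicious, because up to this point everything we know about the job is still just strings.
On the worker side, how does the worker know which function to call?
I looked carefully and almost overlooked the self.data = obj['data'] step, but after following it, that turned out not to be the place, and I could not find it anywhere else either.
Awkward.
That is odd. There has to be somewhere that turns the string back into a Python object. So I went through the documentation and found that rq uses pickle, which means pickle.loads must be called somewhere. A search turned up loads = pickle.loads. Next, search for where loads is used.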

    $ ack loads job.py
    25:loads = pickle.loads
    46: This is a helper method to not have to deal with the fact that `loads()`
    51: obj = loads(pickled_string)
    394: self._result = loads(rv)

After looking at lines 51 and 394, line 51 seemed the more likely candidate, so I searched for unpickle and found this in Job:

    def _unpickle_data(self):
        self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data)

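Searching for _unpickle_data shows that it is used in four places:
- func_name
- instance
- args
- kwargs
So when one of these properties is accessed and the data has not been deserialized yet, it is deserialized first. Let's set this aside for now and see how the job gets executed.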
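The lazy-deserialization pattern described here can be shown in a few lines. This is a simplified sketch, not rq's Job class: LazyJob, the UNEVALUATED sentinel, and the unpickle_calls counter are hypothetical, and the payload is reduced to a (func_name, args) pair.

```python
import pickle

UNEVALUATED = object()  # sentinel meaning "not deserialized yet"


class LazyJob:
    """Holds a pickled payload and only unpickles it on first property access."""

    def __init__(self, data):
        self.data = data
        self._func_name = UNEVALUATED
        self._args = UNEVALUATED
        self.unpickle_calls = 0  # for demonstration only

    def _unpickle_data(self):
        self.unpickle_calls += 1
        self._func_name, self._args = pickle.loads(self.data)

    @property
    def func_name(self):
        if self._func_name is UNEVALUATED:
            self._unpickle_data()
        return self._func_name

    @property
    def args(self):
        if self._args is UNEVALUATED:
            self._unpickle_data()
        return self._args


job = LazyJob(pickle.dumps(('math.sqrt', (9,))))
```

A dedicated sentinel is used instead of None because None can be a legitimate deserialized value.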
How a job is executed
Back in the code of Worker.work: once it has a job, it calls self.execute_job(job, queue). Following that call leads to self.fork_work_horse(job, queue), and following that leads to:

    def fork_work_horse(self, job, queue):
        """Spawns a work horse to perform the actual work and passes it a job.
        """
        child_pid = os.fork()
        os.environ['RQ_WORKER_ID'] = self.name
        os.environ['RQ_JOB_ID'] = job.id
        if child_pid == 0:
            self.main_work_horse(job, queue)
        else:
            self._horse_pid = child_pid
            self.procline('Forked {0} at {1}'.format(child_pid, time.time()))

After fork, the process in which it returns 0 is the child. Let's continue with self.main_work_horse:

    def main_work_horse(self, job, queue):
        """This is the entry point of the newly spawned work horse."""
        # After fork()'ing, always assure we are generating random sequences
        # that are different from the worker.
        random.seed()

        self.setup_work_horse_signals()

        self._is_horse = True
        self.log = logger

        success = self.perform_job(job, queue)

        # os._exit() is the way to exit from childs after a fork(), in
        # constrast to the regular sys.exit()
        os._exit(int(not success))

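The fork-and-report pattern used by these two methods can be tried in isolation. The sketch below assumes a POSIX system (os.fork is not available on Windows). run_in_child is a hypothetical helper, and unlike the code above it also waits for the child so that the exit code can be inspected.

```python
import os


def run_in_child(task):
    """Fork, run task() in the child, and return the child's exit code."""
    child_pid = os.fork()
    if child_pid == 0:
        # Child process: report success as exit code 0, failure as 1.
        try:
            success = bool(task())
        except Exception:
            success = False
        # os._exit leaves immediately, skipping cleanup inherited from the parent.
        os._exit(int(not success))
    # Parent process: wait for the child and decode its exit status.
    _, status = os.waitpid(child_pid, 0)
    return os.WEXITSTATUS(status)


ok_code = run_in_child(lambda: True)
fail_code = run_in_child(lambda: False)
```

Running each job in a forked child means a crashing job takes down only the child, while the parent keeps looping.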
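Continuing with self.perform_job: partway through it calls job.perform, which calls self._execute, which uses the self.func property. Looking inside that property, it uses self.func_name. Now it all makes sense! This is where the data finally gets deserialized: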

    @property
    def func(self):
        func_name = self.func_name
        if func_name is None:
            return None

        if self.instance:
            return getattr(self.instance, func_name)

        return import_attribute(self.func_name)

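It finds func_name, imports the function, then calls it with args and kwargs. That's all there is to it!
What about enqueue?
Take a guess: how does enqueue work? It must dump the function's func_name, args, and kwargs and push them onto the corresponding queue. The answer is in enqueue_call in queue.py. I won't go any further here, so consider it an exercise :)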
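As a starting point for that exercise, here is a guess at the round trip in miniature. It is a sketch under stated assumptions, not rq's enqueue_call: the queue is an in-memory deque rather than a Redis list, the payload is a (func_name, args, kwargs) tuple without the instance field, and enqueue and perform_next are hypothetical names.

```python
import importlib
import math
import pickle
from collections import deque


def enqueue(queue, func, *args, **kwargs):
    """Serialize a call as (dotted function name, args, kwargs) and append it."""
    func_name = '{0}.{1}'.format(func.__module__, func.__name__)
    queue.append(pickle.dumps((func_name, args, kwargs)))


def perform_next(queue):
    """Pop the oldest payload, import the function by name, and call it."""
    func_name, args, kwargs = pickle.loads(queue.popleft())
    module_name, _, attribute = func_name.rpartition('.')
    func = getattr(importlib.import_module(module_name), attribute)
    return func(*args, **kwargs)


queue = deque()
enqueue(queue, math.hypot, 3, 4)   # producer side: only strings and data are stored
result = perform_next(queue)       # worker side: import by name, then call
```

Only the function's name travels through the queue, so the worker must be able to import the same module under the same path as the producer.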