PHP 项目中应用 Beanstalkd 消息队列

栏目: 后端 · 发布时间: 5年前

内容简介:最近偶然看到 Beanstalkd 这个消息队列中间件。根据网上资料显示,它是 Facebook 开发的一款用来处理系统消息的中间件。处理过千万级别消息的处理。该项目源码已经开源。网上摘抄的一部分对它的介绍:官网地址:

最近偶然看到 Beanstalkd 这个消息队列中间件。根据网上资料显示,它是 Facebook 开发的一款用来处理系统消息的中间件。处理过千万级别消息的处理。该项目源码已经开源。

网上摘抄的一部分对它的介绍:

Beanstalkd,一个高性能、轻量级的分布式内存队列系统。最初设计的目的是想通过后台异步执行耗时的任务来降低高容量 Web 应用系统的页面访问延迟,支持过有9.5 million 用户的 Facebook Causes 应用。后来开源,现在有 PostRank 大规模部署和使用,每天处理百万级任务。Beanstalkd 是典型的类  Memcached  设计,协议和使用方式都是同样的风格,所以使用过 memcached 的用户会觉得 Beanstalkd 似曾相识。

官网地址: https://beanstalkd.github.io/

1)Beanstalkd 核心概念

job:一个需要异步处理的任务,需要放在一个 tube 中。

tube:一个有名的任务队列,用来存储统一类型的 job。

producer:job 的生产者。

consumer:job 的消费者。

简单来说流程就一句话:

由 producer 产生一个任务 job ,并将 job 推进到一个 tube 中,然后由 consumer 从 tube 中取出 job 执行(当然了,这一切的操作的前提是 beanstalk 服务正在运行中)。

2)Beanstalkd 与其他(Kafka/RabbitMQ/Redis) 有何异同

在实际项目中,我们很多项目都会用到 Redis。并且它一些待处理的数据放入 Redis 队列中。例如短信发送,APP 应用消息推送。通常我们都会把这些任务放入 Redis 队列。然后 PHP 写一个消费端通过 Supefvisor 设置为后台守护进程去消费队列。一般情况下,有 Redis/Kafka/RabbitMQ 已经足够使用了。

那么,我们为何还需要使用 Beanstalkd 中间件呢?它有何种优势让我们对它青睐有加呢?

Beanstalkd 特性

1、支持优先级(支持任务插队)

2、延迟(实现定时任务)

3、持久化(定时把内存中的数据刷到 binlog 日志)

4、预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理)

5、任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败,重新进入队列)

根据对 Beanstalkd 特性的分析,如果系统当中没有优先级、延迟等需求。建议还是使用 Redis/Kafka/RabbitMQ 等消息中间件。相对来说,我们对这些常用的中间件更熟悉,社区更加的完备,能更好的使用它们来解决我们的日常工作。

3)Beanstalkd 执行流程

Beanstalkd 中间件我没有在实际的项目中应用过它。但是,Facebook 在其系统当中大面积应用过。想来也是非常稳定可靠的中间件。我从网上再次摘抄一份执行流程的介绍给大家看看:

一个 job 有 READY(时刻准备着被消费者取出), RESERVED(任务正在被一个消费者处理中), DELAYED(延迟任务,设定的延迟时间后进入ready状态), BURIED(休眠中,需要转移状态后才能操作)四种状态。当 producer 直接 put 一个 job 时,job 就处于 READY 状态,等待consumer 来处理,如果选择延迟 put,job 就先到 DELAYED 状态,等待时间过后才迁移到 READY 状态。consumer 获取了当前 READY 的 job后,该 job 的状态就迁移到 RESERVED,这样其他的 consumer 就不能再操作该 job。当 consumer 完成该 job 后,可以选择 delete, release 或者 bury 操作; delete 之后,job 从系统消亡,之后不能再获取;release 操作可以重新把该 job 状态迁移回 READY(也可以延迟该状态迁移操作),使其他的 consumer 可以继续获取和执行该 job;有意思的是 bury 操作,可以把该 job 休眠,等到需要的时候,再将休眠的 job kick 回 READY 状态,也可以 delete BURIED 状态的job。正是有这些有趣的操作和状态,才可以基于此做出很多意思的应用,比如要实现一个循环队列,就可以将 RESERVED 状态的 job 休眠掉,等没有 READY 状态的 job 时再将 BURIED 状态的 job一次性 kick 回 READY 状态。

beanstalkd 拥有的一些特性:
++    producer 产生的任务可以给他分配一个优先级,支持0到2**32的优先级,值越小,优先级越高,默认优先级为1024。
      优先级高的会被消费者首先执行
++    持久化,可以通过 binlog 将 job 及其状态记录到文件里面,在 Beanstalkd 下次启动时可以
      通过读取 binlog 来恢复之前的 job 及状态。
++    分布式容错,分布式设计和 Memcached 类似,beanstalkd 各个 server 之间并不知道彼此的存在,
      都是通过 client 来实现分布式以及根据 tube 名称去特定 server 获取 job。
++    超时控制,为了防止某个consumer长时间占用任务但不能处理的情况,
      Beanstalkd 为 reserve 操作设置了 timeout 时间,如果该 consumer 不能在指定时间内完成 job,
      job 将被迁移回 READY 状态,供其他 consumer 执行。

4) Beanstalkd 安装/启动

在 CentOS 系统安装是相当简单的。

yum -y install beanstalkd --enablerepo=epel;

查看 beanstalkd 版本号:

beanstalkd -v

启动:

beanstalkd -l 0.0.0.0 -p 11300 -b /data/beanstalkd/binlogs
-l 指定服务绑定哪个 IP。
-p 指定服务监听的端口。
-b binlog 二进制日志存放目录。当服务器重启之后通过它来恢复数据用。

除了上述参数外,还有其他的。以下是通过 --help 得到的文档:

Options:
 -b DIR   wal directory
 -f MS    fsync at most once every MS milliseconds (use -f0 for "always fsync")
 -F       never fsync (default)
 -l ADDR  listen on address (default is 0.0.0.0)
 -p PORT  listen on port (default is 11300)
 -u USER  become user and group
 -z BYTES set the maximum job size in bytes (default is 65535)
 -s BYTES set the size of each wal file (default is 10485760)
            (will be rounded up to a multiple of 512 bytes)
 -c       compact the binlog (default)
 -n       do not compact the binlog
 -v       show version information
 -V       increase verbosity
 -h       show this help

5) PHP 使用

说得再多,不如实际来使用一下。才能感受到它的魅力。

5.1) 安装 Beanstalkd composer 包

希望你的系统环境已经安装了 composer 工具。那么,即下来你就可以在你的项目根目录下面运行以下命令:

composer require pda/pheanstalk

如果相关 composer 知识不知道的话,只能自己去 Google 搜索学习了。

5.2)生产者 Producer 添加任务

<?php
require('./vendor/autoload.php');

use Pheanstalk\Pheanstalk;
// Create using autodetection of socket implementation

//创建一个Pheanstalk对象
$p = new Pheanstalk('127.0.0.1', 11300);
 
$data = array(
    'id' => 1,
    'name' => 'test',
);
 
//向userReg管道中添加任务,返回任务ID
//put()方法有四个参数
//第一个任务的数据
//第二个任务的优先级,值越小,越先处理
//第三个任务的延迟
//第四个任务的ttr超时时间
$id = $p->useTube('userReg')->put(json_encode($data));
//获取任务
$job = $p->peek($id);
//查看任务状态
print_r($p->statsJob($job));

这里要注意:我这个 pda/pheanstalk 默认安装的是最后一个稳定版本:3.2.1 版本。如果使用 4.x 系列版本。用法有差别。

5.3)消费端 consumer 消费任务

<?php
require './vendor/autoload.php';
 
use Pheanstalk\Pheanstalk;
 
//创建一个Pheanstalk对象
$p = new Pheanstalk('127.0.0.1', 11300);
 
//监听userReg管道,忽略default管道
$job = $p->watch('userReg')->ignore('default')->reserve();
 
$data = json_decode($job->getData());
//打印任务中的数据
print_r($data);
 
//最后删除任务,表示任务处理完成
$p->delete($job);

当然了。我这里没有对取出来的任务做任何的业务处理。在实际项目中这里肯定是要增加业务处理代码。

以上是一个最基本最核心的操作了。

5.4)Beanstalkd 的方法预览

中间件系统维护系列方法

stats() 查看状态方法
listTubes() 目前存在的管道
listTubesWatched() 目前监听的管道
statsTube() 管道的状态
useTube() 指定使用的管道
statsJob() 查看任务的详细信息
peek() 通过任务ID获取任务

生产者方法

putInTube() 往管道中写入数据
put() 配合useTube()使用

消费者方法

watch() 监听管道,可以同时监听多个管道
ignore() 不监听管道
reserve() 以阻塞方式监听管道,获取任务
reserveFromTube()
release() 把任务重新放回管道
bury() 把任务预留
peekBuried() 把预留任务读取出来
kickJob() 把buried状态的任务设置成ready
kick() 批量把buried状态的任务设置成ready
peekReady() 把准备好的任务读取出来
peekDelayed() 把延迟的任务读取出来
pauseTube() 给管道设置延迟
resumeTube() 取消管道延迟
touch() 让任务重新计算ttr时间,给任务续命

6) 总结

1)Facebook 出品的东西,值得一用。稳定/实用。

2)如果没有优先级/定时/延时等消息类型的任务。还是使用 Redis/Kafka/RabbitMQ 消息中间件。

3)如果支持任务调度执行就好了。比如,当一个任务添加之后,可以自己调用 PHP 脚本把任务消费了。这个是我目前最需要的。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Effective Objective-C 2.0

Effective Objective-C 2.0

Matt Galloway / 爱飞翔 / 机械工业出版社 / 2014-1 / 69.00元

《effective objective-c 2.0:编写高质量ios与os x代码的52个有效方法》是世界级c++开发大师scott meyers亲自担当顾问编辑的“effective software development series”系列丛书中的新作,amazon全五星评价。从语法、接口与api设计、内存管理、框架等7大方面总结和探讨了objective-c编程中52个鲜为人知和容易被忽......一起来看看 《Effective Objective-C 2.0》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换