内容简介:最近有个延迟执行的任务需求,比如发了一个定时红包,服务器不能相信客户端的一切,所以就得做时间的同步,但是PHP相对来讲不是很适合做这种“XX秒后去执行一个什么样的动作这类的行为”,但是这个功能又是不可缺少的,然后就周末花时间调研了下相关的实现。大致有如下几种:下面针对这几点一一做下简单的实现, 然后考虑到可维护性, 数据丢失后怎么恢复,服务监控等一系列问题。最后选择一个场景上来说更合适的吧。在正式使用Redis来实现这一延迟需求之前,我还了解到Redis的key notification事件提醒,可以在某
前言
最近有个延迟执行的任务需求,比如发了一个定时红包,服务器不能相信客户端的一切,所以就得做时间的同步,但是 PHP 相对来讲不是很适合做这种“XX秒后去执行一个什么样的动作这类的行为”,但是这个功能又是不可缺少的,然后就周末花时间调研了下相关的实现。大致有如下几种:
- 借助 Redis 的sorted_set和hash结构
- 自己写一个定时器,不断“轮询”触发
- 借助语言的异步库
- 借助消息队列等服务。
下面针对这几点一一做下简单的实现, 然后考虑到可维护性, 数据丢失后怎么恢复,服务监控等一系列问题。最后选择一个场景上来说更合适的吧。
借助Redis实现
在正式使用Redis来实现这一延迟需求之前,我还了解到Redis的key notification事件提醒,可以在某一个key过期的时候触发一个动作,这对于我们做延迟任务来讲,的确是很好的一个契机,但是打开了它就会不可避免的造成效率上的降低,而且线上服务器一般不会再去修改了,因此这个特性,自己了解下,玩玩就行了。具体的实现还是得老老实实设计数据结构了。
结构涉及
我的做法是 QUEUE 加上 CONTAINER。即会有一个根据时间不断往前移动的时间轴作为我们的队列,然后在队列上每一个时间戳,作为一个链表往外散发,保存多个task。
涉及描述
生产者productor.php
guo@Server218:/tmp$ cat productor.php
<?php
$redis = new Redis();
$redis->connect("localhost", 6379);
$redis->select(2);
$QUEUE = "asyncqueue:zset";
$SERIALIZER = "serialize:hash";
// 模拟生产延迟消息
for($index=0; $index<10; $index++) {
// 每秒可能会产生多条数据,但是只要“当秒”有数据,就需要添加到queue中
$ts = time();
$cursecond = rand(0, 9) % 2 == 0;
$tasklength = rand(0, 9) % 3;
if($cursecond == true) {
// 当前秒有task
$redis->zadd($QUEUE, $ts, $ts);
if($tasklength > 0) {
for($i=0; $i<$tasklength;$i++) {
$key = "2614677&".rand(0, 100000);
$redis->hset($SERIALIZER.":".$ts, $key, $key);
echo "[{$ts}] cursecond:{$cursecond}, KEY:{$key}\n";
}
}
}
sleep($tasklength);
}
消费者consumer.php
guo@Server218:/tmp$ cat consumer.php
<?php
$redis = new Redis();
$redis->connect("localhost", 6379);
$redis->select(2);
$QUEUE = "asyncqueue:zset";
$SERIALIZER = "serialize:hash";
$counter = 0;
while(true) {
$ts = 1542596034 + $counter;
$counter++;
$ret = $redis->zrangebyscore($QUEUE, $ts, $ts, array("WITHSCORES"=>true));
// 获取下具体的task并执行
$items = $redis->hgetall($SERIALIZER.":".$ts);
foreach($items as $key=>$member) {
echo "CONSUMER[{$ts}]\t[{$key}]\t{$member}\n";
}
if($counter>=10) {
break;
}
}
测试
先来看看生产的具体内容。
guo@Server218:/tmp$ vim productor.php [1542596034] cursecond:1, KEY:2614677&46685 [1542596034] cursecond:1, KEY:2614677&99086 [1542596036] cursecond:1, KEY:2614677&38241 [1542596037] cursecond:1, KEY:2614677&74988 [1542596038] cursecond:1, KEY:2614677&69443 [1542596038] cursecond:1, KEY:2614677&25523 [1542596040] cursecond:1, KEY:2614677&29642 [1542596040] cursecond:1, KEY:2614677&15928 [1542596042] cursecond:1, KEY:2614677&91626 [1542596042] cursecond:1, KEY:2614677&7382 Press ENTER or type command to continue
然后看看消费者是否正确消费。
Press ENTER or type command to continue CONSUMER[1542596034] [2614677&46685] 2614677&46685 CONSUMER[1542596034] [2614677&99086] 2614677&99086 CONSUMER[1542596036] [2614677&38241] 2614677&38241 CONSUMER[1542596037] [2614677&74988] 2614677&74988 CONSUMER[1542596038] [2614677&69443] 2614677&69443 CONSUMER[1542596038] [2614677&25523] 2614677&25523 CONSUMER[1542596040] [2614677&29642] 2614677&29642 CONSUMER[1542596040] [2614677&15928] 2614677&15928 CONSUMER[1542596042] [2614677&91626] 2614677&91626 CONSUMER[1542596042] [2614677&7382] 2614677&7382 Press ENTER or type command to continue
谈谈看法
- 利用Redis来实现,可以看出对Redis服务器的QPS会有一个微幅提升,这个问题可以通过multi管道来稍微优化下,这里就不多说了。
- 数据不会丢,这样即便是服务挂掉也能将未消费的任务进行恢复。
- 服务监控以及可维护性尚佳,基于Redis,稳定性能得到保证。
- 不用切换语言,易于实现,也无需增加额外的中间件,减少了维护工作。
定时器⏲
原理
在网上搜索相关实现的时候,搜到一篇不错的文章。 golang实现延迟消息的原理与方法 不错的文章,核心思路就在于下面这张图了。
定时器原理
代码实现
原文代码中有一个bug,就是在 执行任务轮询 的时候没有做休眠,会导致服务一直全速前进,这不太好。修改后的代码如下:
➜ asyncdemos cat delayring.go
package main
import (
"time"
"errors"
"fmt"
"github.com/kataras/iris"
"net/http"
"bytes"
"log"
"io/ioutil"
"encoding/json"
"github.com/garyburd/redigo/redis"
"strconv"
)
const (
TASK_TYPE_INTERVAL = 1
TASK_TYPE_DELAY = 2
QUEUE_LENGTH = 10
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=b716e1f9f2f7d4fb93d4bb79db65a117d589f886d1757"
)
//延迟消息
type DelayMessage struct {
//当前下标
curIndex int;
//环形槽
slots [QUEUE_LENGTH]map[string]*Task;
//关闭
closed chan bool;
//任务关闭
taskClose chan bool;
//时间关闭
timeClose chan bool;
//启动时间
startTime time.Time;
}
//执行的任务函数
type TaskFunc func(args ...interface{});
//任务
type Task struct {
//循环次数
cycleNum int;
//执行的函数
exec TaskFunc;
params []interface{};
catagory int
}
//创建一个延迟消息
func NewDelayMessage() *DelayMessage {
dm := &DelayMessage{
curIndex: 0,
closed: make(chan bool),
taskClose: make(chan bool),
timeClose: make(chan bool),
startTime: time.Now(),
};
for i := 0; i < QUEUE_LENGTH; i++ {
dm.slots[i] = make(map[string]*Task);
}
return dm;
}
//启动延迟消息
func (dm *DelayMessage) Start() {
go dm.taskLoop();
go dm.timeLoop();
select {
case <-dm.closed:
{
dm.taskClose <- true;
dm.timeClose <- true;
break;
}
};
}
//关闭延迟消息
func (dm *DelayMessage) Close() {
dm.closed <- true;
}
//处理每1秒的任务
func (dm *DelayMessage) taskLoop() {
defer func() {
fmt.Println("taskLoop exit");
}();
for {
// TODO 看看怎么优化比较合适,要不加这个的话,程序会执行超过一次
time.Sleep(time.Second)
select {
case <-dm.taskClose:
{
return;
}
default:
{
//取出当前的槽的任务
tasks := dm.slots[dm.curIndex];
if len(tasks) > 0 {
//遍历任务,判断任务循环次数等于0,则运行任务
//否则任务循环次数减1
for k, v := range tasks {
if v.cycleNum == 0 {
fmt.Printf("\t\t\t\t\tCURINDEX[%v], key: %v, cyclenum: %v\n", dm.curIndex, k, v.cycleNum)
go v.exec(v.params...);
//删除运行过的任务 对于catagory=1的周期性任务不予删除
if v.catagory != TASK_TYPE_INTERVAL {
delete(tasks, k)
}
} else {
v.cycleNum--;
}
}
}
}
}
}
}
//处理每1秒移动下标
func (dm *DelayMessage) timeLoop() {
defer func() {
fmt.Println("timeLoop exit");
}();
tick := time.NewTicker(time.Second);
for {
select {
case <-dm.timeClose:
{
return;
}
case <-tick.C:
{
fmt.Printf("%v, [%v]\n", time.Now().Format("2006-01-02 15:04:05"), dm.curIndex);
//fmt.Println(dm.slots)
//判断当前下标,如果等于3599则重置为0,否则加1
if dm.curIndex == QUEUE_LENGTH - 1 {
dm.curIndex = 0;
} else {
dm.curIndex++;
}
}
}
}
}
//添加任务
//func (dm *DelayMessage) AddTask(t time.Time, key string, catagory int, exec TaskFunc, params []interface{}) error {
func (dm *DelayMessage) AddTask(seconds int, key string, catagory int, exec TaskFunc, params []interface{}) error {
//if dm.startTime.After(t) {
// return errors.New("时间错误");
//}
//当前时间与指定时间相差秒数
//subSecond := t.Unix() - dm.startTime.Unix();
//subSecond := int(t.Unix() - time.Now().Unix());
subSecond := seconds
//计算循环次数
cycleNum := int(subSecond / QUEUE_LENGTH);
//计算任务所在的slots的下标
ix := (subSecond + dm.curIndex ) % QUEUE_LENGTH ;
fmt.Printf("\t\t\t\t\t key: %v, cycle: %v, index: %v , curIndex: %v, subseconds: %v\n", key, cycleNum, ix, dm.curIndex, subSecond)
//把任务加入tasks中
tasks := dm.slots[ix];
if _, ok := tasks[key]; ok {
return errors.New("该slots中已存在key为" + key + "的任务");
}
tasks[key] = &Task{
cycleNum: cycleNum,
exec: exec,
params: params,
catagory: catagory,
};
// TODO 持久化部分,这样即便中途crash,下次重启也能得到及时的恢复
return nil;
}
func (dm *DelayMessage) DeleteTask(key string) error {
tasks := dm.slots[dm.curIndex]
if _, ok := tasks[key]; ok {
delete(tasks, key)
}
return nil
}
//func main() {
//创建延迟消息
//dm := NewDelayMessage();
////添加任务
//dm.AddTask(time.Now().Add(time.Second*2), "test1", TASK_TYPE_DELAY, func(args ...interface{}) {
// fmt.Println(args...);
//}, []interface{}{1, 2, 3});
//dm.AddTask(time.Now().Add(time.Second*4), "test2", TASK_TYPE_DELAY, func(args ...interface{}) {
// fmt.Println(args...);
//}, []interface{}{4, 5, 6});
//dm.AddTask(time.Now().Add(time.Second*12), "test3", TASK_TYPE_DELAY, func(args ...interface{}) {
// fmt.Println(args...);
//}, []interface{}{"hello", "world", "test"});
//dm.AddTask(time.Now().Add(time.Second), "test4", TASK_TYPE_INTERVAL, func(args ...interface{}) {
// fmt.Printf("操你妈", args...)
//}, []interface{}{1, 2, 3});
//
////40秒后关闭
////time.AfterFunc(time.Second*2, func() {
//// //dm.Close();
////});
//dm.Start();
//}
var mamager DelayMessage
//func publish(manager *DelayMessage, seconds int, key string, exec TaskFunc, params []interface{}) error {
// manager.AddTask(time.Now().Add(time.Second * time.Duration(seconds)), key, TASK_TYPE_DELAY, func(args... interface{}) {
// fmt.Println("key: " + key)
// }, params)
//
// return nil
//}
func httpPost(msg string, webhook string) {
formatter := `{
"msgtype": "text",
"text": {
"content":"%s",
},
"at": {
"atMobiles":[],
"isAtAll": false
}
}
`
content := fmt.Sprintf(formatter, msg + "[" + time.Now().String() + "]")
payload := []byte(content)
resp, err := http.Post(webhook, "application/json", bytes.NewBuffer(payload))
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(body))
}
type Message struct {
Sessionid string
Anchorid string
Msg string
}
func RedisPublish(info string) {
client, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Fatal(err)
return
}
defer client.Close()
resp, err := client.Do("publish", "channel", info)
if err != nil {
// TODO 跳转到监控报警
log.Fatal(err)
return
}
fmt.Println(resp)
}
func main() {
manager := NewDelayMessage()
go manager.Start()
app := iris.New()
app.Get("/hello", func(context iris.Context) {
context.WriteString("pong")
})
app.Get("/publish", func(context iris.Context) {
msg := context.FormValue("msg")
seconds, _ := strconv.Atoi(context.FormValue("seconds"))
catagory, _ := strconv.Atoi(context.FormValue("catagory"))
fmt.Println("get params: " + msg)
if catagory != TASK_TYPE_DELAY || catagory != TASK_TYPE_INTERVAL {
catagory = TASK_TYPE_DELAY
}
manager.AddTask(seconds, "test1", catagory, func(args ...interface{}) {
httpPost(args[0].(string), DINGTALK_WEBHOOK)
}, []interface{}{msg})
context.WriteString("Added Succeed!" + time.Now().String())
})
app.Get("/delay", func(ctx iris.Context) {
message := Message{
Sessionid: ctx.FormValue("sessionid"),
Anchorid: ctx.FormValue("anchorid"),
Msg:ctx.FormValue("msg"),
}
jsondata, err := json.Marshal(&message)
if err != nil {
ctx.WriteString(err.Error())
}
RedisPublish(string(jsondata))
ctx.WriteString(string(jsondata))
})
app.Run(iris.Addr(":8080"))
}% ➜
测试
开启服务 go run delayring.go , 然后在浏览器中访问服务,大致含义是3秒后触发一个 timeout 事件,触发钉钉机器人消息推送。
发布延迟任务
➜ asyncdemos go run delayring.go
Now listening on: http://localhost:8080
Application started. Press CMD+C to shut down.
2018-11-19 11:35:24, [0]
get params: 难受
key: test1, cycle: 0, index: 4 , curIndex: 1, subseconds: 3
2018-11-19 11:35:25, [1]
2018-11-19 11:35:26, [2]
2018-11-19 11:35:27, [3]
CURINDEX[4], key: test1, cyclenum: 0
{"errmsg":"ok","errcode":0}
2018-11-19 11:35:28, [4]
2018-11-19 11:35:29, [5]
2018-11-19 11:35:30, [6]
2018-11-19 11:35:31, [7]
^C[ERRO] 2018/11/19 11:35 http: Server closed
机器人消息推送监控结果
谈谈感受
- 仔细看测试结果,发现时间戳和对应执行时间戳还是可以对的上的。但是有一个极大的弊端就是数据。万一服务挂掉了,数据就会全部丢掉,这是不能容忍的。
- 代码可维护性也较低,当然了,代码没做啥涉及,封装的不够完善。
- 引入了 额外的服务 , 导致整个系统的可维护性降低,增大了服务宕机的危险。
- 语言相关性较强,对非golang的业务程序有一定的门槛。
借助第三方库
python的tornado一向以异步高效率著称,异步对它来说就是个普通的业务。所以我们无需考虑具体的实现细节,专注于业务逻辑即可。那么今天咱也来试试水。
代码实现
很幸运的一下子就搜到了对应的demo,如下:
➜ asyncdemos cat demo.py
#coding: utf8
__author__ = "郭 璞"
__email__ = "marksinoberg@gmail.com"
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
import time
from tornado.options import define, options
define("port", default=8002, help="run on the port", type=int)
class SleepHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(2)
def get(self):
seconds = self.request.arguments.get("seconds", 10)
tornado.ioloop.IOLoop.instance().add_callback(self.sleep, seconds)
self.write("when i sleep")
@run_on_executor
def sleep(self, seconds):
print(time.time())
time.sleep(5)
print("yes", seconds)
print(time.time())
return seconds
if __name__ == "__main__":
# tornado.options.parse_command_line()
app = tornado.web.Application(
handlers=[(r"/sleep", SleepHandler), ])
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(8002)
tornado.ioloop.IOLoop.instance().start()%
运行服务: python demo.py , 然后访问服务:
访问tornado服务
查看下输出结果
服务输出结果
这里使用了默认值参数,所以可以看出也是正确的,服务在第三秒后得到了触发并进行了对应的执行操作。
谈谈感受
- 库支持,无需考虑底层细节,专注于业务流程即可。
- 面临着和自己写定时器一样的问题,那就是数据的同步,以及错误恢复等。
- 引入了第三方服务,系统可维护性以及宕机的可能性变大。
借助开源软件
在和周围人的讨论中,发现 延迟执行 的一个解决方案就是采用消息队列。比如beanstalk和rabbitMQ等。我没去调研rabbitMQ怎么用,这块内容挺大的,光是那一大坨的配置文件就让人头大,所以我倾向于使用beanstalk。
配置环境
用之前进行安装, 启动即可。
# 安装 sudo apt-get install beanstalkd # 启动, 并后台运行。如果觉得不保险,还可以用nohup的形式 beanstalkd -l 127.0.0.1 -p 12345 &
使用的细节可以参考下面的这篇文章。 PHP使用Beanstalkd消息队列
我这里问了方便自己看下原型效果,就用 python 简单写写了。开始之前记得安装beanstalk的依赖库 beanstalkc ,
pip install beanstalkc
代码实现
先来看看生产者。
guo@Server218:/tmp$ cat beanstalkdemo.py
#!/usr/bin python
import beanstalkc
import time
conn = beanstalkc.Connection(host="localhost", port=12345)
print(conn.tubes())
print(conn.stats())
conn.use("default")
ts = time.time()
handletime = ts + 10
conn.put("helloworld" + str(ts) + ", handletime:" + str(handletime), 1, 10)
print("putted")
再来看看消费者。
guo@Server218:/tmp$ cat consumer.py
#!/usr/bin python
import beanstalkc
import time
conn = beanstalkc.Connection(host="localhost", port=12345)
conn.use("default")
job = conn.reserve()
print(job.body)
job.delete()
ts = time.time()
print("CONSUME DONE: " + str(ts))
测试
- 先运行生产者。
guo@Server218:/tmp$ python beanstalkdemo.py
['default']
{'current-connections': 1, 'max-job-size': 65535, 'cmd-release': 0, 'cmd-reserve': 0, 'pid': 8384, 'cmd-bury': 0, 'current-producers': 0, 'total-jobs': 0, 'current-jobs-ready': 0, 'cmd-peek-buried': 0, 'current-tubes': 1, 'id': 'b0b7cf3b44c2e296', 'current-jobs-delayed': 0, 'uptime': 2, 'cmd-watch': 0, 'hostname': 'Server218', 'job-timeouts': 0, 'cmd-stats': 1, 'rusage-stime': 0.0, 'version': 1.1, 'current-jobs-reserved': 0, 'current-jobs-buried': 0, 'cmd-reserve-with-timeout': 0, 'cmd-put': 0, 'cmd-pause-tube': 0, 'cmd-list-tubes-watched': 0, 'cmd-list-tubes': 1, 'current-workers': 0, 'cmd-list-tube-used': 0, 'cmd-ignore': 0, 'binlog-records-migrated': 0, 'current-waiting': 0, 'cmd-peek': 0, 'cmd-peek-ready': 0, 'cmd-peek-delayed': 0, 'cmd-touch': 0, 'binlog-oldest-index': 0, 'binlog-current-index': 0, 'cmd-use': 0, 'total-connections': 1, 'cmd-delete': 0, 'binlog-max-size': 10485760, 'cmd-stats-job': 0, 'rusage-utime': 0.0, 'cmd-stats-tube': 0, 'binlog-records-written': 0, 'cmd-kick': 0, 'current-jobs-urgent': 0}
putted
- 跑一下消费者,看看效果。
guo@Server218:/tmp$ python consumer.py helloworld1542605160.63, handletime:1542605170.63 CONSUME DONE: 1542605172.33
从上面可以看出, 延迟执行 的目标已经实现了。
谈谈感受
- 引入了第三方服务,造成了维护成本的增加。
- 解耦性比较好,语言无关。
- 数据可较好的保存,不至于丢失数据,容错性好。
总结
调研了这么多,发现每一个都有自己的优缺点吧,没有说哪一个是最好的选择,只能算是合适的场景选择合适的服务。
且行且思
以上所述就是小编给大家介绍的《几种延迟任务的实现思路》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- RabbitMQ延迟消息的延迟极限是多少?
- RabbitMQ延迟消息的延迟极限是多少?
- 延迟静态绑定——static
- RabbitMQ实现延迟队列
- mybatis 延迟加载
- mybatis教程--延迟加载详解
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Learning Vue.js 2
Olga Filipova / Packt Publishing / 2017-1-5 / USD 41.99
About This Book Learn how to propagate DOM changes across the website without writing extensive jQuery callbacks code.Learn how to achieve reactivity and easily compose views with Vue.js and unders......一起来看看 《Learning Vue.js 2》 这本书的介绍吧!