内容简介:最近有个延迟执行的任务需求,比如发了一个定时红包,服务器不能相信客户端的一切,所以就得做时间的同步,但是PHP相对来讲不是很适合做这种“XX秒后去执行一个什么样的动作这类的行为”,但是这个功能又是不可缺少的,然后就周末花时间调研了下相关的实现。大致有如下几种:下面针对这几点一一做下简单的实现, 然后考虑到可维护性, 数据丢失后怎么恢复,服务监控等一系列问题。最后选择一个场景上来说更合适的吧。在正式使用Redis来实现这一延迟需求之前,我还了解到Redis的key notification事件提醒,可以在某
前言
最近有个延迟执行的任务需求,比如发了一个定时红包,服务器不能相信客户端的一切,所以就得做时间的同步,但是 PHP 相对来讲不是很适合做这种“XX秒后去执行一个什么样的动作这类的行为”,但是这个功能又是不可缺少的,然后就周末花时间调研了下相关的实现。大致有如下几种:
- 借助 Redis 的sorted_set和hash结构
- 自己写一个定时器,不断“轮询”触发
- 借助语言的异步库
- 借助消息队列等服务。
下面针对这几点一一做下简单的实现, 然后考虑到可维护性, 数据丢失后怎么恢复,服务监控等一系列问题。最后选择一个场景上来说更合适的吧。
借助Redis实现
在正式使用Redis来实现这一延迟需求之前,我还了解到Redis的key notification事件提醒,可以在某一个key过期的时候触发一个动作,这对于我们做延迟任务来讲,的确是很好的一个契机,但是打开了它就会不可避免的造成效率上的降低,而且线上服务器一般不会再去修改了,因此这个特性,自己了解下,玩玩就行了。具体的实现还是得老老实实设计数据结构了。
结构涉及
我的做法是 QUEUE 加上 CONTAINER。即会有一个根据时间不断往前移动的时间轴作为我们的队列,然后在队列上每一个时间戳,作为一个链表往外散发,保存多个task。
涉及描述
生产者productor.php
guo@Server218:/tmp$ cat productor.php <?php $redis = new Redis(); $redis->connect("localhost", 6379); $redis->select(2); $QUEUE = "asyncqueue:zset"; $SERIALIZER = "serialize:hash"; // 模拟生产延迟消息 for($index=0; $index<10; $index++) { // 每秒可能会产生多条数据,但是只要“当秒”有数据,就需要添加到queue中 $ts = time(); $cursecond = rand(0, 9) % 2 == 0; $tasklength = rand(0, 9) % 3; if($cursecond == true) { // 当前秒有task $redis->zadd($QUEUE, $ts, $ts); if($tasklength > 0) { for($i=0; $i<$tasklength;$i++) { $key = "2614677&".rand(0, 100000); $redis->hset($SERIALIZER.":".$ts, $key, $key); echo "[{$ts}] cursecond:{$cursecond}, KEY:{$key}\n"; } } } sleep($tasklength); }
消费者consumer.php
guo@Server218:/tmp$ cat consumer.php <?php $redis = new Redis(); $redis->connect("localhost", 6379); $redis->select(2); $QUEUE = "asyncqueue:zset"; $SERIALIZER = "serialize:hash"; $counter = 0; while(true) { $ts = 1542596034 + $counter; $counter++; $ret = $redis->zrangebyscore($QUEUE, $ts, $ts, array("WITHSCORES"=>true)); // 获取下具体的task并执行 $items = $redis->hgetall($SERIALIZER.":".$ts); foreach($items as $key=>$member) { echo "CONSUMER[{$ts}]\t[{$key}]\t{$member}\n"; } if($counter>=10) { break; } }
测试
先来看看生产的具体内容。
guo@Server218:/tmp$ vim productor.php [1542596034] cursecond:1, KEY:2614677&46685 [1542596034] cursecond:1, KEY:2614677&99086 [1542596036] cursecond:1, KEY:2614677&38241 [1542596037] cursecond:1, KEY:2614677&74988 [1542596038] cursecond:1, KEY:2614677&69443 [1542596038] cursecond:1, KEY:2614677&25523 [1542596040] cursecond:1, KEY:2614677&29642 [1542596040] cursecond:1, KEY:2614677&15928 [1542596042] cursecond:1, KEY:2614677&91626 [1542596042] cursecond:1, KEY:2614677&7382 Press ENTER or type command to continue
然后看看消费者是否正确消费。
Press ENTER or type command to continue CONSUMER[1542596034] [2614677&46685] 2614677&46685 CONSUMER[1542596034] [2614677&99086] 2614677&99086 CONSUMER[1542596036] [2614677&38241] 2614677&38241 CONSUMER[1542596037] [2614677&74988] 2614677&74988 CONSUMER[1542596038] [2614677&69443] 2614677&69443 CONSUMER[1542596038] [2614677&25523] 2614677&25523 CONSUMER[1542596040] [2614677&29642] 2614677&29642 CONSUMER[1542596040] [2614677&15928] 2614677&15928 CONSUMER[1542596042] [2614677&91626] 2614677&91626 CONSUMER[1542596042] [2614677&7382] 2614677&7382 Press ENTER or type command to continue
谈谈看法
- 利用Redis来实现,可以看出对Redis服务器的QPS会有一个微幅提升,这个问题可以通过multi管道来稍微优化下,这里就不多说了。
- 数据不会丢,这样即便是服务挂掉也能将未消费的任务进行恢复。
- 服务监控以及可维护性尚佳,基于Redis,稳定性能得到保证。
- 不用切换语言,易于实现,也无需增加额外的中间件,减少了维护工作。
定时器⏲
原理
在网上搜索相关实现的时候,搜到一篇不错的文章。 golang实现延迟消息的原理与方法 不错的文章,核心思路就在于下面这张图了。
定时器原理
代码实现
原文代码中有一个bug,就是在 执行任务轮询 的时候没有做休眠,会导致服务一直全速前进,这不太好。修改后的代码如下:
➜ asyncdemos cat delayring.go package main import ( "time" "errors" "fmt" "github.com/kataras/iris" "net/http" "bytes" "log" "io/ioutil" "encoding/json" "github.com/garyburd/redigo/redis" "strconv" ) const ( TASK_TYPE_INTERVAL = 1 TASK_TYPE_DELAY = 2 QUEUE_LENGTH = 10 DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=b716e1f9f2f7d4fb93d4bb79db65a117d589f886d1757" ) //延迟消息 type DelayMessage struct { //当前下标 curIndex int; //环形槽 slots [QUEUE_LENGTH]map[string]*Task; //关闭 closed chan bool; //任务关闭 taskClose chan bool; //时间关闭 timeClose chan bool; //启动时间 startTime time.Time; } //执行的任务函数 type TaskFunc func(args ...interface{}); //任务 type Task struct { //循环次数 cycleNum int; //执行的函数 exec TaskFunc; params []interface{}; catagory int } //创建一个延迟消息 func NewDelayMessage() *DelayMessage { dm := &DelayMessage{ curIndex: 0, closed: make(chan bool), taskClose: make(chan bool), timeClose: make(chan bool), startTime: time.Now(), }; for i := 0; i < QUEUE_LENGTH; i++ { dm.slots[i] = make(map[string]*Task); } return dm; } //启动延迟消息 func (dm *DelayMessage) Start() { go dm.taskLoop(); go dm.timeLoop(); select { case <-dm.closed: { dm.taskClose <- true; dm.timeClose <- true; break; } }; } //关闭延迟消息 func (dm *DelayMessage) Close() { dm.closed <- true; } //处理每1秒的任务 func (dm *DelayMessage) taskLoop() { defer func() { fmt.Println("taskLoop exit"); }(); for { // TODO 看看怎么优化比较合适,要不加这个的话,程序会执行超过一次 time.Sleep(time.Second) select { case <-dm.taskClose: { return; } default: { //取出当前的槽的任务 tasks := dm.slots[dm.curIndex]; if len(tasks) > 0 { //遍历任务,判断任务循环次数等于0,则运行任务 //否则任务循环次数减1 for k, v := range tasks { if v.cycleNum == 0 { fmt.Printf("\t\t\t\t\tCURINDEX[%v], key: %v, cyclenum: %v\n", dm.curIndex, k, v.cycleNum) go v.exec(v.params...); //删除运行过的任务 对于catagory=1的周期性任务不予删除 if v.catagory != TASK_TYPE_INTERVAL { delete(tasks, k) } } else { v.cycleNum--; } } } } } } } //处理每1秒移动下标 func (dm *DelayMessage) timeLoop() { defer func() { fmt.Println("timeLoop exit"); }(); tick := time.NewTicker(time.Second); for { select { case <-dm.timeClose: { return; } case <-tick.C: { fmt.Printf("%v, [%v]\n", time.Now().Format("2006-01-02 15:04:05"), dm.curIndex); //fmt.Println(dm.slots) //判断当前下标,如果等于3599则重置为0,否则加1 if dm.curIndex == QUEUE_LENGTH - 1 { dm.curIndex = 0; } else { dm.curIndex++; } } } } } //添加任务 //func (dm *DelayMessage) AddTask(t time.Time, key string, catagory int, exec TaskFunc, params []interface{}) error { func (dm *DelayMessage) AddTask(seconds int, key string, catagory int, exec TaskFunc, params []interface{}) error { //if dm.startTime.After(t) { // return errors.New("时间错误"); //} //当前时间与指定时间相差秒数 //subSecond := t.Unix() - dm.startTime.Unix(); //subSecond := int(t.Unix() - time.Now().Unix()); subSecond := seconds //计算循环次数 cycleNum := int(subSecond / QUEUE_LENGTH); //计算任务所在的slots的下标 ix := (subSecond + dm.curIndex ) % QUEUE_LENGTH ; fmt.Printf("\t\t\t\t\t key: %v, cycle: %v, index: %v , curIndex: %v, subseconds: %v\n", key, cycleNum, ix, dm.curIndex, subSecond) //把任务加入tasks中 tasks := dm.slots[ix]; if _, ok := tasks[key]; ok { return errors.New("该slots中已存在key为" + key + "的任务"); } tasks[key] = &Task{ cycleNum: cycleNum, exec: exec, params: params, catagory: catagory, }; // TODO 持久化部分,这样即便中途crash,下次重启也能得到及时的恢复 return nil; } func (dm *DelayMessage) DeleteTask(key string) error { tasks := dm.slots[dm.curIndex] if _, ok := tasks[key]; ok { delete(tasks, key) } return nil } //func main() { //创建延迟消息 //dm := NewDelayMessage(); ////添加任务 //dm.AddTask(time.Now().Add(time.Second*2), "test1", TASK_TYPE_DELAY, func(args ...interface{}) { // fmt.Println(args...); //}, []interface{}{1, 2, 3}); //dm.AddTask(time.Now().Add(time.Second*4), "test2", TASK_TYPE_DELAY, func(args ...interface{}) { // fmt.Println(args...); //}, []interface{}{4, 5, 6}); //dm.AddTask(time.Now().Add(time.Second*12), "test3", TASK_TYPE_DELAY, func(args ...interface{}) { // fmt.Println(args...); //}, []interface{}{"hello", "world", "test"}); //dm.AddTask(time.Now().Add(time.Second), "test4", TASK_TYPE_INTERVAL, func(args ...interface{}) { // fmt.Printf("操你妈", args...) //}, []interface{}{1, 2, 3}); // ////40秒后关闭 ////time.AfterFunc(time.Second*2, func() { //// //dm.Close(); ////}); //dm.Start(); //} var mamager DelayMessage //func publish(manager *DelayMessage, seconds int, key string, exec TaskFunc, params []interface{}) error { // manager.AddTask(time.Now().Add(time.Second * time.Duration(seconds)), key, TASK_TYPE_DELAY, func(args... interface{}) { // fmt.Println("key: " + key) // }, params) // // return nil //} func httpPost(msg string, webhook string) { formatter := `{ "msgtype": "text", "text": { "content":"%s", }, "at": { "atMobiles":[], "isAtAll": false } } ` content := fmt.Sprintf(formatter, msg + "[" + time.Now().String() + "]") payload := []byte(content) resp, err := http.Post(webhook, "application/json", bytes.NewBuffer(payload)) if err != nil { log.Fatal(err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { log.Fatal(err) } fmt.Println(string(body)) } type Message struct { Sessionid string Anchorid string Msg string } func RedisPublish(info string) { client, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { log.Fatal(err) return } defer client.Close() resp, err := client.Do("publish", "channel", info) if err != nil { // TODO 跳转到监控报警 log.Fatal(err) return } fmt.Println(resp) } func main() { manager := NewDelayMessage() go manager.Start() app := iris.New() app.Get("/hello", func(context iris.Context) { context.WriteString("pong") }) app.Get("/publish", func(context iris.Context) { msg := context.FormValue("msg") seconds, _ := strconv.Atoi(context.FormValue("seconds")) catagory, _ := strconv.Atoi(context.FormValue("catagory")) fmt.Println("get params: " + msg) if catagory != TASK_TYPE_DELAY || catagory != TASK_TYPE_INTERVAL { catagory = TASK_TYPE_DELAY } manager.AddTask(seconds, "test1", catagory, func(args ...interface{}) { httpPost(args[0].(string), DINGTALK_WEBHOOK) }, []interface{}{msg}) context.WriteString("Added Succeed!" + time.Now().String()) }) app.Get("/delay", func(ctx iris.Context) { message := Message{ Sessionid: ctx.FormValue("sessionid"), Anchorid: ctx.FormValue("anchorid"), Msg:ctx.FormValue("msg"), } jsondata, err := json.Marshal(&message) if err != nil { ctx.WriteString(err.Error()) } RedisPublish(string(jsondata)) ctx.WriteString(string(jsondata)) }) app.Run(iris.Addr(":8080")) }% ➜
测试
开启服务 go run delayring.go
, 然后在浏览器中访问服务,大致含义是3秒后触发一个 timeout 事件,触发钉钉机器人消息推送。
发布延迟任务
➜ asyncdemos go run delayring.go Now listening on: http://localhost:8080 Application started. Press CMD+C to shut down. 2018-11-19 11:35:24, [0] get params: 难受 key: test1, cycle: 0, index: 4 , curIndex: 1, subseconds: 3 2018-11-19 11:35:25, [1] 2018-11-19 11:35:26, [2] 2018-11-19 11:35:27, [3] CURINDEX[4], key: test1, cyclenum: 0 {"errmsg":"ok","errcode":0} 2018-11-19 11:35:28, [4] 2018-11-19 11:35:29, [5] 2018-11-19 11:35:30, [6] 2018-11-19 11:35:31, [7] ^C[ERRO] 2018/11/19 11:35 http: Server closed
机器人消息推送监控结果
谈谈感受
- 仔细看测试结果,发现时间戳和对应执行时间戳还是可以对的上的。但是有一个极大的弊端就是数据。万一服务挂掉了,数据就会全部丢掉,这是不能容忍的。
- 代码可维护性也较低,当然了,代码没做啥涉及,封装的不够完善。
- 引入了 额外的服务 , 导致整个系统的可维护性降低,增大了服务宕机的危险。
- 语言相关性较强,对非golang的业务程序有一定的门槛。
借助第三方库
python的tornado一向以异步高效率著称,异步对它来说就是个普通的业务。所以我们无需考虑具体的实现细节,专注于业务逻辑即可。那么今天咱也来试试水。
代码实现
很幸运的一下子就搜到了对应的demo,如下:
➜ asyncdemos cat demo.py #coding: utf8 __author__ = "郭 璞" __email__ = "marksinoberg@gmail.com" import tornado.httpserver import tornado.ioloop import tornado.options import tornado.web import tornado.httpclient import tornado.gen from tornado.concurrent import run_on_executor from concurrent.futures import ThreadPoolExecutor import time from tornado.options import define, options define("port", default=8002, help="run on the port", type=int) class SleepHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(2) def get(self): seconds = self.request.arguments.get("seconds", 10) tornado.ioloop.IOLoop.instance().add_callback(self.sleep, seconds) self.write("when i sleep") @run_on_executor def sleep(self, seconds): print(time.time()) time.sleep(5) print("yes", seconds) print(time.time()) return seconds if __name__ == "__main__": # tornado.options.parse_command_line() app = tornado.web.Application( handlers=[(r"/sleep", SleepHandler), ]) http_server = tornado.httpserver.HTTPServer(app) http_server.listen(8002) tornado.ioloop.IOLoop.instance().start()%
运行服务: python demo.py
, 然后访问服务:
访问tornado服务
查看下输出结果
服务输出结果
这里使用了默认值参数,所以可以看出也是正确的,服务在第三秒后得到了触发并进行了对应的执行操作。
谈谈感受
- 库支持,无需考虑底层细节,专注于业务流程即可。
- 面临着和自己写定时器一样的问题,那就是数据的同步,以及错误恢复等。
- 引入了第三方服务,系统可维护性以及宕机的可能性变大。
借助开源软件
在和周围人的讨论中,发现 延迟执行 的一个解决方案就是采用消息队列。比如beanstalk和rabbitMQ等。我没去调研rabbitMQ怎么用,这块内容挺大的,光是那一大坨的配置文件就让人头大,所以我倾向于使用beanstalk。
配置环境
用之前进行安装, 启动即可。
# 安装 sudo apt-get install beanstalkd # 启动, 并后台运行。如果觉得不保险,还可以用nohup的形式 beanstalkd -l 127.0.0.1 -p 12345 &
使用的细节可以参考下面的这篇文章。 PHP使用Beanstalkd消息队列
我这里问了方便自己看下原型效果,就用 python 简单写写了。开始之前记得安装beanstalk的依赖库 beanstalkc ,
pip install beanstalkc
代码实现
先来看看生产者。
guo@Server218:/tmp$ cat beanstalkdemo.py #!/usr/bin python import beanstalkc import time conn = beanstalkc.Connection(host="localhost", port=12345) print(conn.tubes()) print(conn.stats()) conn.use("default") ts = time.time() handletime = ts + 10 conn.put("helloworld" + str(ts) + ", handletime:" + str(handletime), 1, 10) print("putted")
再来看看消费者。
guo@Server218:/tmp$ cat consumer.py #!/usr/bin python import beanstalkc import time conn = beanstalkc.Connection(host="localhost", port=12345) conn.use("default") job = conn.reserve() print(job.body) job.delete() ts = time.time() print("CONSUME DONE: " + str(ts))
测试
- 先运行生产者。
guo@Server218:/tmp$ python beanstalkdemo.py ['default'] {'current-connections': 1, 'max-job-size': 65535, 'cmd-release': 0, 'cmd-reserve': 0, 'pid': 8384, 'cmd-bury': 0, 'current-producers': 0, 'total-jobs': 0, 'current-jobs-ready': 0, 'cmd-peek-buried': 0, 'current-tubes': 1, 'id': 'b0b7cf3b44c2e296', 'current-jobs-delayed': 0, 'uptime': 2, 'cmd-watch': 0, 'hostname': 'Server218', 'job-timeouts': 0, 'cmd-stats': 1, 'rusage-stime': 0.0, 'version': 1.1, 'current-jobs-reserved': 0, 'current-jobs-buried': 0, 'cmd-reserve-with-timeout': 0, 'cmd-put': 0, 'cmd-pause-tube': 0, 'cmd-list-tubes-watched': 0, 'cmd-list-tubes': 1, 'current-workers': 0, 'cmd-list-tube-used': 0, 'cmd-ignore': 0, 'binlog-records-migrated': 0, 'current-waiting': 0, 'cmd-peek': 0, 'cmd-peek-ready': 0, 'cmd-peek-delayed': 0, 'cmd-touch': 0, 'binlog-oldest-index': 0, 'binlog-current-index': 0, 'cmd-use': 0, 'total-connections': 1, 'cmd-delete': 0, 'binlog-max-size': 10485760, 'cmd-stats-job': 0, 'rusage-utime': 0.0, 'cmd-stats-tube': 0, 'binlog-records-written': 0, 'cmd-kick': 0, 'current-jobs-urgent': 0} putted
- 跑一下消费者,看看效果。
guo@Server218:/tmp$ python consumer.py helloworld1542605160.63, handletime:1542605170.63 CONSUME DONE: 1542605172.33
从上面可以看出, 延迟执行 的目标已经实现了。
谈谈感受
- 引入了第三方服务,造成了维护成本的增加。
- 解耦性比较好,语言无关。
- 数据可较好的保存,不至于丢失数据,容错性好。
总结
调研了这么多,发现每一个都有自己的优缺点吧,没有说哪一个是最好的选择,只能算是合适的场景选择合适的服务。
且行且思
以上所述就是小编给大家介绍的《几种延迟任务的实现思路》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- RabbitMQ延迟消息的延迟极限是多少?
- RabbitMQ延迟消息的延迟极限是多少?
- 延迟静态绑定——static
- RabbitMQ实现延迟队列
- mybatis 延迟加载
- mybatis教程--延迟加载详解
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。