几种延迟任务的实现思路

栏目: Python · 发布时间: 6年前

内容简介:最近有个延迟执行的任务需求,比如发了一个定时红包,服务器不能相信客户端的一切,所以就得做时间的同步,但是PHP相对来讲不是很适合做这种“XX秒后去执行一个什么样的动作这类的行为”,但是这个功能又是不可缺少的,然后就周末花时间调研了下相关的实现。大致有如下几种:下面针对这几点一一做下简单的实现, 然后考虑到可维护性, 数据丢失后怎么恢复,服务监控等一系列问题。最后选择一个场景上来说更合适的吧。在正式使用Redis来实现这一延迟需求之前,我还了解到Redis的key notification事件提醒,可以在某

前言

最近有个延迟执行的任务需求,比如发了一个定时红包,服务器不能相信客户端的一切,所以就得做时间的同步,但是 PHP 相对来讲不是很适合做这种“XX秒后去执行一个什么样的动作这类的行为”,但是这个功能又是不可缺少的,然后就周末花时间调研了下相关的实现。大致有如下几种:

  • 借助 Redis 的sorted_set和hash结构
  • 自己写一个定时器,不断“轮询”触发
  • 借助语言的异步库
  • 借助消息队列等服务。

下面针对这几点一一做下简单的实现, 然后考虑到可维护性, 数据丢失后怎么恢复,服务监控等一系列问题。最后选择一个场景上来说更合适的吧。

借助Redis实现

在正式使用Redis来实现这一延迟需求之前,我还了解到Redis的key notification事件提醒,可以在某一个key过期的时候触发一个动作,这对于我们做延迟任务来讲,的确是很好的一个契机,但是打开了它就会不可避免的造成效率上的降低,而且线上服务器一般不会再去修改了,因此这个特性,自己了解下,玩玩就行了。具体的实现还是得老老实实设计数据结构了。

结构涉及

我的做法是 QUEUE 加上 CONTAINER。即会有一个根据时间不断往前移动的时间轴作为我们的队列,然后在队列上每一个时间戳,作为一个链表往外散发,保存多个task。

几种延迟任务的实现思路

涉及描述

生产者productor.php

guo@Server218:/tmp$ cat productor.php
<?php

$redis = new Redis();
$redis->connect("localhost", 6379);
$redis->select(2);
$QUEUE = "asyncqueue:zset";
$SERIALIZER = "serialize:hash";
// 模拟生产延迟消息
for($index=0; $index<10; $index++) {
    // 每秒可能会产生多条数据,但是只要“当秒”有数据,就需要添加到queue中
    $ts = time();
    $cursecond = rand(0, 9) % 2 == 0;
    $tasklength = rand(0, 9) % 3;
    if($cursecond == true) {
        // 当前秒有task
        $redis->zadd($QUEUE, $ts, $ts);
        if($tasklength > 0) {
            for($i=0; $i<$tasklength;$i++) {
                $key = "2614677&".rand(0, 100000);
                $redis->hset($SERIALIZER.":".$ts, $key, $key);
                echo "[{$ts}] cursecond:{$cursecond}, KEY:{$key}\n";
            }
        }
    }
    sleep($tasklength);
}

消费者consumer.php

guo@Server218:/tmp$ cat consumer.php
<?php
$redis = new Redis();
$redis->connect("localhost", 6379);
$redis->select(2);
$QUEUE = "asyncqueue:zset";
$SERIALIZER = "serialize:hash";
$counter = 0;
while(true) {
    $ts = 1542596034 + $counter;
    $counter++;
    $ret = $redis->zrangebyscore($QUEUE, $ts, $ts, array("WITHSCORES"=>true));
    // 获取下具体的task并执行
    $items = $redis->hgetall($SERIALIZER.":".$ts);
    foreach($items as $key=>$member) {
        echo "CONSUMER[{$ts}]\t[{$key}]\t{$member}\n";
    }
    if($counter>=10) {
        break;
    }
}

测试

先来看看生产的具体内容。

guo@Server218:/tmp$ vim productor.php
[1542596034] cursecond:1, KEY:2614677&46685
[1542596034] cursecond:1, KEY:2614677&99086
[1542596036] cursecond:1, KEY:2614677&38241
[1542596037] cursecond:1, KEY:2614677&74988
[1542596038] cursecond:1, KEY:2614677&69443
[1542596038] cursecond:1, KEY:2614677&25523
[1542596040] cursecond:1, KEY:2614677&29642
[1542596040] cursecond:1, KEY:2614677&15928
[1542596042] cursecond:1, KEY:2614677&91626
[1542596042] cursecond:1, KEY:2614677&7382

Press ENTER or type command to continue

然后看看消费者是否正确消费。

Press ENTER or type command to continue
CONSUMER[1542596034]    [2614677&46685] 2614677&46685
CONSUMER[1542596034]    [2614677&99086] 2614677&99086
CONSUMER[1542596036]    [2614677&38241] 2614677&38241
CONSUMER[1542596037]    [2614677&74988] 2614677&74988
CONSUMER[1542596038]    [2614677&69443] 2614677&69443
CONSUMER[1542596038]    [2614677&25523] 2614677&25523
CONSUMER[1542596040]    [2614677&29642] 2614677&29642
CONSUMER[1542596040]    [2614677&15928] 2614677&15928
CONSUMER[1542596042]    [2614677&91626] 2614677&91626
CONSUMER[1542596042]    [2614677&7382]  2614677&7382

Press ENTER or type command to continue

谈谈看法

  • 利用Redis来实现,可以看出对Redis服务器的QPS会有一个微幅提升,这个问题可以通过multi管道来稍微优化下,这里就不多说了。
  • 数据不会丢,这样即便是服务挂掉也能将未消费的任务进行恢复。
  • 服务监控以及可维护性尚佳,基于Redis,稳定性能得到保证。
  • 不用切换语言,易于实现,也无需增加额外的中间件,减少了维护工作。

定时器⏲

原理

在网上搜索相关实现的时候,搜到一篇不错的文章。 golang实现延迟消息的原理与方法 不错的文章,核心思路就在于下面这张图了。

几种延迟任务的实现思路

定时器原理

代码实现

原文代码中有一个bug,就是在 执行任务轮询 的时候没有做休眠,会导致服务一直全速前进,这不太好。修改后的代码如下:

➜  asyncdemos cat delayring.go
package main

import (
    "time"
    "errors"
    "fmt"
    "github.com/kataras/iris"
    "net/http"
    "bytes"
    "log"
    "io/ioutil"
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "strconv"
)

const (
    TASK_TYPE_INTERVAL = 1
    TASK_TYPE_DELAY = 2
    QUEUE_LENGTH = 10
    DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=b716e1f9f2f7d4fb93d4bb79db65a117d589f886d1757"

)

//延迟消息
type DelayMessage struct {
    //当前下标
    curIndex int;
    //环形槽
    slots [QUEUE_LENGTH]map[string]*Task;
    //关闭
    closed chan bool;
    //任务关闭
    taskClose chan bool;
    //时间关闭
    timeClose chan bool;
    //启动时间
    startTime time.Time;
}

//执行的任务函数
type TaskFunc func(args ...interface{});

//任务
type Task struct {
    //循环次数
    cycleNum int;
    //执行的函数
    exec   TaskFunc;
    params []interface{};
    catagory int
}

//创建一个延迟消息
func NewDelayMessage() *DelayMessage {
    dm := &DelayMessage{
        curIndex:  0,
        closed:    make(chan bool),
        taskClose: make(chan bool),
        timeClose: make(chan bool),
        startTime: time.Now(),
    };
    for i := 0; i < QUEUE_LENGTH; i++ {
        dm.slots[i] = make(map[string]*Task);
    }
    return dm;
}

//启动延迟消息
func (dm *DelayMessage) Start() {
    go dm.taskLoop();
    go dm.timeLoop();
    select {
    case <-dm.closed:
        {
            dm.taskClose <- true;
            dm.timeClose <- true;
            break;
        }
    };
}

//关闭延迟消息
func (dm *DelayMessage) Close() {
    dm.closed <- true;
}

//处理每1秒的任务
func (dm *DelayMessage) taskLoop() {
    defer func() {
        fmt.Println("taskLoop exit");
    }();
    for {
        // TODO 看看怎么优化比较合适,要不加这个的话,程序会执行超过一次
        time.Sleep(time.Second)
        select {
        case <-dm.taskClose:
            {
                return;
            }
        default:
            {
                //取出当前的槽的任务
                tasks := dm.slots[dm.curIndex];
                if len(tasks) > 0 {
                    //遍历任务,判断任务循环次数等于0,则运行任务
                    //否则任务循环次数减1
                    for k, v := range tasks {
                        if v.cycleNum == 0 {
                            fmt.Printf("\t\t\t\t\tCURINDEX[%v], key: %v, cyclenum: %v\n", dm.curIndex, k, v.cycleNum)
                            go v.exec(v.params...);
                            //删除运行过的任务 对于catagory=1的周期性任务不予删除
                            if v.catagory != TASK_TYPE_INTERVAL {
                                delete(tasks, k)
                            }
                        } else {
                            v.cycleNum--;
                        }
                    }
                }
            }
        }
    }
}

//处理每1秒移动下标
func (dm *DelayMessage) timeLoop() {
    defer func() {
        fmt.Println("timeLoop exit");
    }();
    tick := time.NewTicker(time.Second);
    for {
        select {
        case <-dm.timeClose:
            {
                return;
            }
        case <-tick.C:
            {
                fmt.Printf("%v, [%v]\n", time.Now().Format("2006-01-02 15:04:05"), dm.curIndex);
                //fmt.Println(dm.slots)
                //判断当前下标,如果等于3599则重置为0,否则加1
                if dm.curIndex == QUEUE_LENGTH - 1 {
                    dm.curIndex = 0;
                } else {
                    dm.curIndex++;
                }
            }
        }
    }
}

//添加任务
//func (dm *DelayMessage) AddTask(t time.Time, key string, catagory int, exec TaskFunc, params []interface{}) error {
func (dm *DelayMessage) AddTask(seconds int, key string, catagory int, exec TaskFunc, params []interface{}) error {
    //if dm.startTime.After(t) {
    //  return errors.New("时间错误");
    //}
    //当前时间与指定时间相差秒数
    //subSecond := t.Unix() - dm.startTime.Unix();
    //subSecond := int(t.Unix() - time.Now().Unix());
    subSecond := seconds
    //计算循环次数
    cycleNum := int(subSecond / QUEUE_LENGTH);
    //计算任务所在的slots的下标
    ix := (subSecond + dm.curIndex ) % QUEUE_LENGTH ;
    fmt.Printf("\t\t\t\t\t key: %v, cycle: %v, index: %v , curIndex: %v, subseconds: %v\n", key, cycleNum, ix, dm.curIndex, subSecond)
    //把任务加入tasks中
    tasks := dm.slots[ix];
    if _, ok := tasks[key]; ok {
        return errors.New("该slots中已存在key为" + key + "的任务");
    }
    tasks[key] = &Task{
        cycleNum: cycleNum,
        exec:     exec,
        params:   params,
        catagory: catagory,
    };
    // TODO 持久化部分,这样即便中途crash,下次重启也能得到及时的恢复
    return nil;
}

func (dm *DelayMessage) DeleteTask(key string) error {
    tasks := dm.slots[dm.curIndex]
    if _, ok := tasks[key]; ok {
        delete(tasks, key)
    }
    return nil
}

//func main() {
    //创建延迟消息
    //dm := NewDelayMessage();
    ////添加任务
    //dm.AddTask(time.Now().Add(time.Second*2), "test1", TASK_TYPE_DELAY, func(args ...interface{}) {
    //  fmt.Println(args...);
    //}, []interface{}{1, 2, 3});
    //dm.AddTask(time.Now().Add(time.Second*4), "test2", TASK_TYPE_DELAY,  func(args ...interface{}) {
    //  fmt.Println(args...);
    //}, []interface{}{4, 5, 6});
    //dm.AddTask(time.Now().Add(time.Second*12), "test3", TASK_TYPE_DELAY, func(args ...interface{}) {
    //  fmt.Println(args...);
    //}, []interface{}{"hello", "world", "test"});
    //dm.AddTask(time.Now().Add(time.Second), "test4", TASK_TYPE_INTERVAL, func(args ...interface{}) {
    //  fmt.Printf("操你妈", args...)
    //}, []interface{}{1, 2, 3});
    //
    ////40秒后关闭
    ////time.AfterFunc(time.Second*2, func() {
    ////    //dm.Close();
    ////});
    //dm.Start();
//}


var mamager DelayMessage


//func publish(manager *DelayMessage, seconds int, key string, exec TaskFunc, params []interface{}) error {
//  manager.AddTask(time.Now().Add(time.Second * time.Duration(seconds)), key, TASK_TYPE_DELAY, func(args... interface{}) {
//      fmt.Println("key: " + key)
//  }, params)
//
//  return nil
//}

func httpPost(msg string, webhook string) {
    formatter := `{
            "msgtype": "text",
            "text": {
                "content":"%s",
            },
            "at": {
                "atMobiles":[],
                "isAtAll": false
           }
        }
    `
    content := fmt.Sprintf(formatter, msg + "[" + time.Now().String() + "]")
    payload := []byte(content)
    resp, err := http.Post(webhook, "application/json", bytes.NewBuffer(payload))
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(string(body))
}

type Message struct {
    Sessionid string
    Anchorid string
    Msg string
}

func RedisPublish(info string) {
    client, err := redis.Dial("tcp", "127.0.0.1:6379")
    if err != nil {
        log.Fatal(err)
        return
    }
    defer client.Close()
    resp, err := client.Do("publish", "channel", info)
    if err != nil {
        // TODO 跳转到监控报警
        log.Fatal(err)
        return
    }
    fmt.Println(resp)
}

func main() {
    manager := NewDelayMessage()
    go manager.Start()
    app := iris.New()
    app.Get("/hello", func(context iris.Context) {
        context.WriteString("pong")
    })
    app.Get("/publish", func(context iris.Context) {
        msg := context.FormValue("msg")
        seconds, _ := strconv.Atoi(context.FormValue("seconds"))
        catagory, _ := strconv.Atoi(context.FormValue("catagory"))
        fmt.Println("get params: " + msg)
        if catagory != TASK_TYPE_DELAY || catagory != TASK_TYPE_INTERVAL {
            catagory = TASK_TYPE_DELAY
        }
        manager.AddTask(seconds, "test1", catagory, func(args ...interface{}) {
            httpPost(args[0].(string), DINGTALK_WEBHOOK)
        }, []interface{}{msg})
        context.WriteString("Added Succeed!" + time.Now().String())
    })
    app.Get("/delay", func(ctx iris.Context) {
        message := Message{
            Sessionid: ctx.FormValue("sessionid"),
            Anchorid: ctx.FormValue("anchorid"),
            Msg:ctx.FormValue("msg"),
        }
        jsondata, err := json.Marshal(&message)
        if err != nil {
            ctx.WriteString(err.Error())
        }
        RedisPublish(string(jsondata))
        ctx.WriteString(string(jsondata))

    })
    app.Run(iris.Addr(":8080"))

}%                                                                                                                                                                              ➜

测试

开启服务 go run delayring.go , 然后在浏览器中访问服务,大致含义是3秒后触发一个 timeout 事件,触发钉钉机器人消息推送。

几种延迟任务的实现思路

发布延迟任务

➜  asyncdemos go run delayring.go
Now listening on: http://localhost:8080
Application started. Press CMD+C to shut down.
2018-11-19 11:35:24, [0]
get params: 难受
                     key: test1, cycle: 0, index: 4 , curIndex: 1, subseconds: 3
2018-11-19 11:35:25, [1]
2018-11-19 11:35:26, [2]
2018-11-19 11:35:27, [3]
                    CURINDEX[4], key: test1, cyclenum: 0
{"errmsg":"ok","errcode":0}
2018-11-19 11:35:28, [4]
2018-11-19 11:35:29, [5]
2018-11-19 11:35:30, [6]
2018-11-19 11:35:31, [7]
^C[ERRO] 2018/11/19 11:35 http: Server closed
几种延迟任务的实现思路

机器人消息推送监控结果

谈谈感受

  • 仔细看测试结果,发现时间戳和对应执行时间戳还是可以对的上的。但是有一个极大的弊端就是数据。万一服务挂掉了,数据就会全部丢掉,这是不能容忍的。
  • 代码可维护性也较低,当然了,代码没做啥涉及,封装的不够完善。
  • 引入了 额外的服务 , 导致整个系统的可维护性降低,增大了服务宕机的危险。
  • 语言相关性较强,对非golang的业务程序有一定的门槛。

借助第三方库

python的tornado一向以异步高效率著称,异步对它来说就是个普通的业务。所以我们无需考虑具体的实现细节,专注于业务逻辑即可。那么今天咱也来试试水。

代码实现

很幸运的一下子就搜到了对应的demo,如下:

➜  asyncdemos cat demo.py
#coding: utf8
__author__ = "郭 璞"
__email__ = "marksinoberg@gmail.com"

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
import time
from tornado.options import define, options

define("port", default=8002, help="run on the port", type=int)

class SleepHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    def get(self):
        seconds = self.request.arguments.get("seconds", 10)
        tornado.ioloop.IOLoop.instance().add_callback(self.sleep, seconds)
        self.write("when i sleep")

    @run_on_executor
    def sleep(self, seconds):
        print(time.time())
        time.sleep(5)
        print("yes", seconds)
        print(time.time())
        return seconds

if __name__ == "__main__":
    # tornado.options.parse_command_line()
    app = tornado.web.Application(
        handlers=[(r"/sleep", SleepHandler), ])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(8002)
    tornado.ioloop.IOLoop.instance().start()%

运行服务: python demo.py , 然后访问服务:

几种延迟任务的实现思路

访问tornado服务

查看下输出结果

几种延迟任务的实现思路

服务输出结果

这里使用了默认值参数,所以可以看出也是正确的,服务在第三秒后得到了触发并进行了对应的执行操作。

谈谈感受

  • 库支持,无需考虑底层细节,专注于业务流程即可。
  • 面临着和自己写定时器一样的问题,那就是数据的同步,以及错误恢复等。
  • 引入了第三方服务,系统可维护性以及宕机的可能性变大。

借助开源软件

在和周围人的讨论中,发现 延迟执行 的一个解决方案就是采用消息队列。比如beanstalk和rabbitMQ等。我没去调研rabbitMQ怎么用,这块内容挺大的,光是那一大坨的配置文件就让人头大,所以我倾向于使用beanstalk。

配置环境

用之前进行安装, 启动即可。

# 安装
sudo apt-get install beanstalkd
# 启动, 并后台运行。如果觉得不保险,还可以用nohup的形式
beanstalkd -l 127.0.0.1 -p 12345 &

使用的细节可以参考下面的这篇文章。 PHP使用Beanstalkd消息队列

我这里问了方便自己看下原型效果,就用 python 简单写写了。开始之前记得安装beanstalk的依赖库 beanstalkc

pip install beanstalkc

代码实现

先来看看生产者。

guo@Server218:/tmp$ cat beanstalkdemo.py
#!/usr/bin python
import beanstalkc
import time
conn = beanstalkc.Connection(host="localhost", port=12345)
print(conn.tubes())
print(conn.stats())
conn.use("default")
ts = time.time()
handletime = ts + 10
conn.put("helloworld" + str(ts) + ", handletime:" + str(handletime), 1, 10)
print("putted")

再来看看消费者。

guo@Server218:/tmp$ cat consumer.py
#!/usr/bin python
import beanstalkc
import time
conn = beanstalkc.Connection(host="localhost", port=12345)
conn.use("default")
job = conn.reserve()
print(job.body)
job.delete()
ts = time.time()
print("CONSUME DONE: " + str(ts))

测试

  • 先运行生产者。
guo@Server218:/tmp$ python beanstalkdemo.py
['default']
{'current-connections': 1, 'max-job-size': 65535, 'cmd-release': 0, 'cmd-reserve': 0, 'pid': 8384, 'cmd-bury': 0, 'current-producers': 0, 'total-jobs': 0, 'current-jobs-ready': 0, 'cmd-peek-buried': 0, 'current-tubes': 1, 'id': 'b0b7cf3b44c2e296', 'current-jobs-delayed': 0, 'uptime': 2, 'cmd-watch': 0, 'hostname': 'Server218', 'job-timeouts': 0, 'cmd-stats': 1, 'rusage-stime': 0.0, 'version': 1.1, 'current-jobs-reserved': 0, 'current-jobs-buried': 0, 'cmd-reserve-with-timeout': 0, 'cmd-put': 0, 'cmd-pause-tube': 0, 'cmd-list-tubes-watched': 0, 'cmd-list-tubes': 1, 'current-workers': 0, 'cmd-list-tube-used': 0, 'cmd-ignore': 0, 'binlog-records-migrated': 0, 'current-waiting': 0, 'cmd-peek': 0, 'cmd-peek-ready': 0, 'cmd-peek-delayed': 0, 'cmd-touch': 0, 'binlog-oldest-index': 0, 'binlog-current-index': 0, 'cmd-use': 0, 'total-connections': 1, 'cmd-delete': 0, 'binlog-max-size': 10485760, 'cmd-stats-job': 0, 'rusage-utime': 0.0, 'cmd-stats-tube': 0, 'binlog-records-written': 0, 'cmd-kick': 0, 'current-jobs-urgent': 0}
putted
  • 跑一下消费者,看看效果。
guo@Server218:/tmp$ python consumer.py
helloworld1542605160.63, handletime:1542605170.63
CONSUME DONE: 1542605172.33

从上面可以看出, 延迟执行 的目标已经实现了。

谈谈感受

  • 引入了第三方服务,造成了维护成本的增加。
  • 解耦性比较好,语言无关。
  • 数据可较好的保存,不至于丢失数据,容错性好。

总结

调研了这么多,发现每一个都有自己的优缺点吧,没有说哪一个是最好的选择,只能算是合适的场景选择合适的服务。

且行且思


以上所述就是小编给大家介绍的《几种延迟任务的实现思路》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Hacker's Delight

Hacker's Delight

Henry S. Warren Jr. / Addison-Wesley / 2002-7-27 / USD 59.99

A collection useful programming advice the author has collected over the years; small algorithms that make the programmer's task easier. * At long last, proven short-cuts to mastering difficult aspec......一起来看看 《Hacker's Delight》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具