内容简介:本文将从源码实现上对 libgo 的调度策略进行分析,主要涉及到上一篇文章中的三个结构体的定义:三者的关系如下图所示:
本文将从源码实现上对 libgo 的调度策略进行分析,主要涉及到上一篇文章中的三个结构体的定义:
- 调度器 Scheduler(简称 S)
- 执行器 Processer(简称 P)
- 协程 Task(简称 T)
三者的关系如下图所示:
本文会列出类内的主要成员和主要函数做以分析。
1. 协程调度器:class Scheduler
libgo/scheduler/scheduler.h
class Scheduler{ public: /* * 创建一个调度器,初始化 libgo * 创建主线程的执行器,如果后续 STart 的时候没有参数,默认只有一个执行器去做 * 当仅使用一个线程进行协程调度时, 协程地执行会严格地遵循其创建顺序. * */ static Scheduler* Create(); /* * 创建一个协程 Task 对象,并添加到当前的执行器 processer 的任务队列中, * 调度器的任务数 taskCount_ +1 * */ void CreateTask(TaskF const& fn, TaskOpt const& opt); /* 启动调度器 * @minThreadNumber : 最小调度线程数, 为0时, 设置为cpu核心数. * @maxThreadNumber : 最大调度线程数, 为0时, 设置为minThreadNumber. * 如果maxThreadNumber大于minThreadNumber, 则当协程产生长时间阻塞时, * 可以自动扩展调度线程数. * 唤醒定时器线程 * 每个调度线程都会调用 Process 开始调度,最后开启 id 为 0 的调度线程 * 如果 maxThreadNumber_ > 1 的话,会开启调度线程 DispatcherThread * */ void Start(int minThreadNumber = 1, int maxThreadNumber = 0); /* * 停止调度,停止后无法恢复, 仅用于安全退出main函数 * 如果某个调度线程被协程阻塞, 必须等待阻塞结束才能退出. * */ void Stop(); private: /* * 调度线程,主要为平衡多个 processer 的负载将高负载或阻塞的 p 中的协程 steal 给低负载的 p * 如果全部阻塞但是还有协程待执行,会起新线程,线程数不超过 maxThreadNumber_ * 会将阻塞 P 中的协程分摊给负载较少的 P * */ void DispatcherThread(); /* * 创建一个新的 Processer,并添加到双端队列 processers_ 中 * */ void NewProcessThread(); private: atomic_t<uint32_t> taskCount_{0}; // 用来统计协程数量 Deque<Processer*> processers_; // DispatcherThread双端队列,用来存放所有的执行器,每个执行器都会单独开一个线程去执行,线程中回调 Process() 方法。 LFLock started_; // libgo 提供的自选锁 };
调度器负责管理 1~N 个调度线程,每个调度线程一个执行器 Processer。调度器仅负责均衡各个执行器的负载,防止全部卡住的情况,并不涉及协程的切换等工作。
使用
ligbo提供了默认的协程调度器 co_sched
#define g_Scheduler ::co::Scheduler::getInstance() #define co_sched g_Scheduler
用户也可以创建自己的协程调度器
co::Scheduler* my_sched = co::Scheduler::Create();
启动调度
std::thread t([my_sched]{mysched->Start();}); t.detach();
调度器原理
-
schedule 负责整个系统的协程调度,协程的运行依赖于执行器 Processer(简称 P),因此在调度器初始化的时候会选择创建 P 的数量(支持动态增长),所有的执行器会添加到双端队列中。主线程也作为一个执行器,在创建 Scheduler 对象的时候创建,位于双端队列下标为 0 的位置(注意:只是创建对象,并没有开始运行);
-
当调用了 Start() 函数后,会正式开始运行。在 Start 函数内部,会创建指定数量的执行器 P,具体数量取决于参数,默认会创建 minThreadNumber 个,当全部执行器都阻塞之后,会动态扩展,最多 maxThreadNumber 个执行器。每个执行器都会运行于一个单独的线程,执行器负责该线程内部协程的切换和执行;
-
当创建协程时,会将协程添加到某一个处于活跃状态的执行器,如果恰好都不活跃,也会添加到某一个 P 中,这并不影响执行器的正常工作,因为调度器的调度线程会去处理它;
-
Start 函数内部,除了上述执行器所在线程,还会开启调度线程 DispatcherThread,调度线程会平衡各个 P 的协程数量和负载,进行 steal,如果所有 P 都阻塞,会根据 maxThreadNumber 动态增加 P 的数量,如果仅仅部分 P 阻塞,会将阻塞的 P 中的协程全部拿出(steal),均摊到负载最小的 P 中;
-
Schedule 也会选择性开启协程的定时器线程;
- 开启 FastSteadyClock 线程。
关于定时器以及时钟的实现,会在之后的文章中讨论。
2. 协程执行器:class Processer
libgo/scheduler/processer.h
每个协程执行器对应一个线程,负责本线程的协程调度,但并非线程安全的,是协程调度的核心。
class Processer { public: // 协程挂起标识,用于后续进行唤醒和超时判断 struct SuspendEntry { // ... }; // 协程切出 ALWAYS_INLINE static void StaticCoYield(); // 挂起当前协程 static SuspendEntry Suspend(); // 挂起当前协程, 并在指定时间后自动唤醒 static SuspendEntry Suspend(FastSteadyClock::duration dur); // 唤醒协程 static bool Wakeup(SuspendEntry const& entry); private: /* * 执行器对协程的调度,也是执行器所在现在的主处理逻辑 * */ void Process(); /* * 从当前执行器中偷 n 个协程并返回 * n 为0则全部偷出来,否则取出相应的个数 * */ SList<Task> Steal(std::size_t n); private: int id_; // 线程 id,与 shcedule 中的 _processer 下标对应 Scheduler * scheduler_; // 该执行器依赖的调度器 volatile bool active_ = true; // 该执行器的活跃状态,活跃表明该执行器未被阻塞,由调度器的调度线程控制 volatile uint64_t switchCount_ = 0; // 协程调度的次数 // 当前正在运行的协程 Task* runningTask_{nullptr}; Task* nextTask_{nullptr}; // 协程队列 typedef TSQueue<Task, true> TaskQueue; TaskQueue runnableQueue_; // 运行协程队列 TaskQueue waitQueue_; // 等待协程队列 TSQueue<Task, false> gcQueue_; // 待回收的协程队列,协程运行完毕之后,会被添加到该队列中,等待回收 TaskQueue newQueue_; // 新添加到该执行器中的协程,包括刚刚 steal 过来的协程,该队列中的协程暂不会执行,会由 Process() 函数将该队列中的协程不断添加到 runnableQueue_ 中 volatile uint64_t switchCount_ = 0; // 协程调度的次数 // 执行器等待的条件变量 std::mutex cvMutex_; std::condition_variable cv_; std::atomic_bool waiting_{false}; }; // 通过条件变量,唤醒处于等待状态但是有任务的 P void NotifyCondition();
执行器对协程的调度 Process()
执行器 Processer 维护了三个线程安全的协程队列:
- runnableQueue_:可运行协程队列;
- waitQueue_:存放挂起的协程;
- newQueue_:该队列中存放的是新加入的协程,包括新创建的协程,唤醒挂起的协程,还有 steal 来的协程;
void Processer::Process() { GetCurrentProcesser() = this; bool & isStop = *stop_; while (!isStop) { runnableQueue_.front(runningTask_); // 获取一个可以运行对协程对象 if (!runningTask_) { if (AddNewTasks()) runnableQueue_.front(runningTask_); if (!runningTask_) { WaitCondition(); // 没有可以执行的协程,wait 条件变量 AddNewTasks(); continue; } } addNewQuota_ = 1; while (runningTask_ && !isStop) { runningTask_->state_ = TaskState::runnable; runningTask_->proc_ = this; ++switchCount_; runningTask_->SwapIn(); switch (runningTask_->state_) { case TaskState::runnable: { std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); auto next = (Task*)runningTask_->next; if (next) { runningTask_ = next; runningTask_->check_ = runnableQueue_.check_; break; } if (addNewQuota_ < 1 || newQueue_.emptyUnsafe()) { runningTask_ = nullptr; } else { lock.unlock(); if (AddNewTasks()) { runnableQueue_.next(runningTask_, runningTask_); -- addNewQuota_; } else { std::unique_lock<TaskQueue::lock_t> lock2(runnableQueue_.LockRef()); runningTask_ = nullptr; } } } break; case TaskState::block: { std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); runningTask_ = nextTask_; nextTask_ = nullptr; } break; case TaskState::done: default: { runnableQueue_.next(runningTask_, nextTask_); if (!nextTask_ && addNewQuota_ > 0) { if (AddNewTasks()) { runnableQueue_.next(runningTask_, nextTask_); -- addNewQuota_; } } DebugPrint(dbg_task, "task(%s) done.", runningTask_->DebugInfo()); runnableQueue_.erase(runningTask_); if (gcQueue_.size() > 16) // 执行完毕的协程,需要回收资源 GC(); gcQueue_.push(runningTask_); if (runningTask_->eptr_) { std::exception_ptr ep = runningTask_->eptr_; std::rethrow_exception(ep); } std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); runningTask_ = nextTask_; nextTask_ = nullptr; } break; } } } }
在调度器 Schedule 执行 Stop() 函数之前,执行器 P 会一直处于调度协程阶段 Process()。在期间,执行器 P 会将运行队列 runnableQueue 中的第一个协程获取进行执行,如果可运行队列为空,执行器会尝试将处于 newQueue 中的协程添加到可运行队列中去,如果 newQueue_ 为空,说明此时该执行器处于无协程可调度状态,通过设置条件变量,将执行器设置为等待状态;
当获取到一个可执行协程之后,会执行该协程的任务。协程的执行流程是通过状态机来实现的。(协程有三个状态:运行中,阻塞,执行完毕)
- 对于运行中的协程,我们只需要确定下一个要执行的协程对象即可;
- 对于阻塞的协程,只有当协程挂起时(调用了 Suspend 方法),状态才会切换到这里,因此,这时候只需要去执行 nextTask 即可;
- 对于运行完毕的协程,只有当 Task 处理函数执行完成之后,状态才会切换到这里,因此,需要考虑对该协程资源进行回收;
条件变量
Processer 使用了 std::mutex,并且提供了条件变量用来唤醒。当调度器尝试获取下一个可运行的协程对象时,若此时无可用协程对象,就会主动去等待该条件变量,默认100毫秒的超时时间。
void Processer::WaitCondition() { GC(); std::unique_lock<std::mutex> lock(cvMutex_); waiting_ = true; cv_.wait_for(lock, std::chrono::milliseconds(100)); waiting_ = false; }
当调度器向该执行器中增加了新的协程对象时,会唤醒该条件变量,继续执行 Process 流程。使用条件变量唤醒的效率,要远远高于不断去轮询。
为什么在使用了条件变量后还要设置超时时间,定时轮询,即使条件变量没有被唤醒也希望它返回呢?
因为我们不希望线程会在这里阻塞,只要没有新的协程加入,就一直在死等。我们希望线程在等待的同时,也可以定时跳出,执行一些其它的检测工作等。
从执行器中偷指定数量的协程出来 -> steal()
简单来说,从执行器中取协程出来,就是从执行器维护的双端队列中获取执行个数的结点。
为什么要取出来?前面提到过,要么该执行器负载过大,要么该执行器处于阻塞的状态。
SList<Task> Processer::Steal(std::size_t n) { if (n > 0) { // steal 指定个数协程 newQueue_.AssertLink(); auto slist = newQueue_.pop_back(n); newQueue_.AssertLink(); if (slist.size() >= n) return slist; std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); bool pushRunningTask = false, pushNextTask = false; if (runningTask_) pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_); if (nextTask_) pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_); auto slist2 = runnableQueue_.pop_backWithoutLock(n - slist.size()); if (pushRunningTask) runnableQueue_.pushWithoutLock(runningTask_); if (pushNextTask) runnableQueue_.pushWithoutLock(nextTask_); lock.unlock(); slist2.append(std::move(slist)); if (!slist2.empty()) DebugPrint(dbg_scheduler, "Proc(%d).Stealed = %d", id_, (int)slist2.size()); return slist2; } else { // steal all newQueue_.AssertLink(); auto slist = newQueue_.pop_all(); newQueue_.AssertLink(); std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); bool pushRunningTask = false, pushNextTask = false; if (runningTask_) pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_); if (nextTask_) pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_); auto slist2 = runnableQueue_.pop_allWithoutLock(); if (pushRunningTask) runnableQueue_.pushWithoutLock(runningTask_); if (pushNextTask) runnableQueue_.pushWithoutLock(nextTask_); lock.unlock(); slist2.append(std::move(slist)); if (!slist2.empty()) DebugPrint(dbg_scheduler, "Proc(%d).Stealed all = %d", id_, (int)slist2.size()); return slist2; } }
首先,会从 newQueue 队列中获取协程结点,因为 newQueue 中的结点还没有添加到运行队列中,因此可以直接取出;如果 newQueue 中协程数量不足,会从 runnableQueue 队列尾部中继续获取结点。由于 runnableQueue 队列中我们记录了正在执行的协程和下一次将执行的协程(runningTask & nextTask ),需要特殊处理。在从 runnableQueue 偷协程之前,会将 runningTask & nextTask 从队列删除,待偷完结点之后再次添加到当前 runnableQueue_ 队列中。
简单说,偷协程的工作,不会从队列中获取到 runningTask & nextTask 标识的协程。
协程挂起 Suspend
static SuspendEntry Suspend();
一种方式是直接挂起,会将该协程状态转换为 TaskState::block,然后将该协程从 runnableQueue 中删除,再添加到 waitQueue 中;
另外一种方式是挂起之后(第一种方式执行完毕之后),允许配置一个时间段之后去自动唤醒该协程。
wakeup
用于唤醒协程
唤醒协程要做的,就是讲待唤醒的协程从 waitQueue_ 中删除并重新添加到 newQueue_中去。
StaticCoYield
用于在一个执行器中切出当前协程
有两种可能,一种是协程被阻塞需要挂起;另外一种是协程执行完毕,主动切出。
具体实现是通过获取当前执行器正在执行的协程 Task,调用 SwapOut() 方法实现。
ALWAYS_INLINE void Processer::StaticCoYield() { auto proc = GetCurrentProcesser(); if (proc) proc->CoYield(); } ALWAYS_INLINE void Processer::CoYield() { Task *tk = GetCurrentTask(); assert(tk); ++ tk->yieldCount_; #if ENABLE_DEBUGGER DebugPrint(dbg_yield, "yield task(%s) state = %s", tk->DebugInfo(), GetTaskStateName(tk->state_)); if (Listener::GetTaskListener()) Listener::GetTaskListener()->onSwapOut(tk->id_); #endif tk->SwapOut(); }
几个需要注意的问题
> 可能会切出协程上下文的几种情况:
- 协程被挂起;
- 协程执行完毕;
- 用户主动切出 co_yield。
#define co_yield do { ::co::Processer::StaticCoYield(); } while (0)
> 协程被挂起的几种情况:
- 系统函数被 hook;
- libgo_poll (被 hook 的 io 操作函数会调用 libgo_poll 实现切换)
- select
- sleep、usleep、nanosleep
- 调用了协程锁 CoMutex(co_mutex),协程读写锁 CoRWMutex(co_rwmutex),或者使用了 channel。
> 切入协程上下文的几种情况:
- 执行器在调度(Process)期间;
- 唤醒挂起协程不会切入上下文,只是从等待队列中重新添加到 newQueue_。
3. 协程对象:struct Task
# 协程状态 enum class TaskState { runnable, // 可运行 block, // 阻塞 done, // 协程运行完毕 }; typedef std::function<void()> TaskF; // c++11提供的函数模板 struct Task { TaskState state_ = TaskState::runnable; uint64_t id_; // 当前调度器下协程编号,从0开始 TaskF fn_; // 协程运行的函数 uint64_t yieldCount_ = 0; // 协程切出的次数 Context ctx_; // 上下文信息 Processer* proc_ = nullptr; // 归属于哪个执行器 // 提供了协程切入、切出、切换到指定线程三个函数 ALWAYS_INLINE void SwapIn(); ALWAYS_INLINE void SwapTo(Task* other); ALWAYS_INLINE void SwapOut(); private: static void StaticRun(intptr_t vp); // 参数为 Task*,函数会去执行该 Task 的 fn_(),执行完毕后,协程状态改为 TaskState::done,并在执行器 P 中切出 };
每个 Task 对象是一个协程,在使用过程中,创建一个协程实际就是创建了一个 Task 对象,再添加到对应的执行器 P 中。之前提到过,执行器进行协程调度是通过一个状态机来实现的,这里的 TaskState 就是协程状态,协程函数 fn_ 会在 StaticRun 静态方法中调用,该静态方法注册到了协程上下文 _ctx 中。
除此之外,Task 类内部,也提供了协程的切入切出方法,本质也是调用了上下文的切换。
StaticRun
控制协程的运行,内部调用了 Task::Run() 方法,会在协程函数 fn_ 执行完毕之后,将协程状态转换为 TaskState::done,并将协程切出。
void Task::Run() { auto call_fn = [this]() { this->fn_(); this->fn_ = TaskF(); //让协程function对象的析构也在协程中执行 }; \\ ... call_fn(); \\ ... state_ = TaskState::done; Processer::StaticCoYield(); } void Task::StaticRun(intptr_t vp) { Task* tk = (Task*)vp; tk->Run(); }
这里就是对 libgo 调度相关实现的描述,本文跳过了对定时器和时钟部分的实现,这个会在之后单独叙述。本文涉及到的代码在源码目录下的
libgo-master/libgo/scheduler/processer.cpp libgo-master/libgo/scheduler/processer.h libgo-master/libgo/scheduler/scheduler.cpp libgo-master/libgo/scheduler/scheduler.h
有兴趣的读者可以对照源码学习,欢迎讨论学习
以上所述就是小编给大家介绍的《libgo 源码剖析(2. libgo调度策略源码实现)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Go调度源码浅析
- 剖析 React 源码:调度原理
- Golang 源码学习调度逻辑(三):工作线程的执行流程与调度循环
- CFS调度器(2)-源码解析
- Ray源码解析之调度部分
- Kafka 源码解析:延时任务调度策略
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
复杂网络理论及其应用
汪小帆、李翔、陈关荣 / 清华大学出版社 / 2006 / 45.00元
国内首部复杂网络专著 【图书目录】 第1章 引论 1.1 引言 1.2 复杂网络研究简史 1.3 基本概念 1.4 本书内容简介 参考文献 第2章 网络拓扑基本模型及其性质 2.1 引言 2.2 规则网络 2.3 随机图 2.4 小世界网络模型 2.5 无标度网络模型 ......一起来看看 《复杂网络理论及其应用》 这本书的介绍吧!
随机密码生成器
多种字符组合密码
UNIX 时间戳转换
UNIX 时间戳转换