内容简介:本文将从源码实现上对 libgo 的调度策略进行分析,主要涉及到上一篇文章中的三个结构体的定义:三者的关系如下图所示:
本文将从源码实现上对 libgo 的调度策略进行分析,主要涉及到上一篇文章中的三个结构体的定义:
- 调度器 Scheduler(简称 S)
- 执行器 Processer(简称 P)
- 协程 Task(简称 T)
三者的关系如下图所示:
本文会列出类内的主要成员和主要函数做以分析。
1. 协程调度器:class Scheduler
libgo/scheduler/scheduler.h
class Scheduler{ public: /* * 创建一个调度器,初始化 libgo * 创建主线程的执行器,如果后续 STart 的时候没有参数,默认只有一个执行器去做 * 当仅使用一个线程进行协程调度时, 协程地执行会严格地遵循其创建顺序. * */ static Scheduler* Create(); /* * 创建一个协程 Task 对象,并添加到当前的执行器 processer 的任务队列中, * 调度器的任务数 taskCount_ +1 * */ void CreateTask(TaskF const& fn, TaskOpt const& opt); /* 启动调度器 * @minThreadNumber : 最小调度线程数, 为0时, 设置为cpu核心数. * @maxThreadNumber : 最大调度线程数, 为0时, 设置为minThreadNumber. * 如果maxThreadNumber大于minThreadNumber, 则当协程产生长时间阻塞时, * 可以自动扩展调度线程数. * 唤醒定时器线程 * 每个调度线程都会调用 Process 开始调度,最后开启 id 为 0 的调度线程 * 如果 maxThreadNumber_ > 1 的话,会开启调度线程 DispatcherThread * */ void Start(int minThreadNumber = 1, int maxThreadNumber = 0); /* * 停止调度,停止后无法恢复, 仅用于安全退出main函数 * 如果某个调度线程被协程阻塞, 必须等待阻塞结束才能退出. * */ void Stop(); private: /* * 调度线程,主要为平衡多个 processer 的负载将高负载或阻塞的 p 中的协程 steal 给低负载的 p * 如果全部阻塞但是还有协程待执行,会起新线程,线程数不超过 maxThreadNumber_ * 会将阻塞 P 中的协程分摊给负载较少的 P * */ void DispatcherThread(); /* * 创建一个新的 Processer,并添加到双端队列 processers_ 中 * */ void NewProcessThread(); private: atomic_t<uint32_t> taskCount_{0}; // 用来统计协程数量 Deque<Processer*> processers_; // DispatcherThread双端队列,用来存放所有的执行器,每个执行器都会单独开一个线程去执行,线程中回调 Process() 方法。 LFLock started_; // libgo 提供的自选锁 };
调度器负责管理 1~N 个调度线程,每个调度线程一个执行器 Processer。调度器仅负责均衡各个执行器的负载,防止全部卡住的情况,并不涉及协程的切换等工作。
使用
ligbo提供了默认的协程调度器 co_sched
#define g_Scheduler ::co::Scheduler::getInstance() #define co_sched g_Scheduler
用户也可以创建自己的协程调度器
co::Scheduler* my_sched = co::Scheduler::Create();
启动调度
std::thread t([my_sched]{mysched->Start();}); t.detach();
调度器原理
-
schedule 负责整个系统的协程调度,协程的运行依赖于执行器 Processer(简称 P),因此在调度器初始化的时候会选择创建 P 的数量(支持动态增长),所有的执行器会添加到双端队列中。主线程也作为一个执行器,在创建 Scheduler 对象的时候创建,位于双端队列下标为 0 的位置(注意:只是创建对象,并没有开始运行);
-
当调用了 Start() 函数后,会正式开始运行。在 Start 函数内部,会创建指定数量的执行器 P,具体数量取决于参数,默认会创建 minThreadNumber 个,当全部执行器都阻塞之后,会动态扩展,最多 maxThreadNumber 个执行器。每个执行器都会运行于一个单独的线程,执行器负责该线程内部协程的切换和执行;
-
当创建协程时,会将协程添加到某一个处于活跃状态的执行器,如果恰好都不活跃,也会添加到某一个 P 中,这并不影响执行器的正常工作,因为调度器的调度线程会去处理它;
-
Start 函数内部,除了上述执行器所在线程,还会开启调度线程 DispatcherThread,调度线程会平衡各个 P 的协程数量和负载,进行 steal,如果所有 P 都阻塞,会根据 maxThreadNumber 动态增加 P 的数量,如果仅仅部分 P 阻塞,会将阻塞的 P 中的协程全部拿出(steal),均摊到负载最小的 P 中;
-
Schedule 也会选择性开启协程的定时器线程;
- 开启 FastSteadyClock 线程。
关于定时器以及时钟的实现,会在之后的文章中讨论。
2. 协程执行器:class Processer
libgo/scheduler/processer.h
每个协程执行器对应一个线程,负责本线程的协程调度,但并非线程安全的,是协程调度的核心。
class Processer { public: // 协程挂起标识,用于后续进行唤醒和超时判断 struct SuspendEntry { // ... }; // 协程切出 ALWAYS_INLINE static void StaticCoYield(); // 挂起当前协程 static SuspendEntry Suspend(); // 挂起当前协程, 并在指定时间后自动唤醒 static SuspendEntry Suspend(FastSteadyClock::duration dur); // 唤醒协程 static bool Wakeup(SuspendEntry const& entry); private: /* * 执行器对协程的调度,也是执行器所在现在的主处理逻辑 * */ void Process(); /* * 从当前执行器中偷 n 个协程并返回 * n 为0则全部偷出来,否则取出相应的个数 * */ SList<Task> Steal(std::size_t n); private: int id_; // 线程 id,与 shcedule 中的 _processer 下标对应 Scheduler * scheduler_; // 该执行器依赖的调度器 volatile bool active_ = true; // 该执行器的活跃状态,活跃表明该执行器未被阻塞,由调度器的调度线程控制 volatile uint64_t switchCount_ = 0; // 协程调度的次数 // 当前正在运行的协程 Task* runningTask_{nullptr}; Task* nextTask_{nullptr}; // 协程队列 typedef TSQueue<Task, true> TaskQueue; TaskQueue runnableQueue_; // 运行协程队列 TaskQueue waitQueue_; // 等待协程队列 TSQueue<Task, false> gcQueue_; // 待回收的协程队列,协程运行完毕之后,会被添加到该队列中,等待回收 TaskQueue newQueue_; // 新添加到该执行器中的协程,包括刚刚 steal 过来的协程,该队列中的协程暂不会执行,会由 Process() 函数将该队列中的协程不断添加到 runnableQueue_ 中 volatile uint64_t switchCount_ = 0; // 协程调度的次数 // 执行器等待的条件变量 std::mutex cvMutex_; std::condition_variable cv_; std::atomic_bool waiting_{false}; }; // 通过条件变量,唤醒处于等待状态但是有任务的 P void NotifyCondition();
执行器对协程的调度 Process()
执行器 Processer 维护了三个线程安全的协程队列:
- runnableQueue_:可运行协程队列;
- waitQueue_:存放挂起的协程;
- newQueue_:该队列中存放的是新加入的协程,包括新创建的协程,唤醒挂起的协程,还有 steal 来的协程;
void Processer::Process() { GetCurrentProcesser() = this; bool & isStop = *stop_; while (!isStop) { runnableQueue_.front(runningTask_); // 获取一个可以运行对协程对象 if (!runningTask_) { if (AddNewTasks()) runnableQueue_.front(runningTask_); if (!runningTask_) { WaitCondition(); // 没有可以执行的协程,wait 条件变量 AddNewTasks(); continue; } } addNewQuota_ = 1; while (runningTask_ && !isStop) { runningTask_->state_ = TaskState::runnable; runningTask_->proc_ = this; ++switchCount_; runningTask_->SwapIn(); switch (runningTask_->state_) { case TaskState::runnable: { std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); auto next = (Task*)runningTask_->next; if (next) { runningTask_ = next; runningTask_->check_ = runnableQueue_.check_; break; } if (addNewQuota_ < 1 || newQueue_.emptyUnsafe()) { runningTask_ = nullptr; } else { lock.unlock(); if (AddNewTasks()) { runnableQueue_.next(runningTask_, runningTask_); -- addNewQuota_; } else { std::unique_lock<TaskQueue::lock_t> lock2(runnableQueue_.LockRef()); runningTask_ = nullptr; } } } break; case TaskState::block: { std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); runningTask_ = nextTask_; nextTask_ = nullptr; } break; case TaskState::done: default: { runnableQueue_.next(runningTask_, nextTask_); if (!nextTask_ && addNewQuota_ > 0) { if (AddNewTasks()) { runnableQueue_.next(runningTask_, nextTask_); -- addNewQuota_; } } DebugPrint(dbg_task, "task(%s) done.", runningTask_->DebugInfo()); runnableQueue_.erase(runningTask_); if (gcQueue_.size() > 16) // 执行完毕的协程,需要回收资源 GC(); gcQueue_.push(runningTask_); if (runningTask_->eptr_) { std::exception_ptr ep = runningTask_->eptr_; std::rethrow_exception(ep); } std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); runningTask_ = nextTask_; nextTask_ = nullptr; } break; } } } }
在调度器 Schedule 执行 Stop() 函数之前,执行器 P 会一直处于调度协程阶段 Process()。在期间,执行器 P 会将运行队列 runnableQueue 中的第一个协程获取进行执行,如果可运行队列为空,执行器会尝试将处于 newQueue 中的协程添加到可运行队列中去,如果 newQueue_ 为空,说明此时该执行器处于无协程可调度状态,通过设置条件变量,将执行器设置为等待状态;
当获取到一个可执行协程之后,会执行该协程的任务。协程的执行流程是通过状态机来实现的。(协程有三个状态:运行中,阻塞,执行完毕)
- 对于运行中的协程,我们只需要确定下一个要执行的协程对象即可;
- 对于阻塞的协程,只有当协程挂起时(调用了 Suspend 方法),状态才会切换到这里,因此,这时候只需要去执行 nextTask 即可;
- 对于运行完毕的协程,只有当 Task 处理函数执行完成之后,状态才会切换到这里,因此,需要考虑对该协程资源进行回收;
条件变量
Processer 使用了 std::mutex,并且提供了条件变量用来唤醒。当调度器尝试获取下一个可运行的协程对象时,若此时无可用协程对象,就会主动去等待该条件变量,默认100毫秒的超时时间。
void Processer::WaitCondition() { GC(); std::unique_lock<std::mutex> lock(cvMutex_); waiting_ = true; cv_.wait_for(lock, std::chrono::milliseconds(100)); waiting_ = false; }
当调度器向该执行器中增加了新的协程对象时,会唤醒该条件变量,继续执行 Process 流程。使用条件变量唤醒的效率,要远远高于不断去轮询。
为什么在使用了条件变量后还要设置超时时间,定时轮询,即使条件变量没有被唤醒也希望它返回呢?
因为我们不希望线程会在这里阻塞,只要没有新的协程加入,就一直在死等。我们希望线程在等待的同时,也可以定时跳出,执行一些其它的检测工作等。
从执行器中偷指定数量的协程出来 -> steal()
简单来说,从执行器中取协程出来,就是从执行器维护的双端队列中获取执行个数的结点。
为什么要取出来?前面提到过,要么该执行器负载过大,要么该执行器处于阻塞的状态。
SList<Task> Processer::Steal(std::size_t n) { if (n > 0) { // steal 指定个数协程 newQueue_.AssertLink(); auto slist = newQueue_.pop_back(n); newQueue_.AssertLink(); if (slist.size() >= n) return slist; std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); bool pushRunningTask = false, pushNextTask = false; if (runningTask_) pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_); if (nextTask_) pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_); auto slist2 = runnableQueue_.pop_backWithoutLock(n - slist.size()); if (pushRunningTask) runnableQueue_.pushWithoutLock(runningTask_); if (pushNextTask) runnableQueue_.pushWithoutLock(nextTask_); lock.unlock(); slist2.append(std::move(slist)); if (!slist2.empty()) DebugPrint(dbg_scheduler, "Proc(%d).Stealed = %d", id_, (int)slist2.size()); return slist2; } else { // steal all newQueue_.AssertLink(); auto slist = newQueue_.pop_all(); newQueue_.AssertLink(); std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); bool pushRunningTask = false, pushNextTask = false; if (runningTask_) pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_); if (nextTask_) pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_); auto slist2 = runnableQueue_.pop_allWithoutLock(); if (pushRunningTask) runnableQueue_.pushWithoutLock(runningTask_); if (pushNextTask) runnableQueue_.pushWithoutLock(nextTask_); lock.unlock(); slist2.append(std::move(slist)); if (!slist2.empty()) DebugPrint(dbg_scheduler, "Proc(%d).Stealed all = %d", id_, (int)slist2.size()); return slist2; } }
首先,会从 newQueue 队列中获取协程结点,因为 newQueue 中的结点还没有添加到运行队列中,因此可以直接取出;如果 newQueue 中协程数量不足,会从 runnableQueue 队列尾部中继续获取结点。由于 runnableQueue 队列中我们记录了正在执行的协程和下一次将执行的协程(runningTask & nextTask ),需要特殊处理。在从 runnableQueue 偷协程之前,会将 runningTask & nextTask 从队列删除,待偷完结点之后再次添加到当前 runnableQueue_ 队列中。
简单说,偷协程的工作,不会从队列中获取到 runningTask & nextTask 标识的协程。
协程挂起 Suspend
static SuspendEntry Suspend();
一种方式是直接挂起,会将该协程状态转换为 TaskState::block,然后将该协程从 runnableQueue 中删除,再添加到 waitQueue 中;
另外一种方式是挂起之后(第一种方式执行完毕之后),允许配置一个时间段之后去自动唤醒该协程。
wakeup
用于唤醒协程
唤醒协程要做的,就是讲待唤醒的协程从 waitQueue_ 中删除并重新添加到 newQueue_中去。
StaticCoYield
用于在一个执行器中切出当前协程
有两种可能,一种是协程被阻塞需要挂起;另外一种是协程执行完毕,主动切出。
具体实现是通过获取当前执行器正在执行的协程 Task,调用 SwapOut() 方法实现。
ALWAYS_INLINE void Processer::StaticCoYield() { auto proc = GetCurrentProcesser(); if (proc) proc->CoYield(); } ALWAYS_INLINE void Processer::CoYield() { Task *tk = GetCurrentTask(); assert(tk); ++ tk->yieldCount_; #if ENABLE_DEBUGGER DebugPrint(dbg_yield, "yield task(%s) state = %s", tk->DebugInfo(), GetTaskStateName(tk->state_)); if (Listener::GetTaskListener()) Listener::GetTaskListener()->onSwapOut(tk->id_); #endif tk->SwapOut(); }
几个需要注意的问题
> 可能会切出协程上下文的几种情况:
- 协程被挂起;
- 协程执行完毕;
- 用户主动切出 co_yield。
#define co_yield do { ::co::Processer::StaticCoYield(); } while (0)
> 协程被挂起的几种情况:
- 系统函数被 hook;
- libgo_poll (被 hook 的 io 操作函数会调用 libgo_poll 实现切换)
- select
- sleep、usleep、nanosleep
- 调用了协程锁 CoMutex(co_mutex),协程读写锁 CoRWMutex(co_rwmutex),或者使用了 channel。
> 切入协程上下文的几种情况:
- 执行器在调度(Process)期间;
- 唤醒挂起协程不会切入上下文,只是从等待队列中重新添加到 newQueue_。
3. 协程对象:struct Task
# 协程状态 enum class TaskState { runnable, // 可运行 block, // 阻塞 done, // 协程运行完毕 }; typedef std::function<void()> TaskF; // c++11提供的函数模板 struct Task { TaskState state_ = TaskState::runnable; uint64_t id_; // 当前调度器下协程编号,从0开始 TaskF fn_; // 协程运行的函数 uint64_t yieldCount_ = 0; // 协程切出的次数 Context ctx_; // 上下文信息 Processer* proc_ = nullptr; // 归属于哪个执行器 // 提供了协程切入、切出、切换到指定线程三个函数 ALWAYS_INLINE void SwapIn(); ALWAYS_INLINE void SwapTo(Task* other); ALWAYS_INLINE void SwapOut(); private: static void StaticRun(intptr_t vp); // 参数为 Task*,函数会去执行该 Task 的 fn_(),执行完毕后,协程状态改为 TaskState::done,并在执行器 P 中切出 };
每个 Task 对象是一个协程,在使用过程中,创建一个协程实际就是创建了一个 Task 对象,再添加到对应的执行器 P 中。之前提到过,执行器进行协程调度是通过一个状态机来实现的,这里的 TaskState 就是协程状态,协程函数 fn_ 会在 StaticRun 静态方法中调用,该静态方法注册到了协程上下文 _ctx 中。
除此之外,Task 类内部,也提供了协程的切入切出方法,本质也是调用了上下文的切换。
StaticRun
控制协程的运行,内部调用了 Task::Run() 方法,会在协程函数 fn_ 执行完毕之后,将协程状态转换为 TaskState::done,并将协程切出。
void Task::Run() { auto call_fn = [this]() { this->fn_(); this->fn_ = TaskF(); //让协程function对象的析构也在协程中执行 }; \\ ... call_fn(); \\ ... state_ = TaskState::done; Processer::StaticCoYield(); } void Task::StaticRun(intptr_t vp) { Task* tk = (Task*)vp; tk->Run(); }
这里就是对 libgo 调度相关实现的描述,本文跳过了对定时器和时钟部分的实现,这个会在之后单独叙述。本文涉及到的代码在源码目录下的
libgo-master/libgo/scheduler/processer.cpp libgo-master/libgo/scheduler/processer.h libgo-master/libgo/scheduler/scheduler.cpp libgo-master/libgo/scheduler/scheduler.h
有兴趣的读者可以对照源码学习,欢迎讨论学习
以上所述就是小编给大家介绍的《libgo 源码剖析(2. libgo调度策略源码实现)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Go调度源码浅析
- 剖析 React 源码:调度原理
- Golang 源码学习调度逻辑(三):工作线程的执行流程与调度循环
- CFS调度器(2)-源码解析
- Ray源码解析之调度部分
- Kafka 源码解析:延时任务调度策略
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
程序员的算法趣题
[ 日] 增井敏克 / 绝 云 / 人民邮电出版社 / 2017-7 / 55.00元
本书是一本解谜式的趣味算法书,从实际应用出发,通过趣味谜题的解谜过程,引导读者在愉悦中提升思维能力、掌握算法精髓。此外,本书作者在谜题解答上,通过算法的关键原理讲解,从思维细节入手,发掘启发性算法新解,并辅以Ruby、JavaScript等不同语言编写的源代码示例,使读者在算法思维与编程实践的分合之间,切实提高编程能力。 本书适合已经学习过排序、搜索等知名算法,并想要学习更多有趣算法以提升编程技巧......一起来看看 《程序员的算法趣题》 这本书的介绍吧!