Ray Version 0.4.0
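The scheduling component consists of six files:
- Scheduling policy: the header scheduling_policy.h (defines SchedulingPolicy) and the source file scheduling_policy.cc (implements the scheduling policy of SchedulingPolicy).
- Scheduling queue: the header scheduling_queue.h (defines SchedulingQueue) and the source file scheduling_queue.cc (implements the queue operations).
- Scheduling resources: the header scheduling_resources.h (defines ResourceSet and SchedulingResources) and the source file scheduling_resources.cc (implements both classes).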
src/ray/raylet/scheduling_policy.h
---------------------------------------------

```cpp
namespace ray {

/// \brief Implements a scheduling policy for the node manager.
class SchedulingPolicy {
 public:
  /// \param scheduling_queue: reference to a scheduler queues object for access to
  /// tasks.
  SchedulingPolicy(const SchedulingQueue &scheduling_queue);

  /// Perform a scheduling operation, given a set of cluster resources and
  /// producing a mapping of tasks to node managers.
  ///
  /// \param cluster_resources: a set of cluster resources representing
  /// configured and current resource capacity on each node.
  /// \return Scheduling decision, mapping tasks to node managers for placement.
  std::unordered_map<TaskID, ClientID, UniqueIDHasher> Schedule(
      const std::unordered_map<ClientID, SchedulingResources, UniqueIDHasher>
          &cluster_resources);

  virtual ~SchedulingPolicy();

 private:
  /// An immutable reference to the scheduling task queues.
  const SchedulingQueue &scheduling_queue_;
};

}  // namespace ray
```
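This is the header of the scheduling policy. It describes the structure of the SchedulingPolicy class: a constructor, a destructor, and the Schedule function, which makes the key scheduling decisions.
The constructor takes a reference to the queue of tasks to be scheduled. Schedule takes an unordered_map of cluster resources as input and returns the scheduling decision, which is also an unordered_map. std::unordered_map was introduced in C++11; it is a hash table, so its elements are not stored in any particular order.
The input map of Schedule uses a ClientID (the ID of a node) as the key and that node's SchedulingResources as the value, i.e., a node -> resources mapping.
The output map uses a TaskID as the key and a ClientID as the value, meaning that the task identified by the TaskID is scheduled onto the node identified by the ClientID.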
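Both maps pass UniqueIDHasher as a third template argument because Ray's ID types are custom classes that std::hash does not know how to hash. The sketch below illustrates that pattern with a hypothetical 20-byte FakeID type and a hypothetical FakeIDHasher based on FNV-1a; neither is Ray's actual ID class or hash function.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>

// Hypothetical fixed-size binary ID standing in for TaskID / ClientID.
struct FakeID {
  std::array<std::uint8_t, 20> bytes{};

  bool operator==(const FakeID &other) const { return bytes == other.bytes; }
};

// Hypothetical hasher playing the role of UniqueIDHasher: std::hash has no
// specialization for FakeID, so unordered_map needs an explicit functor.
struct FakeIDHasher {
  std::size_t operator()(const FakeID &id) const {
    std::uint64_t h = 14695981039346656037ULL;  // FNV-1a 64-bit offset basis
    for (std::uint8_t b : id.bytes) {
      h ^= b;
      h *= 1099511628211ULL;  // FNV-1a 64-bit prime
    }
    return static_cast<std::size_t>(h);
  }
};

// Builds an ID whose first byte is `tag`; for demonstration only.
FakeID MakeID(std::uint8_t tag) {
  FakeID id;
  id.bytes[0] = tag;
  return id;
}

// Same shape as the output of Schedule: task ID -> node ID.
using Decision = std::unordered_map<FakeID, FakeID, FakeIDHasher>;
```

Any hash that spreads the bytes well would do; FNV-1a is chosen here only because it is short.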
scheduling_policy.cc
----------------------------------

```cpp
namespace ray {

....

std::unordered_map<TaskID, ClientID, UniqueIDHasher> SchedulingPolicy::Schedule(
    const std::unordered_map<ClientID, SchedulingResources, UniqueIDHasher>
        &cluster_resources) {
  static ClientID local_node_id = ClientID::nil();
  std::unordered_map<TaskID, ClientID, UniqueIDHasher> decision;
  // TODO(atumanov): consider all cluster resources.
  SchedulingResources resource_supply = cluster_resources.at(local_node_id);
  const auto &resource_supply_set = resource_supply.GetAvailableResources();

  // Iterate over running tasks, get their resource demand and try to schedule.
  for (const auto &t : scheduling_queue_.GetReadyTasks()) {
    // Get task's resource demand
    const auto &resource_demand = t.GetTaskSpecification().GetRequiredResources();
    bool task_feasible = resource_demand.IsSubset(resource_supply_set);
    if (task_feasible) {
      const TaskID &task_id = t.GetTaskSpecification().TaskId();
      decision[task_id] = local_node_id;
    }
  }
  return decision;
}

....

}  // namespace ray
```
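For brevity, only the implementation of Schedule is shown here.
The code above shows that Ray's scheduling goes through the following steps:
1) Get the local node ID (in this version it is simply the static placeholder ClientID::nil()).
2) Get the resources currently available on the local node; as the TODO notes, the resources of other nodes are not yet considered.
3) For each ready task, check whether the local resources can satisfy it (task_feasible); if they can, schedule the task onto the local node.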
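The three steps can be reproduced outside Ray in a few lines. This is a simplified sketch, assuming string IDs and a plain name-to-quantity map in place of ResourceSet; ResourceMap, SimpleTask, Fits and ScheduleLocally are hypothetical names, not Ray APIs. Like the 0.4.0 code, it never decrements the supply.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Simplified stand-in for ResourceSet: resource name -> quantity.
using ResourceMap = std::unordered_map<std::string, double>;

// Hypothetical task record: an ID plus its resource demand.
struct SimpleTask {
  std::string task_id;
  ResourceMap demand;
};

// Same rule as ResourceSet::IsSubset: every demanded resource must exist in
// the supply with at least the demanded quantity.
bool Fits(const ResourceMap &demand, const ResourceMap &supply) {
  for (const auto &entry : demand) {
    auto it = supply.find(entry.first);
    if (it == supply.end() || entry.second > it->second) return false;
  }
  return true;
}

// Steps 1-3: look up the local node's available resources, then place every
// ready task that fits on that node. Tasks that do not fit get no entry.
std::unordered_map<std::string, std::string> ScheduleLocally(
    const std::string &local_node_id,
    const std::unordered_map<std::string, ResourceMap> &cluster_resources,
    const std::vector<SimpleTask> &ready_tasks) {
  std::unordered_map<std::string, std::string> decision;
  const ResourceMap &supply = cluster_resources.at(local_node_id);  // step 2
  for (const auto &task : ready_tasks) {                            // step 3
    if (Fits(task.demand, supply)) {
      decision[task.task_id] = local_node_id;
    }
  }
  return decision;
}
```

With a supply of 2 CPUs, a task needing 1 CPU and another needing 2 CPUs are both placed, even though together they need 3; a task needing 3 CPUs or a GPU gets no entry at all.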
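[Q] What happens to tasks whose requirements cannot be met? They are never assigned local_node_id, so they have no key in decision. How these tasks are handled is left for later analysis.
[Q] Another question: whenever the local node can meet a task's requirements, the task is scheduled onto the local node, but scheduling does not reduce resource_supply, i.e., the resource supply. So if the local node can meet the requirements of every task, won't all tasks be scheduled onto this one node?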
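To make the second question concrete, here is a hypothetical variant (not the Ray 0.4.0 code) that charges each placed task's demand against a local copy of the supply, so later tasks only see what is left. ResourceMap, SimpleTask, Fits and ScheduleWithAccounting are illustrative names, not Ray APIs.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

using ResourceMap = std::unordered_map<std::string, double>;

// Hypothetical task record: an ID plus its resource demand.
struct SimpleTask {
  std::string task_id;
  ResourceMap demand;
};

// True if every demanded resource is present in `supply` in sufficient quantity.
bool Fits(const ResourceMap &demand, const ResourceMap &supply) {
  for (const auto &entry : demand) {
    auto it = supply.find(entry.first);
    if (it == supply.end() || entry.second > it->second) return false;
  }
  return true;
}

// Hypothetical variant of Schedule with bookkeeping: each placement is charged
// against `remaining`, so the node cannot be oversubscribed within one call.
std::unordered_map<std::string, std::string> ScheduleWithAccounting(
    const std::string &local_node_id, const ResourceMap &available,
    const std::vector<SimpleTask> &ready_tasks) {
  std::unordered_map<std::string, std::string> decision;
  ResourceMap remaining = available;  // local copy; the caller's map is untouched
  for (const auto &task : ready_tasks) {
    if (!Fits(task.demand, remaining)) continue;
    for (const auto &entry : task.demand) {
      remaining[entry.first] -= entry.second;  // charge this task's demand
    }
    decision[task.task_id] = local_node_id;
  }
  return decision;
}
```

With 2 CPUs available and three ready tasks needing 1 CPU each, only the first two are placed. Charging inside the policy only prevents oversubscription within a single call; keeping availability accurate across calls is the job of the Acquire/Release interface of SchedulingResources described later.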
Here, IsSubset in resource_demand.IsSubset(resource_supply_set) checks whether the resource demand is a subset of the resource supply set; if it is, the task's requirements are satisfied.
scheduling_queue.h
--------------------------------

```cpp
namespace ray {

class SchedulingQueue {
 public:
  SchedulingQueue() {}
  virtual ~SchedulingQueue() {}

  const std::list<Task> &GetWaitingTasks() const;
  const std::list<Task> &GetReadyTasks() const;
  const std::list<Task> &GetReadyMethods() const;
  const std::list<Task> &GetScheduledTasks() const;
  const std::list<Task> &GetRunningTasks() const;

  std::vector<Task> RemoveTasks(std::unordered_set<TaskID, UniqueIDHasher> tasks);

  void QueueWaitingTasks(const std::vector<Task> &tasks);
  void QueueReadyTasks(const std::vector<Task> &tasks);
  void QueueScheduledTasks(const std::vector<Task> &tasks);
  void QueueRunningTasks(const std::vector<Task> &tasks);

  /// Register an actor.
  ///
  /// \param actor_id The ID of the actor to register.
  /// \param actor_information Information about the actor.
  bool RegisterActor(ActorID actor_id, const ActorInformation &actor_information);

 private:
  std::list<Task> waiting_tasks_;
  std::list<Task> ready_tasks_;
  std::list<Task> scheduled_tasks_;
  std::list<Task> running_tasks_;
  /// The registry of known actors.
  std::unordered_map<ActorID, ActorInformation, UniqueIDHasher> actor_registry_;
};

}  // namespace ray
```
This header defines the scheduling queue class, which encapsulates several queues, each holding the tasks in one state:
(1) waiting: the task is waiting for its object dependencies to become available.
(2) ready: the object dependencies are available and the task is ready to be scheduled.
(3) scheduled: the task has been scheduled but is waiting for a worker.
(4) running: the task has been scheduled and is running on a worker.
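The lifecycle above amounts to moving a task from one list to the next. The sketch below models it with hypothetical MiniQueues and MoveTask names that hold only string IDs. It uses std::list::splice to relink a node, which is a swapped-in technique: the 0.4.0 class instead exposes RemoveTasks plus the QueueWaitingTasks/QueueReadyTasks/QueueScheduledTasks/QueueRunningTasks methods.

```cpp
#include <algorithm>
#include <cassert>
#include <list>
#include <string>

// Hypothetical miniature of the four queues in SchedulingQueue.
struct MiniQueues {
  std::list<std::string> waiting;    // waiting for object dependencies
  std::list<std::string> ready;      // dependencies available, ready to schedule
  std::list<std::string> scheduled;  // scheduled, waiting for a worker
  std::list<std::string> running;    // running on a worker
};

// Moves `task_id` from `from` to the back of `to`. splice relinks the existing
// list node, so nothing is copied. Returns false if `from` lacks the ID.
bool MoveTask(std::list<std::string> &from, std::list<std::string> &to,
              const std::string &task_id) {
  auto it = std::find(from.begin(), from.end(), task_id);
  if (it == from.end()) return false;
  to.splice(to.end(), from, it);
  return true;
}
```

A task thus walks waiting -> ready -> scheduled -> running through three MoveTask calls.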
scheduling_queue.cc implements the getters for these queues, the enqueue methods, and the removal method.
Take the removal helper as an example:
```cpp
// Helper function to remove tasks in the given set of task_ids from a
// queue, and append them to the given vector removed_tasks.
void removeTasksFromQueue(std::list<Task> &queue,
                          std::unordered_set<TaskID, UniqueIDHasher> &task_ids,
                          std::vector<Task> &removed_tasks) {
  for (auto it = queue.begin(); it != queue.end();) {
    auto task_id = task_ids.find(it->GetTaskSpecification().TaskId());
    if (task_id != task_ids.end()) {
      task_ids.erase(task_id);
      removed_tasks.push_back(std::move(*it));
      it = queue.erase(it);
    } else {
      it++;
    }
  }
}
```
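It removes from queue every task whose ID appears in task_ids, moves those tasks into removed_tasks, and erases each matched ID from task_ids.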
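To experiment with the helper outside Ray, here is a self-contained version. Task is reduced to a hypothetical struct whose string task_id stands in for GetTaskSpecification().TaskId(), and the ID set uses std::hash<std::string> instead of UniqueIDHasher; the loop itself follows the quoted code.

```cpp
#include <cassert>
#include <list>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical stand-in for ray::Task: only the ID matters here.
struct Task {
  std::string task_id;
  std::string payload;
};

// Each match is erased from task_ids, moved into removed_tasks, and erased
// from the queue; on return, task_ids holds only the IDs not found here.
void RemoveTasksFromQueue(std::list<Task> &queue,
                          std::unordered_set<std::string> &task_ids,
                          std::vector<Task> &removed_tasks) {
  for (auto it = queue.begin(); it != queue.end();) {
    auto found = task_ids.find(it->task_id);
    if (found != task_ids.end()) {
      task_ids.erase(found);
      removed_tasks.push_back(std::move(*it));
      it = queue.erase(it);  // erase returns the next valid iterator
    } else {
      ++it;
    }
  }
}
```

Because matched IDs are erased from task_ids, a caller can run the helper over several queues in turn and then inspect which IDs were never found.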
scheduling_resources.h
------------------------------------

```cpp
namespace ray {

/// Resource availability status reports whether the resource requirement is
/// (1) infeasible, (2) feasible but currently unavailable, or (3) available.
typedef enum {
  kInfeasible,            ///< Cannot ever satisfy resource requirements.
  kResourcesUnavailable,  ///< Feasible, but not currently available.
  kFeasible               ///< Feasible and currently available.
} ResourceAvailabilityStatus;

class ResourceSet {
 public:
  ResourceSet();
  ResourceSet(const std::unordered_map<std::string, double> &resource_map);
  ~ResourceSet();

  bool operator==(const ResourceSet &rhs) const;
  bool IsEqual(const ResourceSet &other) const;
  bool IsSubset(const ResourceSet &other) const;
  bool IsSuperset(const ResourceSet &other) const;
  bool AddResource(const std::string &resource_name, double capacity);
  bool RemoveResource(const std::string &resource_name);
  bool AddResources(const ResourceSet &other);
  bool SubtractResources(const ResourceSet &other);
  // Writes the capacity of the given resource to *value; returns true if the
  // resource has a quantity greater than 0 and value is not a null pointer.
  bool GetResource(const std::string &resource_name, double *value) const;

 private:
  // Map from resource name to capacity.
  std::unordered_map<std::string, double> resource_capacity_;
};

/// \class SchedulingResources
/// SchedulingResources encapsulates the state of resources and their counts.
/// Resources include the configured resource bundle capacity and the GPU allocation map.
class SchedulingResources {
 public:
  SchedulingResources();
  SchedulingResources(const ResourceSet &total);
  ~SchedulingResources();

  // Checks whether a resource set can be satisfied. The status is one of
  // (1) infeasible, (2) feasible but currently unavailable, or (3) available.
  ResourceAvailabilityStatus CheckResourcesSatisfied(ResourceSet &set) const;

  // Returns the currently available resources as an immutable resource set.
  const ResourceSet &GetAvailableResources() const;

  bool Release(const ResourceSet &resources);
  bool Acquire(const ResourceSet &resources);

 private:
  /// Static resource configuration.
  ResourceSet resources_total_;
  /// Dynamic resource capacity.
  ResourceSet resources_available_;
  /// gpu_map - replace with ResourceMap (for generality).
};

}  // namespace ray
```
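The scheduling-resources header defines two classes. The first is the resource set (ResourceSet), which maintains resource_capacity_, a map from resource name to capacity, and implements interfaces for equality, subset and superset checks, and adding and removing resources.
The second is the scheduling-resources class (SchedulingResources), which maintains two resource sets: resources_total_, the static resource configuration describing all the resources a node has, and resources_available_, the set of currently available resources. Because the available amount changes over time, the latter is also called the dynamic resource set. The class implements interfaces for checking whether a demand can be satisfied, getting the currently available resources, acquiring (Acquire), and releasing (Release).
Acquire and Release update resources_available_: Acquire calls the resource set's SubtractResources, and Release calls its AddResources.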
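The relationship between the static total, the dynamic available set, and the three ResourceAvailabilityStatus values can be sketched as follows. This is a simplified, hypothetical MiniSchedulingResources over a plain map. It assumes CheckResourcesSatisfied compares the demand against the total first and the available set second, which matches the enum comments, although the 0.4.0 body is not shown here; return types are simplified as well.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <utility>

using ResourceMap = std::unordered_map<std::string, double>;

// Mirrors the three values of ResourceAvailabilityStatus.
enum ResourceAvailabilityStatus { kInfeasible, kResourcesUnavailable, kFeasible };

// True if every resource in `demand` exists in `supply` in sufficient quantity.
bool IsSubset(const ResourceMap &demand, const ResourceMap &supply) {
  for (const auto &entry : demand) {
    auto it = supply.find(entry.first);
    if (it == supply.end() || entry.second > it->second) return false;
  }
  return true;
}

// Hypothetical, simplified SchedulingResources for a single node.
class MiniSchedulingResources {
 public:
  explicit MiniSchedulingResources(ResourceMap total)
      : total_(total), available_(std::move(total)) {}

  // Infeasible: the node's total can never cover the demand.
  // Unavailable: the total could, but what is free right now cannot.
  ResourceAvailabilityStatus CheckResourcesSatisfied(const ResourceMap &demand) const {
    if (!IsSubset(demand, total_)) return kInfeasible;
    if (!IsSubset(demand, available_)) return kResourcesUnavailable;
    return kFeasible;
  }

  // Acquire: subtract the demand from the available set (refuses if short).
  bool Acquire(const ResourceMap &demand) {
    if (!IsSubset(demand, available_)) return false;
    for (const auto &entry : demand) available_[entry.first] -= entry.second;
    return true;
  }

  // Release: add previously acquired resources back to the available set.
  void Release(const ResourceMap &resources) {
    for (const auto &entry : resources) available_[entry.first] += entry.second;
  }

  const ResourceMap &GetAvailableResources() const { return available_; }

 private:
  ResourceMap total_;      // static configuration (cf. resources_total_)
  ResourceMap available_;  // dynamic availability (cf. resources_available_)
};
```

On a node with 4 CPUs, a demand of 8 CPUs is infeasible; after acquiring 3 CPUs, a demand of 2 CPUs becomes unavailable, and it turns feasible again once the 3 CPUs are released.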
In scheduling_resources.cc, equality of two resource sets is implemented by checking that each is a subset of the other:
```cpp
bool ResourceSet::operator==(const ResourceSet &rhs) const {
  return (this->IsSubset(rhs) && rhs.IsSubset(*this));
}
```
The function that checks whether one resource set is a subset of another is implemented as follows:
```cpp
bool ResourceSet::IsSubset(const ResourceSet &other) const {
  // Check to make sure all keys of this are in other.
  for (const auto &resource_pair : resource_capacity_) {
    const auto &resource_name = resource_pair.first;
    const double lhs_quantity = resource_pair.second;
    double rhs_quantity = 0;
    if (!other.GetResource(resource_name, &rhs_quantity)) {
      // Resource not found in rhs, therefore lhs is not a subset of rhs.
      return false;
    }
    if (lhs_quantity > rhs_quantity) {
      // Resource found in rhs, but lhs capacity exceeds rhs capacity.
      return false;
    }
  }
  return true;
}
```
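For a runnable version of the two functions above, here is a stripped-down ResourceSet. GetResource is simplified under an assumption: it returns true whenever the name is present and value is non-null (the real version's handling of zero quantities may differ). Only the subset, superset, and equality members are kept.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <utility>

class ResourceSet {
 public:
  ResourceSet() = default;
  explicit ResourceSet(std::unordered_map<std::string, double> resource_map)
      : resource_capacity_(std::move(resource_map)) {}

  // Simplified (assumption): succeeds whenever the resource name is present.
  bool GetResource(const std::string &resource_name, double *value) const {
    auto it = resource_capacity_.find(resource_name);
    if (value == nullptr || it == resource_capacity_.end()) return false;
    *value = it->second;
    return true;
  }

  // Every resource in *this must exist in `other` with at least the same capacity.
  bool IsSubset(const ResourceSet &other) const {
    for (const auto &resource_pair : resource_capacity_) {
      double rhs_quantity = 0;
      if (!other.GetResource(resource_pair.first, &rhs_quantity)) return false;
      if (resource_pair.second > rhs_quantity) return false;
    }
    return true;
  }

  bool IsSuperset(const ResourceSet &other) const { return other.IsSubset(*this); }

  // Equal means each set is a subset of the other.
  bool operator==(const ResourceSet &rhs) const {
    return IsSubset(rhs) && rhs.IsSubset(*this);
  }

 private:
  std::unordered_map<std::string, double> resource_capacity_;
};
```

Since the comparison goes through map lookups, insertion order does not matter: {GPU: 1, CPU: 4} equals {CPU: 4, GPU: 1}.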
After reading the other four files, the implementation of the scheduling policy becomes much clearer on a second look.
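That concludes 《Ray源码解析之调度部分》 (Ray Source Code Analysis: Scheduling). I hope it helps; if you have any questions, please leave a comment and I will reply promptly. Many thanks for your support of 码农网!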