Ray Source Code Analysis: The Scheduling Component

Category: C++ · Published: 5 years ago


Ray Version 0.4.0

The scheduling component consists of six files:

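  • Scheduling policy: the header scheduling_policy.h (defines SchedulingPolicy) and the cpp file scheduling_policy.cc (implements the scheduling policy for SchedulingPolicy).
  • Scheduling queue: the header scheduling_queue.h (defines SchedulingQueue) and the cpp file scheduling_queue.cc (implements the queue getters, the enqueue methods, and task removal).
  • Scheduling resources: the header scheduling_resources.h (defines ResourceSet and SchedulingResources) and the cpp file scheduling_resources.cc (implements them).
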
src/ray/raylet/scheduling_policy.h
---------------------------------------------

namespace ray {

    /// \brief Implements a scheduling policy for the node manager.
    class SchedulingPolicy {
     public:
      /// \param scheduling_queue: reference to a scheduler queues object for access to
      ///        tasks.
      SchedulingPolicy(const SchedulingQueue &scheduling_queue);
    
      /// Perform a scheduling operation, given a set of cluster resources and
      /// producing a mapping of tasks to node managers.
      ///
      ///  \param cluster_resources: a set of cluster resources representing
      ///         configured and current resource capacity on each node.
      /// \return Scheduling decision, mapping tasks to node managers for placement.
      std::unordered_map<TaskID, ClientID, UniqueIDHasher> Schedule(
          const std::unordered_map<ClientID, SchedulingResources, UniqueIDHasher>
              &cluster_resources);
    
      virtual ~SchedulingPolicy();
    
     private:
      /// An immutable reference to the scheduling task queues.
      const SchedulingQueue &scheduling_queue_;
    };
}

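This is the header of the scheduling policy. It declares the structure of the SchedulingPolicy class: a constructor, a destructor, and the Schedule function, which makes the key scheduling decisions.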

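The constructor takes a reference to the scheduling queue object that holds the tasks to be scheduled. Schedule takes an unordered_map describing the cluster's resources as input and returns the scheduling decision, which is also an unordered_map. std::unordered_map was introduced in C++11; it is a hash table, so its elements are not kept in any particular order.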

The input map passed to Schedule is keyed by ClientID (the ID of a node), with that node's SchedulingResources as the value, i.e. a node → resources mapping.

The output map is keyed by TaskID with a ClientID as the value: each entry means that the task identified by the TaskID is placed on the node identified by the ClientID.

scheduling_policy.cc
----------------------------------

namespace ray {
....
    std::unordered_map<TaskID, ClientID, UniqueIDHasher> SchedulingPolicy::Schedule(
        const std::unordered_map<ClientID, SchedulingResources, UniqueIDHasher>
            &cluster_resources) {
      static ClientID local_node_id = ClientID::nil();
      std::unordered_map<TaskID, ClientID, UniqueIDHasher> decision;
      // TODO(atumanov): consider all cluster resources.
      SchedulingResources resource_supply = cluster_resources.at(local_node_id);
      const auto &resource_supply_set = resource_supply.GetAvailableResources();
    
      // Iterate over running tasks, get their resource demand and try to schedule.
      for (const auto &t : scheduling_queue_.GetReadyTasks()) {
        // Get task's resource demand
        const auto &resource_demand = t.GetTaskSpecification().GetRequiredResources();
        bool task_feasible = resource_demand.IsSubset(resource_supply_set);
        if (task_feasible) {
          const TaskID &task_id = t.GetTaskSpecification().TaskId();
          decision[task_id] = local_node_id;
        }
      }
      return decision;
    }
....
}

For brevity, only the implementation of Schedule is shown here.

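As the code shows, Ray's scheduling goes through these steps:

1) Get the local node's ID (in this version it is simply ClientID::nil()).

2) Get the resources the local node can currently provide.

3) For each ready task, check whether the local resources can satisfy its demand (task_feasible); if they can, assign the task to the local node.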
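The three steps above can be sketched in self-contained C++. This is a simplification under stated assumptions, not Ray's code: IDs are plain std::string values, a resource set is a std::unordered_map<std::string, double>, and the names ResourceMap, ReadyTask, IsSubset and ScheduleLocal are hypothetical stand-ins for ResourceSet, Task and SchedulingPolicy::Schedule.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical simplified resource set: resource name -> quantity.
using ResourceMap = std::unordered_map<std::string, double>;

// Hypothetical stand-in for a ready Task.
struct ReadyTask {
  std::string task_id;
  ResourceMap demand;  // resources the task requires
};

// True if every resource in `demand` exists in `supply` with at least
// the requested quantity (the same idea as ResourceSet::IsSubset).
bool IsSubset(const ResourceMap &demand, const ResourceMap &supply) {
  for (const auto &pair : demand) {
    auto it = supply.find(pair.first);
    if (it == supply.end() || pair.second > it->second) {
      return false;
    }
  }
  return true;
}

// Sketch of the 0.4.0 logic: every ready task whose demand fits the
// local node's available resources is mapped to the local node.
// The supply is never reduced inside the loop.
std::unordered_map<std::string, std::string> ScheduleLocal(
    const std::unordered_map<std::string, ResourceMap> &cluster_resources,
    const std::vector<ReadyTask> &ready_tasks,
    const std::string &local_node_id) {
  std::unordered_map<std::string, std::string> decision;
  // Step 2: resources the local node offers (throws if the node is missing).
  const ResourceMap &supply = cluster_resources.at(local_node_id);
  // Step 3: check each ready task independently against that supply.
  for (const auto &task : ready_tasks) {
    if (IsSubset(task.demand, supply)) {
      decision[task.task_id] = local_node_id;
    }
  }
  return decision;
}
```

With a node offering 2 CPUs, two tasks that each ask for 2 CPUs are both mapped to that node, while a task asking for a GPU gets no entry in the decision at all, which is exactly what the two questions below are about.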

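[Q] What about tasks whose demands cannot be met? They are never mapped to local_node_id, so they have no key in decision at all. How these tasks are handled is left for later analysis.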

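[Q] A second question: the code assigns a task to the local node whenever the node can satisfy its requirements, but resource_supply is never reduced after an assignment, so every task is checked against the same, unchanged supply. If the local node can satisfy each task's requirements individually, won't all of those tasks be scheduled onto this one node, even when their combined demand exceeds its capacity?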
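The concern in this question can be made concrete. The sketch below is a hypothetical variant, not what Ray 0.4.0 does: it subtracts each accepted task's demand from a working copy of the supply, so tasks whose combined demand exceeds the node's capacity are no longer all assigned to it. ResourceMap, ReadyTask, Fits and ScheduleWithAccounting are illustrative names.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical simplified types.
using ResourceMap = std::unordered_map<std::string, double>;

struct ReadyTask {
  std::string task_id;
  ResourceMap demand;
};

// True if `supply` covers every entry of `demand`.
bool Fits(const ResourceMap &demand, const ResourceMap &supply) {
  for (const auto &pair : demand) {
    auto it = supply.find(pair.first);
    if (it == supply.end() || pair.second > it->second) {
      return false;
    }
  }
  return true;
}

// Hypothetical variant: assign ready tasks greedily in queue order and
// charge each assignment against a working copy of the node's supply.
std::unordered_map<std::string, std::string> ScheduleWithAccounting(
    ResourceMap supply,  // taken by value: the caller's map is untouched
    const std::vector<ReadyTask> &ready_tasks,
    const std::string &node_id) {
  std::unordered_map<std::string, std::string> decision;
  for (const auto &task : ready_tasks) {
    if (!Fits(task.demand, supply)) {
      continue;  // left unassigned, like infeasible tasks in the original
    }
    for (const auto &pair : task.demand) {
      supply[pair.first] -= pair.second;  // reserve the resources
    }
    decision[task.task_id] = node_id;
  }
  return decision;
}
```

Greedy first-fit in queue order is only one possible policy: with 2 CPUs and tasks asking for 1.5, 1.0 and 0.5 CPUs, it places the first and third task and skips the second.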

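Here, IsSubset in resource_demand.IsSubset(resource_supply_set) tests whether the resource demand is a subset of the resource supply set; if it is, the task's requirements are considered satisfied.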

scheduling_queue.h
--------------------------------

namespace ray {

class SchedulingQueue {
 public:
  SchedulingQueue() {}

  virtual ~SchedulingQueue() {}

  const std::list<Task> &GetWaitingTasks() const;
  const std::list<Task> &GetReadyTasks() const;
  const std::list<Task> &GetReadyMethods() const;
  const std::list<Task> &GetScheduledTasks() const;
  const std::list<Task> &GetRunningTasks() const;

  std::vector<Task> RemoveTasks(std::unordered_set<TaskID, UniqueIDHasher> tasks);


  void QueueWaitingTasks(const std::vector<Task> &tasks);
  void QueueReadyTasks(const std::vector<Task> &tasks);
  void QueueScheduledTasks(const std::vector<Task> &tasks);
  void QueueRunningTasks(const std::vector<Task> &tasks);

  /// Register an actor.
  ///
  /// \param actor_id The ID of the actor to register.
  /// \param actor_information Information about the actor.
  bool RegisterActor(ActorID actor_id, const ActorInformation &actor_information);

 private:
  std::list<Task> waiting_tasks_;
  std::list<Task> ready_tasks_;
  std::list<Task> scheduled_tasks_;
  std::list<Task> running_tasks_;
  /// The registry of known actors.
  std::unordered_map<ActorID, ActorInformation, UniqueIDHasher> actor_registry_;
};
}  // namespace ray

This header defines the scheduling queues. SchedulingQueue wraps several queues, each holding the tasks that are in one particular state:

(1) waiting: the task is waiting for its object dependencies to become available.

(2) ready: the task's object dependencies are available and it is ready to be scheduled.

(3) scheduled: the task has been scheduled but is waiting for a worker.

(4) running: the task has been scheduled and is running on a worker.
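A task moves through these four queues in order. As an illustration only, the sketch below models that progression with std::list and std::list::splice, which relinks an element into another list without copying the task. MiniQueue, MiniTask, TaskState and Advance are hypothetical names; Ray's SchedulingQueue instead exposes the Queue*Tasks methods and RemoveTasks shown in the header above.

```cpp
#include <array>
#include <cstddef>
#include <list>
#include <string>

// Hypothetical task states, in the order a task progresses.
enum class TaskState { kWaiting = 0, kReady, kScheduled, kRunning };

// Hypothetical stand-in for ray::Task.
struct MiniTask {
  std::string task_id;
};

class MiniQueue {
 public:
  void QueueWaiting(const MiniTask &task) {
    queues_[Index(TaskState::kWaiting)].push_back(task);
  }

  // Move the task `task_id` from state `from` to the next state.
  // Returns false if the task is not in `from` or is already running.
  bool Advance(const std::string &task_id, TaskState from) {
    if (from == TaskState::kRunning) {
      return false;
    }
    auto &src = queues_[Index(from)];
    auto &dst = queues_[Index(from) + 1];
    for (auto it = src.begin(); it != src.end(); ++it) {
      if (it->task_id == task_id) {
        dst.splice(dst.end(), src, it);  // relink the node, no copy
        return true;
      }
    }
    return false;
  }

  const std::list<MiniTask> &Get(TaskState state) const {
    return queues_[Index(state)];
  }

 private:
  static std::size_t Index(TaskState state) {
    return static_cast<std::size_t>(state);
  }
  // One list per state: waiting, ready, scheduled, running.
  std::array<std::list<MiniTask>, 4> queues_;
};
```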

scheduling_queue.cc implements the getters for these queues, along with the methods that enqueue and remove tasks.

Take the removal helper as an example:

// Helper function to remove tasks in the given set of task_ids from a
// queue, and append them to the given vector removed_tasks.
void removeTasksFromQueue(std::list<Task> &queue,
                          std::unordered_set<TaskID, UniqueIDHasher> &task_ids,
                          std::vector<Task> &removed_tasks) {
  for (auto it = queue.begin(); it != queue.end();) {
    auto task_id = task_ids.find(it->GetTaskSpecification().TaskId());
    if (task_id != task_ids.end()) {
      task_ids.erase(task_id);
      removed_tasks.push_back(std::move(*it));
      it = queue.erase(it);
    } else {
      it++;
    }
  }
}

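It removes from queue every task whose TaskID appears in task_ids, moves those tasks into removed_tasks, and erases each matched ID from task_ids.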
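To see the helper's behavior in isolation, here is the same loop over a hypothetical SimpleTask type with plain std::string IDs (Ray's Task and TaskID are not reproduced). Two details stand out: each matched ID is erased from task_ids, so afterwards the set holds only the IDs not found in this queue, which would let a caller such as RemoveTasks reuse the remainder for the next queue (its body is not shown in this article); and std::list::erase returns the iterator to the next element, which keeps the loop valid while erasing.

```cpp
#include <list>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical stand-in for ray::Task.
struct SimpleTask {
  std::string task_id;
};

// Same algorithm as removeTasksFromQueue, on simplified types.
void RemoveTasksFromQueue(std::list<SimpleTask> &queue,
                          std::unordered_set<std::string> &task_ids,
                          std::vector<SimpleTask> &removed_tasks) {
  for (auto it = queue.begin(); it != queue.end();) {
    auto found = task_ids.find(it->task_id);
    if (found != task_ids.end()) {
      task_ids.erase(found);                    // this ID is accounted for
      removed_tasks.push_back(std::move(*it));  // move the task out
      it = queue.erase(it);                     // erase returns the next iterator
    } else {
      ++it;
    }
  }
}
```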

scheduling_resources.h
------------------------------------

namespace ray {

/// Resource availability status reports whether the resource requirement is
/// (1) infeasible, (2) feasible but currently unavailable, or (3) available.
typedef enum {
  kInfeasible,            ///< Cannot ever satisfy resource requirements.
  kResourcesUnavailable,  ///< Feasible, but not currently available.
  kFeasible               ///< Feasible and currently available.
} ResourceAvailabilityStatus;

class ResourceSet {
 public:
  ResourceSet();
  ResourceSet(const std::unordered_map<std::string, double> &resource_map);
  ~ResourceSet();

  bool operator==(const ResourceSet &rhs) const;

  bool IsEqual(const ResourceSet &other) const;

  bool IsSubset(const ResourceSet &other) const;

  bool IsSuperset(const ResourceSet &other) const;

  bool AddResource(const std::string &resource_name, double capacity);

  bool RemoveResource(const std::string &resource_name);

  bool AddResources(const ResourceSet &other);

  bool SubtractResources(const ResourceSet &other);

  // Returns the capacity of the given resource through *value. Returns true
  // if the resource has a quantity greater than 0 and value is not a null pointer.
  bool GetResource(const std::string &resource_name, double *value) const;

 private:
  // Map from resource name to capacity.
  std::unordered_map<std::string, double> resource_capacity_;
};

/// \class SchedulingResources
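/// SchedulingResources encapsulates the state of resources and the accounting of them. Resources include the configured resource bundle capacity and the GPU allocation map.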
class SchedulingResources {
 public:
  SchedulingResources();
  SchedulingResources(const ResourceSet &total);
  ~SchedulingResources();
  // Check whether a resource set can be satisfied. Possible statuses: (1) infeasible, (2) feasible but currently unavailable, or (3) available.
  ResourceAvailabilityStatus CheckResourcesSatisfied(ResourceSet &set) const;

  // Get the currently available resource set, returned as an immutable reference.
  const ResourceSet &GetAvailableResources() const;

  bool Release(const ResourceSet &resources);

  bool Acquire(const ResourceSet &resources);

 private:
  /// Static resource configuration.
  ResourceSet resources_total_;
  /// Dynamic resource capacity.
  ResourceSet resources_available_;
  /// gpu_map - replace with ResourceMap (for generality).
};

}  // namespace ray

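The scheduling resources header defines two classes. The first is the resource set (ResourceSet), which maintains resource_capacity_, a map from resource name to capacity, and provides equality, subset and superset tests, and methods for adding and removing resources.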

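The second is the scheduling resources class (SchedulingResources), which maintains two resource sets: resources_total_, the static resource configuration describing all resources configured on the node this object represents (Schedule receives one SchedulingResources per ClientID), and resources_available_, the currently available resources. Because the available amount changes over time, this is also called the dynamic resource set. The class provides methods to check whether a resource demand can be satisfied, to query the currently available resources, and to acquire (Acquire) and release (Release) resources.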

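Acquire and Release both operate on the available set: Acquire subtracts the requested resources from it (ResourceSet::SubtractResources), and Release adds them back (ResourceSet::AddResources).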
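A minimal sketch of how total, available, CheckResourcesSatisfied, Acquire and Release can fit together, assuming a resource set is a std::unordered_map<std::string, double>. MiniSchedulingResources and Availability are hypothetical stand-ins. The three-way classification is inferred from the ResourceAvailabilityStatus comments (infeasible if even the node's total cannot cover the demand, unavailable if the total could but the current availability cannot), not copied from Ray's CheckResourcesSatisfied. This sketch also rejects a release that would push availability above the configured total; whether Ray checks this is not shown in the article.

```cpp
#include <string>
#include <unordered_map>

using ResourceMap = std::unordered_map<std::string, double>;

// Mirrors the three ResourceAvailabilityStatus values.
enum class Availability { kInfeasible, kResourcesUnavailable, kFeasible };

// lhs is a subset of rhs if rhs has every resource of lhs in at least that quantity.
bool IsSubset(const ResourceMap &lhs, const ResourceMap &rhs) {
  for (const auto &pair : lhs) {
    auto it = rhs.find(pair.first);
    if (it == rhs.end() || pair.second > it->second) {
      return false;
    }
  }
  return true;
}

class MiniSchedulingResources {
 public:
  explicit MiniSchedulingResources(const ResourceMap &total)
      : total_(total), available_(total) {}

  Availability Check(const ResourceMap &demand) const {
    if (!IsSubset(demand, total_)) return Availability::kInfeasible;
    if (!IsSubset(demand, available_)) return Availability::kResourcesUnavailable;
    return Availability::kFeasible;
  }

  // Acquire: subtract the demand from the available set.
  bool Acquire(const ResourceMap &demand) {
    if (!IsSubset(demand, available_)) return false;
    for (const auto &pair : demand) available_[pair.first] -= pair.second;
    return true;
  }

  // Release: add resources back to the available set.
  bool Release(const ResourceMap &resources) {
    // Validate first so a bad release leaves the state unchanged.
    for (const auto &pair : resources) {
      auto total_it = total_.find(pair.first);
      if (total_it == total_.end() ||
          available_[pair.first] + pair.second > total_it->second) {
        return false;
      }
    }
    for (const auto &pair : resources) available_[pair.first] += pair.second;
    return true;
  }

  const ResourceMap &Available() const { return available_; }

 private:
  ResourceMap total_;      // static configuration
  ResourceMap available_;  // changes as resources are acquired and released
};
```

For a node with 4 CPUs, a demand of 8 CPUs is infeasible; after acquiring 3 CPUs, a demand of 2 CPUs is feasible in principle but currently unavailable until the 3 CPUs are released.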

In scheduling_resources.cc, equality of two resource sets is implemented by checking that each is a subset of the other:

bool ResourceSet::operator==(const ResourceSet &rhs) const {
  return (this->IsSubset(rhs) && rhs.IsSubset(*this));
}

The function that tests whether one resource set is a subset of another is implemented as follows:

bool ResourceSet::IsSubset(const ResourceSet &other) const {
  // Check to make sure all keys of this are in other.
  for (const auto &resource_pair : resource_capacity_) {
    const auto &resource_name = resource_pair.first;
    const double lhs_quantity = resource_pair.second;
    double rhs_quantity = 0;
    if (!other.GetResource(resource_name, &rhs_quantity)) {
      // Resource not found in rhs, therefore lhs is not a subset of rhs.
      return false;
    }
    if (lhs_quantity > rhs_quantity) {
      // Resource found in rhs, but lhs capacity exceeds rhs capacity.
      return false;
    }
  }
  return true;
}
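The subset test and the mutual-subset definition of equality can be tried out on a plain std::unordered_map<std::string, double>; IsSubsetOf and SameResources are hypothetical names, and this sketch looks keys up with find() where Ray's version goes through GetResource. Two consequences are easy to check: because the loop only walks the keys of the left-hand set, an empty demand is a subset of every supply, so a task that requests nothing is always feasible; and mutual subsets hold exactly when both sets have the same keys with the same quantities.

```cpp
#include <string>
#include <unordered_map>

using ResourceMap = std::unordered_map<std::string, double>;

// lhs is a subset of rhs if every resource in lhs exists in rhs
// with at least the same quantity.
bool IsSubsetOf(const ResourceMap &lhs, const ResourceMap &rhs) {
  for (const auto &pair : lhs) {
    auto it = rhs.find(pair.first);
    if (it == rhs.end()) {
      return false;  // resource missing from rhs
    }
    if (pair.second > it->second) {
      return false;  // lhs asks for more than rhs has
    }
  }
  return true;
}

// Equality as mutual subsets, the same idea as ResourceSet::operator==.
bool SameResources(const ResourceMap &a, const ResourceMap &b) {
  return IsSubsetOf(a, b) && IsSubsetOf(b, a);
}
```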

With the other four files read, the implementation of the scheduling policy becomes much clearer on a second look.

