Ray源码解析之调度部分

栏目: C++ · 发布时间: 5年前

内容简介:Ray Version 0.4.0调度部分包括六个文件:调度策略的头文件,描述了SchedulingPolicy类的结构,包括构造函数,析构函数,Schedule函数(关键调度决策)。

Ray Version 0.4.0

调度部分包括六个文件:

  • 调度策略:头文件 scheduling_policy.h (定义SchedulingPolicy)和cpp文件 scheduling_policy.cc (为SchedulingPolicy实现调度策略)。
  • 调度队列:头文件 scheduling_queue.h ()和cpp文件 scheduling_queue.cc ()。
  • 调度资源:头文件 scheduling_resources.h ()和cpp文件 scheduling_resources.cc ()。
src/ray/raylet/scheduling_policy.h
---------------------------------------------

namespace ray {

    /// \brief Implements a scheduling policy for the node manager.
    class SchedulingPolicy {
     public:
      /// \param scheduling_queue: reference to a scheduler queues object for access to
      ///        tasks.
      SchedulingPolicy(const SchedulingQueue &scheduling_queue);
    
      /// Perform a scheduling operation, given a set of cluster resources and
      /// producing a mapping of tasks to node managers.
      ///
      ///  \param cluster_resources: a set of cluster resources representing
      ///         configured and current resource capacity on each node.
      /// \return Scheduling decision, mapping tasks to node managers for placement.
      std::unordered_map<TaskID, ClientID, UniqueIDHasher> Schedule(
          const std::unordered_map<ClientID, SchedulingResources, UniqueIDHasher>
              &cluster_resources);
    
      virtual ~SchedulingPolicy();
    
     private:
      /// An immutable reference to the scheduling task queues.
      const SchedulingQueue &scheduling_queue_;
    };
}

调度策略的头文件,描述了SchedulingPolicy类的结构,包括构造函数,析构函数,Schedule函数(关键调度决策)。

构造函数传入需要调度的任务队列,Schedule函数接受集群资源的一个 unordered_map 作为输入,输出为调度决策,也是一个 unordered_mapunordered_map 是C++ 11的新特性,其内部元素是无序的。

Schedule接受的输入map是以ClientID即本节点id为key,SchedulingResources调度资源为value,即一个 节点->资源 的映射。

Schedule输出map则是以TaskID为键,ClientID为值,即表示TaskID表示的Task调度到ClientID代表的节点上。

scheduling_policy.cc
----------------------------------

namespace ray {
....
    std::unordered_map<TaskID, ClientID, UniqueIDHasher> SchedulingPolicy::Schedule(
        const std::unordered_map<ClientID, SchedulingResources, UniqueIDHasher>
            &cluster_resources) {
      static ClientID local_node_id = ClientID::nil();
      std::unordered_map<TaskID, ClientID, UniqueIDHasher> decision;
      // TODO(atumanov): consider all cluster resources.
      SchedulingResources resource_supply = cluster_resources.at(local_node_id);
      const auto &resource_supply_set = resource_supply.GetAvailableResources();
    
      // Iterate over running tasks, get their resource demand and try to schedule.
      for (const auto &t : scheduling_queue_.GetReadyTasks()) {
        // Get task's resource demand
        const auto &resource_demand = t.GetTaskSpecification().GetRequiredResources();
        bool task_feasible = resource_demand.IsSubset(resource_supply_set);
        if (task_feasible) {
          const TaskID &task_id = t.GetTaskSpecification().TaskId();
          decision[task_id] = local_node_id;
        }
      }
      return decision;
    }
....
}

为简洁起见,这里之贴出了Schedule函数的实现。

从上述代码可以看到,ray的调度包括这么几个过程:

1)得到本地节点id

2)得到本地节点可提供的资源

3)对于每个准备好的任务,判断本地资源是否能满足该任务(task_feasible),能满足则调度到本地节点。

【问】那么不能满足的任务呢?这些任务没有对应的 local_node_id ,在decision中也就没有key,这部分任务该怎么办,有待后面分析。

【问】还有一个问题就是,程序中本地节点能满足资源要求就调度到本地节点,调度后并不会减少 resource_supply 即资源的供给,那么如果本地节点能满足所有任务的要求,岂不是所有任务都调度到此节点?

这里 resource_demand.IsSubset(resource_supply_set) 中的 IsSubset 表示资源需求是否是资源供给集的子集,如果是,表示满足条件。

scheduling_queue.h
--------------------------------

namespace ray {

class SchedulingQueue {
 public:
  SchedulingQueue() {}

  virtual ~SchedulingQueue() {}

  const std::list<Task> &GetWaitingTasks() const;
  const std::list<Task> &GetReadyTasks() const;
  const std::list<Task> &GetReadyMethods() const;
  const std::list<Task> &GetScheduledTasks() const;
  const std::list<Task> &GetRunningTasks() const;

  std::vector<Task> RemoveTasks(std::unordered_set<TaskID, UniqueIDHasher> tasks);


  void QueueWaitingTasks(const std::vector<Task> &tasks);
  void QueueReadyTasks(const std::vector<Task> &tasks);
  void QueueScheduledTasks(const std::vector<Task> &tasks);
  void QueueRunningTasks(const std::vector<Task> &tasks);

  /// Register an actor.
  ///
  /// \param actor_id The ID of the actor to register.
  /// \param actor_information Information about the actor.
  bool RegisterActor(ActorID actor_id, const ActorInformation &actor_information);

 private:
  std::list<Task> waiting_tasks_;
  std::list<Task> ready_tasks_;
  std::list<Task> scheduled_tasks_;
  std::list<Task> running_tasks_;
  /// The registry of known actors.
  std::unordered_map<ActorID, ActorInformation, UniqueIDHasher> actor_registry_;
};
}  // namespace ray

调度队列定义头文件,封装了调度队列,每个队列包含着各自类型的任务。

(1) waiting: for object dependencies to become available,

(2) ready: object dependencies are available and the task is ready to be scheduled

(3) scheduled: the task has been scheduled but is waiting for a worker

(4) running: the task has been scheduled and is running on a worker.

scheduling_queue.cc 文件中则实现了这些队列的getter方法,以及进队列方法,移除方法。

比如说移除方法:

// Helper function to remove tasks in the given set of task_ids from a
// queue, and append them to the given vector removed_tasks.
void removeTasksFromQueue(std::list<Task> &queue,
                          std::unordered_set<TaskID, UniqueIDHasher> &task_ids,
                          std::vector<Task> &removed_tasks) {
  for (auto it = queue.begin(); it != queue.end();) {
    auto task_id = task_ids.find(it->GetTaskSpecification().TaskId());
    if (task_id != task_ids.end()) {
      task_ids.erase(task_id);
      removed_tasks.push_back(std::move(*it));
      it = queue.erase(it);
    } else {
      it++;
    }
  }
}

从queue中移除task_ids中包含的所有taskID代表的task。

scheduling_resources.h
------------------------------------

namespace ray {

/// Resource availability status reports whether the resource requirement is
/// (1) infeasible, (2) feasible but currently unavailable, or (3) available.
typedef enum {
  kInfeasible,            ///< Cannot ever satisfy resource requirements.
  kResourcesUnavailable,  ///< Feasible, but not currently available.
  kFeasible               ///< Feasible and currently available.
} ResourceAvailabilityStatus;

class ResourceSet {
 public:
  ResourceSet();
  ResourceSet(const std::unordered_map<std::string, double> &resource_map);
  ~ResourceSet();

  bool operator==(const ResourceSet &rhs) const;

  bool IsEqual(const ResourceSet &other) const;

  bool IsSubset(const ResourceSet &other) const;

  bool IsSuperset(const ResourceSet &other) const;

  bool AddResource(const std::string &resource_name, double capacity);

  bool RemoveResource(const std::string &resource_name);

  bool AddResources(const ResourceSet &other);

  bool SubtractResources(const ResourceSet &other);

  // 返回指定资源的容量值(赋给value指向的值),如果资源有大于0的数量,且value不为空指针,则返回true
  bool GetResource(const std::string &resource_name, double *value) const;

 private:
  // 资源容量map
  std::unordered_map<std::string, double> resource_capacity_;
};

/// \class SchedulingResources
/// SchedulingResources 封装资源的状态和资源的计数。资源包括配置资源束容量和GPU分配图。
class SchedulingResources {
 public:
  SchedulingResources();
  SchedulingResources(const ResourceSet &total);
  ~SchedulingResources();
  // 检查一个资源集是否能被满足,有几种状态,(1) infeasible, (2) feasible but currently unavailable, or (3) available.
  ResourceAvailabilityStatus CheckResourcesSatisfied(ResourceSet &set) const;

  // 请求现在可用的资源集,返回一个不可变的资源集合
  const ResourceSet &GetAvailableResources() const;

  bool Release(const ResourceSet &resources);

  bool Acquire(const ResourceSet &resources);

 private:
  /// 静态资源配置
  ResourceSet resources_total_;
  /// 动态资源容量
  ResourceSet resources_available_;
  /// gpu_map - replace with ResourceMap (for generality).
};

}  // namespace ray

调度资源定义中包含两个类,一个是资源集(ResourceSet),资源集维护了一个 资源名->容量 的map映射 resource_capacity_ ,并实现了相等,资源子集和超集,添加和删除资源等接口。

另一个是调度资源类(SchedulingResources),维护了两个资源集,一个是 resources_total_ 总的静态资源配置,描述集群总共有哪些资源,以及一个 resources_available_ 可用资源集,每个时刻的可用资源量是动态变化的,所以也叫动态资源集。实现了检查资源能够满足,请求现在可用的资源,申请(Acquire)和释放(Release)等接口。

申请(Acquire)和释放(Release)接口会分别调用资源集的AddResources,RemoveResources接口。

scheduling_resources.cc 中,资源集的相等是通过判断是否互为子集来实现的:

bool ResourceSet::operator==(const ResourceSet &rhs) const {
  return (this->IsSubset(rhs) && rhs.IsSubset(*this));
}

资源集是否为另一资源集的子集的判断函数如下实现:

bool ResourceSet::IsSubset(const ResourceSet &other) const {
  // Check to make sure all keys of this are in other.
  for (const auto &resource_pair : resource_capacity_) {
    const auto &resource_name = resource_pair.first;
    const double lhs_quantity = resource_pair.second;
    double rhs_quantity = 0;
    if (!other.GetResource(resource_name, &rhs_quantity)) {
      // Resource not found in rhs, therefore lhs is not a subset of rhs.
      return false;
    }
    if (lhs_quantity > rhs_quantity) {
      // Resource found in rhs, but lhs capacity exceeds rhs capacity.
      return false;
    }
  }
  return true;
}

看完后面4个文件再回过头去看调度策略的实现就明了多了。


以上所述就是小编给大家介绍的《Ray源码解析之调度部分》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

后现代经济

后现代经济

姜奇平 / 中信出版社 / 2009-7 / 45.00元

《后现代经济:网络时代的个性化和多元化》站在历史“终结”与“开始”的切换点上,以价值、交换、货币、资本、组织、制度、福利等方面为线索,扬弃现代性经济学,对工业化进行反思,深刻剖析了“一切坚固的东西都烟消云散”的局限性,在此基础上展开对现代性经济的解构和建构。“9·11”中坚固的世贸中心大楼灰飞烟灭,2008年坚固的华尔街投资神话彻底破灭,坚固的雷曼兄弟公司在挺立了158年后烟消云散……一切坚固的东......一起来看看 《后现代经济》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具