内容简介:之前在分析controller-manager中说到,controller对于每个controller的控制格式基本一致,都是以。NewNodeLifecycleController负责创建资源对象,Run负责启动,完成任务的执行。NewNodeLifecycleController主要完成以下任务:
之前在分析controller-manager中说到,controller对于每个controller的控制格式基本一致,都是以 start***Controller 的方式封装成一个独立的方法,NodeController也不例外。在1.13.4的版本中,Node的控制器分成了两种(很早之前的版本只有一种),分别是 NodeIpamController 与 NodeLifecycleController 。其中,NodeIpamController主要处理Node的IPAM地址相关,NodeLifecycleController处理Node的整个生命周期,本文主要分析NodeLifecycleController。
startNodeLifecycleController 方法开始它的生命周期的管理流程。主要关注两个方法:
NewNodeLifecycleController 与
Run
。NewNodeLifecycleController负责创建资源对象,Run负责启动,完成任务的执行。
NewNodeLifecycleController分析
NewNodeLifecycleController主要完成以下任务:
1、根据给定的配置构造Controller大结构体,完成部分参数的配置任务;
2、为 podInformer 、 nodeInformer 、 leaseInformer 以及 daemonSetInformer 配置相应的回调方法,包括 AddFunc 、 UpdateFunc 以及 DeleteFunc 。这样,当相应的Node发生变化时,关联的controller能够及时监听到,并调用相应的处理方法;
3、返回构造完的结构体。
在配置的时候有几个需要注意的变量,后面会经常用到。
runTaintManager:表示启动一个TaintManager去从Node上驱逐Pod;
useTaintBasedEvictions:通过给Node添加 TaintNodeNotReady 和 TaintNodeUnreachable 污点的方式替换之前的直接驱逐Pod的方式,通过流控删除Pod。主要为了防止Pod在某一时间点突然被大量驱逐;
taintNodeByCondition:通过Node的状态给Node添加相应的污点。
另外,与之前的版本不同的是,添加了leaseInformer,它的主要作用是用来判断Node的健康。
Run方法分析
Run方法主要包含以下方法,每个方法都是以单独的goroutine运行:
1、 go nc.taintManager.Run(stopCh) :TaintManager,主要完成Pod的驱逐任务;
2、 doNoScheduleTaintingPassWorker :完成NoSchedule的污点更新任务;
3、 doNoExecuteTaintingPass 、 doEvictionPass :完成NoExecute的污点更新任务;
4、 monitorNodeHealth :检查Node的状态,并且处理Node的增删改查等任务,同时也会处理Pod的驱逐工作。
代码如下
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
klog.Infof("Starting node controller")
defer klog.Infof("Shutting down node controller")
if !controller.WaitForCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}
if nc.runTaintManager {
go nc.taintManager.Run(stopCh)
}
if nc.taintNodeByCondition {
// Close node update queue to cleanup go routine.
defer nc.nodeUpdateQueue.ShutDown()
// Start workers to update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
// Thanks to "workqueue", each worker just need to get item from queue, because
// the item is flagged when got from queue: if new event come, the new item will
// be re-queued until "Done", so no more than one worker handle the same item and
// no event missed.
go wait.Until(nc.doNoScheduleTaintingPassWorker, time.Second, stopCh)
}
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
}
// Incorporate the results of node health signal pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeHealth(); err != nil {
klog.Errorf("Error monitoring node health: %v", err)
}
}, nc.nodeMonitorPeriod, stopCh)
<-stopCh
}
复制代码
执行过程
NodeLifecycleController的执行过程主要就是各个goroutine对应的任务,一一分析。
TaintManager
TaintManager通过Run方法开始启动。在Run方法内,主要做了几个工作:
1、初始化 nodeUpdateChannels 和 podUpdateChannels ,大小为8个channel,后面可以并行处理;
2、启动两个goroutine,分别监听nodeUpdateQueue和podUpdateQueue的消息;
3、并行启动8个工作任务,处理监听到的nodeUpdate和podUpdate的消息。
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
klog.V(0).Infof("Starting NoExecuteTaintManager")
for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}
// Functions that are responsible for taking work items out of the workqueues and putting them
// into channels.
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
}
nodeUpdate := item.(nodeUpdateItem)
hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.nodeUpdateQueue.Done(item)
return
case tc.nodeUpdateChannels[hash] <- nodeUpdate:
// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
}
}
}(stopCh)
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
}
podUpdate := item.(podUpdateItem)
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.podUpdateQueue.Done(item)
return
case tc.podUpdateChannels[hash] <- podUpdate:
// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
}
}
}(stopCh)
wg := sync.WaitGroup{}
wg.Add(UpdateWorkerSize)
for i := 0; i < UpdateWorkerSize; i++ {
go tc.worker(i, wg.Done, stopCh)
}
wg.Wait()
}
复制代码
在并行启动的work任务中,优先处理nodeUpdate的事件,等到nodeUpdate处理完成之后,再去处理podUpdate。处理nodeUpdate的方法对应 handleNodeUpdate ,podUpdate对应 handlePodUpdate 。
handleNodeUpdate 主要的作用就是通过监听到的nodeName获取node信息,通过node信息获取该node上对应的taints。然后对该node上所有的pod,依次执行 processPodOnNode 方法。方法如下:
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
node, err := tc.getNode(nodeUpdate.nodeName)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, nodeUpdate.nodeName)
return
}
utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
return
}
// Create or Update
klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
taints := getNoExecuteTaints(node.Spec.Taints)
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
if len(taints) == 0 {
delete(tc.taintedNodes, node.Name)
} else {
tc.taintedNodes[node.Name] = taints
}
}()
pods, err := getPodsAssignedToNode(tc.client, node.Name)
if err != nil {
klog.Errorf(err.Error())
return
}
if len(pods) == 0 {
return
}
// Short circuit, to make this controller a bit faster.
if len(taints) == 0 {
klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
for i := range pods {
tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
}
return
}
now := time.Now()
for i := range pods {
pod := &pods[i]
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
}
}
复制代码
handlePodUpdate 通过获取到单一的pod信息与node信息,也是最终执行 processPodOnNode 方法。方法如下:
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
tc.cancelWorkWithEvent(podNamespacedName)
return
}
utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
return
}
// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
if pod.Spec.NodeName != podUpdate.nodeName {
return
}
// Create or Update
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
nodeName := pod.Spec.NodeName
if nodeName == "" {
return
}
taints, ok := func() ([]v1.Taint, bool) {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
}()
// It's possible that Node was deleted, or Taints were removed before, which triggered
// eviction cancelling if it was needed.
if !ok {
return
}
tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
复制代码
processPodOnNode 方法主要将需要删除的Pod按照预定好的格式添加到 taintEvictionQueue ,该queue内的任务都是设置好定时任务时间的,在相应的时间内调用 deletePodHandler 方法去删除pod,该方法位于 pkg/controller/nodelifecycle/scheduler/taint_manager.go 下。方法如下:
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
return func(args *WorkArgs) error {
ns := args.NamespacedName.Namespace
name := args.NamespacedName.Name
klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
if emitEventFunc != nil {
emitEventFunc(args.NamespacedName)
}
var err error
for i := 0; i < retries; i++ {
err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
if err == nil {
break
}
time.Sleep(10 * time.Millisecond)
}
return err
}
}
复制代码
所以,TaintManager的主要作用就是将需要驱逐的Pod配置好定时删除的任务,然后从相应的Node上一一删除。
doNoScheduleTaintingPassWorker
当开启taintNodeByCondition特性的时候,则会调用 doNoScheduleTaintingPassWorker 去对Node做 NoSchedule的 污点更新。调用的是 doNoScheduleTaintingPass 方法。方法如下:
func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
// Map node's condition to Taints.
var taints []v1.Taint
for _, condition := range node.Status.Conditions {
if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
if taintKey, found := taintMap[condition.Status]; found {
taints = append(taints, v1.Taint{
Key: taintKey,
Effect: v1.TaintEffectNoSchedule,
})
}
}
}
if node.Spec.Unschedulable {
// If unschedulable, append related taint.
taints = append(taints, v1.Taint{
Key: schedulerapi.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
})
}
// Get exist taints of node.
nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
// only NoSchedule taints are candidates to be compared with "taints" later
if t.Effect != v1.TaintEffectNoSchedule {
return false
}
// Find unschedulable taint of node.
if t.Key == schedulerapi.TaintNodeUnschedulable {
return true
}
// Find node condition taints of node.
_, found := taintKeyToNodeConditionMap[t.Key]
return found
})
taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
// If nothing to add not delete, return true directly.
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
}
复制代码
doNoScheduleTaintingPass 主要做了以下工作:
1、根据nodeName获取node信息;
2、根据node.Status.Conditions字段,判断node是否需要添加NoSchedule污点,判断的标准如下:
只要处于任一状态,即需要添加NoSchedule污点;
3、如果Node为Unschedulable状态,同样添加NoSchedule的污点;
4、根据Node上已有的污点,判断之前添加的哪些污点是需要添加的,哪些是需要删除的;
5、调用SwapNodeControllerTaint
对Node进行污点的状态更新。
doNoExecuteTaintingPass与doEvictionPass
doNoExecuteTaintingPass 和 doEvictionPass 两者只会执行其一。
当开启useTaintBasedEvictions特性的时候,调用 doNoExecuteTaintingPass 方法为Node添加 NoExecute 污点;而 doEvictionPass 则是直接判断哪些Pod需要驱逐,直接去做删除工作。
doNoExecuteTaintingPass 方法中,通过获取
zoneNoExecuteTainter 内的数据对Node状态进行判断,如果需要则添加上NoExecute污点,并调用
SwapNodeControllerTaint 方法更新该Node上的污点。zoneNoExecuteTainter的信息是通过
monitorNodeHealth 方法获取到的,后面再分析。
doNoExecuteTaintingPass
的方法如下:
func (nc *Controller) doNoExecuteTaintingPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zoneNoExecuteTainter {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
return true, 0
} else if err != nil {
klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
// retry in 50 millisecond
return false, 50 * time.Millisecond
} else {
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
_, condition := v1node.GetNodeCondition(&node.Status, v1.NodeReady)
// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
taintToAdd := v1.Taint{}
oppositeTaint := v1.Taint{}
if condition.Status == v1.ConditionFalse {
taintToAdd = *NotReadyTaintTemplate
oppositeTaint = *UnreachableTaintTemplate
} else if condition.Status == v1.ConditionUnknown {
taintToAdd = *UnreachableTaintTemplate
oppositeTaint = *NotReadyTaintTemplate
} else {
// It seems that the Node is ready again, so there's no need to taint it.
klog.V(4).Infof("Node %v was in a taint queue, but it's ready now. Ignoring taint request.", value.Value)
return true, 0
}
return nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node), 0
})
}
}
复制代码
doEvictionPass 则是直接通过获取 zonePodEvictor 内的数据,判断哪些Pod需要被驱除,则直接调用Pod的DELETE接口,完成Pod的驱逐任务。zonePodEvictor的信息也是通过 monitorNodeHealth 方法获取到的。 doEvictionPass 方法如下:
func (nc *Controller) doEvictionPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
} else if err != nil {
klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
} else {
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
nodeUID, _ := value.UID.(string)
remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
klog.Infof("Pods awaiting deletion due to Controller eviction")
}
return true, 0
})
}
}
复制代码
两种方法的不同在于, doNoExecuteTaintingPass 只是对Node打上污点,而 doEvictionPass 则是完成了最终的删除工作。 doEvictionPass 的这种方式会导致某一个时间段内,大量的Pod需要被删除,会产生很大的流量;而 doNoExecuteTaintingPass 通过给Node打上污点,让TaintManager去做最终的Pod删除工作,TaintManager的删除任务是分时间段定时执行的,所以不会产生这种大流量的问题。因此建议开启这个特性,在kube-controller-manager的启动参数加上 --feature-gates=TaintBasedEvictions=true 即可。
monitorNodeHealth
前面几个goroutine的任务主要围绕着Taint来展开,而 monitorNodeHealth 则是定时更新Node的信息,并产生数据的来源。
未完待续。。。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Adobe Dreamweaver CS5中文版经典教程
Adobe公司 / 陈宗斌 / 人民邮电 / 2011-1 / 45.00元
《Adobe Dreamweaver CS5中文版经典教程》由Adobe公司的专家编写,是AdobeDreamweavelCS5软件的官方指定培训教材。全书共分为17课,每一课先介绍重要的知识点,然后借助具体的示例进行讲解,步骤详细、重点明确,手把手教你如何进行实际操作。全书是一个有机的整体,它涵盖了Dreamweavercs5的基础知识、HTML基础、CSS基础、创建页面布局、使用层叠样式表、使......一起来看看 《Adobe Dreamweaver CS5中文版经典教程》 这本书的介绍吧!
HTML 压缩/解压工具
在线压缩/解压 HTML 代码
Base64 编码/解码
Base64 编码/解码