As mentioned in the earlier controller-manager analysis, every controller follows essentially the same pattern: each one is wrapped in a standalone start***Controller method, and NodeController is no exception. In version 1.13.4, node control is split into two controllers (very early versions had only one): NodeIpamController and NodeLifecycleController. NodeIpamController mainly handles node IPAM (IP address management), while NodeLifecycleController manages the node's entire lifecycle; the latter is the subject of this article. NodeLifecycleController begins its lifecycle-management flow in startNodeLifecycleController, and two methods deserve the most attention: NewNodeLifecycleController and Run. NewNodeLifecycleController builds the controller object; Run starts it and carries out the work.
Analysis of NewNodeLifecycleController
NewNodeLifecycleController mainly does the following:
1. Builds the large Controller struct from the given configuration, setting up some of its parameters;
2. Registers the callbacks AddFunc, UpdateFunc, and DeleteFunc on podInformer, nodeInformer, leaseInformer, and daemonSetInformer, so that when the corresponding objects change, the associated controller is notified promptly and invokes the matching handler (see the sketch after this list);
3. Returns the constructed struct.
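To make step 2 concrete, here is a minimal, hypothetical sketch of how such handlers are wired up with client-go. The helper name registerNodeHandlers and the enqueue callback are illustrative, not taken verbatim from NewNodeLifecycleController:

```go
package sketch

import (
	v1 "k8s.io/api/core/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/tools/cache"
)

// registerNodeHandlers is a hypothetical helper: it attaches the three
// callbacks to a node informer so that every node change lands in the
// controller's work queue via enqueue.
func registerNodeHandlers(nodeInformer coreinformers.NodeInformer, enqueue func(*v1.Node)) {
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			enqueue(obj.(*v1.Node)) // a new node appeared
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			enqueue(newObj.(*v1.Node)) // an existing node changed
		},
		DeleteFunc: func(obj interface{}) {
			// Deletions may arrive wrapped in DeletedFinalStateUnknown,
			// so type-assert defensively.
			if node, ok := obj.(*v1.Node); ok {
				enqueue(node) // let the controller clean up per-node state
			}
		},
	})
}
```

The real constructor registers analogous handlers on all four informers; when runTaintManager is enabled, the node and pod handlers in particular feed the TaintManager's nodeUpdateQueue and podUpdateQueue.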
A few of the variables configured here deserve attention, since they come up repeatedly later:
runTaintManager: whether to start a TaintManager that evicts pods from tainted nodes;
useTaintBasedEvictions: instead of evicting pods directly, taint the node with TaintNodeNotReady or TaintNodeUnreachable and delete pods under rate limiting, mainly to prevent a sudden mass eviction of pods at a single point in time (the two taint templates are shown right after this list);
taintNodeByCondition: add taints to a node based on its conditions.
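For reference, these are the two NoExecute taint templates applied under useTaintBasedEvictions; they also appear later in doNoExecuteTaintingPass. This matches their definition in pkg/controller/nodelifecycle/node_lifecycle_controller.go in the 1.13 tree (the keys resolve to node.kubernetes.io/not-ready and node.kubernetes.io/unreachable):

```go
// NoExecute taint templates applied by the node lifecycle controller.
var (
	UnreachableTaintTemplate = &v1.Taint{
		Key:    schedulerapi.TaintNodeUnreachable,
		Effect: v1.TaintEffectNoExecute,
	}

	NotReadyTaintTemplate = &v1.Taint{
		Key:    schedulerapi.TaintNodeNotReady,
		Effect: v1.TaintEffectNoExecute,
	}
)
```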
Another difference from earlier versions is the addition of leaseInformer, whose main purpose is judging node health.
Analysis of the Run method
The Run method launches the following tasks, each in its own goroutine:
1. go nc.taintManager.Run(stopCh): the TaintManager, which performs the actual pod evictions;
2. doNoScheduleTaintingPassWorker: updates NoSchedule taints;
3. doNoExecuteTaintingPass / doEvictionPass: updates NoExecute taints (or evicts pods directly);
4. monitorNodeHealth: checks node status, handles node additions, updates, and deletions, and also takes part in pod eviction.
The code is as follows:
```go
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	klog.Infof("Starting node controller")
	defer klog.Infof("Shutting down node controller")

	if !controller.WaitForCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
		return
	}

	if nc.runTaintManager {
		go nc.taintManager.Run(stopCh)
	}

	if nc.taintNodeByCondition {
		// Close node update queue to cleanup go routine.
		defer nc.nodeUpdateQueue.ShutDown()

		// Start workers to update NoSchedule taint for nodes.
		for i := 0; i < scheduler.UpdateWorkerSize; i++ {
			// Thanks to "workqueue", each worker just need to get item from queue, because
			// the item is flagged when got from queue: if new event come, the new item will
			// be re-queued until "Done", so no more than one worker handle the same item and
			// no event missed.
			go wait.Until(nc.doNoScheduleTaintingPassWorker, time.Second, stopCh)
		}
	}

	if nc.useTaintBasedEvictions {
		// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
		// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
		go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
	} else {
		// Managing eviction of nodes:
		// When we delete pods off a node, if the node was not empty at the time we then
		// queue an eviction watcher. If we hit an error, retry deletion.
		go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
	}

	// Incorporate the results of node health signal pushed from kubelet to master.
	go wait.Until(func() {
		if err := nc.monitorNodeHealth(); err != nil {
			klog.Errorf("Error monitoring node health: %v", err)
		}
	}, nc.nodeMonitorPeriod, stopCh)

	<-stopCh
}
```
Execution flow
NodeLifecycleController's execution boils down to the tasks run by these goroutines, so let's analyze them one by one.
TaintManager
TaintManager is started via its Run method, which does a few things:
1. Initializes nodeUpdateChannels and podUpdateChannels, eight channels each, enabling parallel processing later;
2. Starts two goroutines that listen for messages on nodeUpdateQueue and podUpdateQueue respectively;
3. Starts eight workers in parallel to process the nodeUpdate and podUpdate messages received.
```go
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	klog.V(0).Infof("Starting NoExecuteTaintManager")

	for i := 0; i < UpdateWorkerSize; i++ {
		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
	}

	// Functions that are responsible for taking work items out of the workqueues and putting them
	// into channels.
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(nodeUpdateItem)
			hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.nodeUpdateQueue.Done(item)
				return
			case tc.nodeUpdateChannels[hash] <- nodeUpdate:
				// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
			}
		}
	}(stopCh)

	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			podUpdate := item.(podUpdateItem)
			hash := hash(podUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.podUpdateQueue.Done(item)
				return
			case tc.podUpdateChannels[hash] <- podUpdate:
				// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
			}
		}
	}(stopCh)

	wg := sync.WaitGroup{}
	wg.Add(UpdateWorkerSize)
	for i := 0; i < UpdateWorkerSize; i++ {
		go tc.worker(i, wg.Done, stopCh)
	}
	wg.Wait()
}
```
Within each parallel worker, nodeUpdate events are handled with priority; only after the pending nodeUpdate events have been drained does the worker move on to podUpdate events (see the condensed worker loop below). nodeUpdate events are handled by handleNodeUpdate, podUpdate events by handlePodUpdate.
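Here is the per-worker loop, condensed from taint_manager.go in the 1.13 tree (logging trimmed); it shows the labeled loop that empties the node channel before a pod update is processed:

```go
// worker handles one shard of node/pod updates. Node updates always take
// priority: before handling a pod update, the node channel is drained.
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
	defer done()
	for {
		select {
		case <-stopCh:
			return
		case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
			tc.handleNodeUpdate(nodeUpdate)
			tc.nodeUpdateQueue.Done(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannels[worker]:
			// If we found a Pod update we need to empty the Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
					tc.handleNodeUpdate(nodeUpdate)
					tc.nodeUpdateQueue.Done(nodeUpdate)
				default:
					break priority
				}
			}
			// After the Node queue is emptied we process the podUpdate.
			tc.handlePodUpdate(podUpdate)
			tc.podUpdateQueue.Done(podUpdate)
		}
	}
}
```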
handleNodeUpdate looks up the node by the nodeName it received, reads the node's NoExecute taints, and then runs processPodOnNode for every pod on that node. The method is as follows:
```go
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
	node, err := tc.getNode(nodeUpdate.nodeName)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
			tc.taintedNodesLock.Lock()
			defer tc.taintedNodesLock.Unlock()
			delete(tc.taintedNodes, nodeUpdate.nodeName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
		return
	}

	// Create or Update
	klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
	taints := getNoExecuteTaints(node.Spec.Taints)
	func() {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
		if len(taints) == 0 {
			delete(tc.taintedNodes, node.Name)
		} else {
			tc.taintedNodes[node.Name] = taints
		}
	}()

	pods, err := getPodsAssignedToNode(tc.client, node.Name)
	if err != nil {
		klog.Errorf(err.Error())
		return
	}
	if len(pods) == 0 {
		return
	}
	// Short circuit, to make this controller a bit faster.
	if len(taints) == 0 {
		klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
		for i := range pods {
			tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
		}
		return
	}

	now := time.Now()
	for i := range pods {
		pod := &pods[i]
		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
	}
}
```
handlePodUpdate retrieves the single pod and its node information, and likewise ends up calling processPodOnNode. The method is as follows:
```go
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
	pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
			klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
			tc.cancelWorkWithEvent(podNamespacedName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
		return
	}

	// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
	if pod.Spec.NodeName != podUpdate.nodeName {
		return
	}

	// Create or Update
	podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
	klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
	nodeName := pod.Spec.NodeName
	if nodeName == "" {
		return
	}
	taints, ok := func() ([]v1.Taint, bool) {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		taints, ok := tc.taintedNodes[nodeName]
		return taints, ok
	}()
	// It's possible that Node was deleted, or Taints were removed before, which triggered
	// eviction cancelling if it was needed.
	if !ok {
		return
	}
	tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
```
processPodOnNode packages each pod that needs deletion into a pre-defined format and adds it to taintEvictionQueue. Every task in that queue carries a scheduled trigger time; when the time comes, deletePodHandler is invoked to delete the pod. That handler lives in pkg/controller/nodelifecycle/scheduler/taint_manager.go and looks like this:
```go
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		ns := args.NamespacedName.Namespace
		name := args.NamespacedName.Name
		klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
		if emitEventFunc != nil {
			emitEventFunc(args.NamespacedName)
		}
		var err error
		for i := 0; i < retries; i++ {
			err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
			if err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
		return err
	}
}
```
In short, TaintManager's job is to schedule timed deletion tasks for the pods that need to be evicted and then remove them from their nodes one by one. For completeness, a condensed sketch of processPodOnNode follows.
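The article quotes deletePodHandler but not processPodOnNode itself, so here is a condensed sketch of its logic based on the 1.13 tree; error handling and logging are trimmed, and helper names such as getMinTolerationTime and v1helper.GetMatchingTolerations are assumed to match that tree:

```go
// Condensed sketch of processPodOnNode: decide whether (and when) a pod
// on a NoExecute-tainted node should be enqueued for deletion.
func (tc *NoExecuteTaintManager) processPodOnNode(podNamespacedName types.NamespacedName, nodeName string, tolerations []v1.Toleration, taints []v1.Taint, now time.Time) {
	if len(taints) == 0 {
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
	if !allTolerated {
		// The pod does not tolerate the NoExecute taints: cancel any
		// scheduled work and enqueue an immediate eviction.
		tc.cancelWorkWithEvent(podNamespacedName)
		tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
		return
	}
	minTolerationTime := getMinTolerationTime(usedTolerations)
	// A negative value means the pod tolerates the taints forever; nothing to schedule.
	if minTolerationTime < 0 {
		return
	}
	// Otherwise schedule a deletion at startTime + minTolerationTime,
	// keeping an earlier already-scheduled eviction if one exists.
	startTime := now
	triggerTime := startTime.Add(minTolerationTime)
	if scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()); scheduledEviction != nil {
		startTime = scheduledEviction.CreatedAt
		if startTime.Add(minTolerationTime).Before(triggerTime) {
			return
		}
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
```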
doNoScheduleTaintingPassWorker
When the taintNodeByCondition feature is enabled, doNoScheduleTaintingPassWorker is called to update NoSchedule taints on nodes. It delegates to doNoScheduleTaintingPass, shown below:
```go
func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
	node, err := nc.nodeLister.Get(nodeName)
	if err != nil {
		// If node not found, just ignore it.
		if apierrors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// Map node's condition to Taints.
	var taints []v1.Taint
	for _, condition := range node.Status.Conditions {
		if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
			if taintKey, found := taintMap[condition.Status]; found {
				taints = append(taints, v1.Taint{
					Key:    taintKey,
					Effect: v1.TaintEffectNoSchedule,
				})
			}
		}
	}
	if node.Spec.Unschedulable {
		// If unschedulable, append related taint.
		taints = append(taints, v1.Taint{
			Key:    schedulerapi.TaintNodeUnschedulable,
			Effect: v1.TaintEffectNoSchedule,
		})
	}

	// Get exist taints of node.
	nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
		// only NoSchedule taints are candidates to be compared with "taints" later
		if t.Effect != v1.TaintEffectNoSchedule {
			return false
		}
		// Find unschedulable taint of node.
		if t.Key == schedulerapi.TaintNodeUnschedulable {
			return true
		}
		// Find node condition taints of node.
		_, found := taintKeyToNodeConditionMap[t.Key]
		return found
	})
	taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
	// If nothing to add not delete, return true directly.
	if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
		return nil
	}
	if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
		return fmt.Errorf("failed to swap taints of node %+v", node)
	}
	return nil
}
```
doNoScheduleTaintingPass does the following:
1. Fetches the node by nodeName;
2. Walks node.Status.Conditions to decide whether NoSchedule taints are needed; the criterion is the nodeConditionToTaintKeyStatusMap lookup: if the node is in any of the mapped conditions, the corresponding NoSchedule taint is added (a sketch of the map follows this list);
3. If the node is marked Unschedulable, a NoSchedule taint is likewise added;
4. Compares against the taints already on the node to work out which taints need to be added and which removed;
5. Calls SwapNodeControllerTaint to update the node's taints.
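For reference, here is an abridged sketch of nodeConditionToTaintKeyStatusMap. I believe this matches the 1.13 tree, but the exact set of conditions can differ across versions, so treat it as illustrative rather than authoritative:

```go
// Abridged: maps a node condition (and its status) to the taint key that
// doNoScheduleTaintingPass should apply with effect NoSchedule.
var nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
	v1.NodeReady: {
		v1.ConditionFalse:   schedulerapi.TaintNodeNotReady,
		v1.ConditionUnknown: schedulerapi.TaintNodeUnreachable,
	},
	v1.NodeMemoryPressure: {
		v1.ConditionTrue: schedulerapi.TaintNodeMemoryPressure,
	},
	v1.NodeDiskPressure: {
		v1.ConditionTrue: schedulerapi.TaintNodeDiskPressure,
	},
	v1.NodeNetworkUnavailable: {
		v1.ConditionTrue: schedulerapi.TaintNodeNetworkUnavailable,
	},
	v1.NodePIDPressure: {
		v1.ConditionTrue: schedulerapi.TaintNodePIDPressure,
	},
	// ... (any additional conditions in your version are omitted here)
}
```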
doNoExecuteTaintingPass and doEvictionPass
Only one of doNoExecuteTaintingPass and doEvictionPass ever runs. When the useTaintBasedEvictions feature is enabled, doNoExecuteTaintingPass is called to add NoExecute taints to nodes; otherwise doEvictionPass decides directly which pods need evicting and deletes them itself.
doNoExecuteTaintingPass reads entries from zoneNoExecuteTainter, judges each node's status, adds a NoExecute taint if needed, and calls SwapNodeControllerTaint to update the node's taints. The data in zoneNoExecuteTainter is produced by monitorNodeHealth, which we will analyze later. The method is as follows:
```go
func (nc *Controller) doNoExecuteTaintingPass() {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	for k := range nc.zoneNoExecuteTainter {
		// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
		nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
			node, err := nc.nodeLister.Get(value.Value)
			if apierrors.IsNotFound(err) {
				klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
				return true, 0
			} else if err != nil {
				klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
				// retry in 50 millisecond
				return false, 50 * time.Millisecond
			} else {
				zone := utilnode.GetZoneKey(node)
				evictionsNumber.WithLabelValues(zone).Inc()
			}
			_, condition := v1node.GetNodeCondition(&node.Status, v1.NodeReady)
			// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
			taintToAdd := v1.Taint{}
			oppositeTaint := v1.Taint{}
			if condition.Status == v1.ConditionFalse {
				taintToAdd = *NotReadyTaintTemplate
				oppositeTaint = *UnreachableTaintTemplate
			} else if condition.Status == v1.ConditionUnknown {
				taintToAdd = *UnreachableTaintTemplate
				oppositeTaint = *NotReadyTaintTemplate
			} else {
				// It seems that the Node is ready again, so there's no need to taint it.
				klog.V(4).Infof("Node %v was in a taint queue, but it's ready now. Ignoring taint request.", value.Value)
				return true, 0
			}

			return nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node), 0
		})
	}
}
```
doEvictionPass, by contrast, reads entries from zonePodEvictor, decides which pods must be evicted, and calls the pod DELETE API directly to finish the eviction. The data in zonePodEvictor also comes from monitorNodeHealth. The method is as follows:
```go
func (nc *Controller) doEvictionPass() {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	for k := range nc.zonePodEvictor {
		// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
		nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
			node, err := nc.nodeLister.Get(value.Value)
			if apierrors.IsNotFound(err) {
				klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
			} else if err != nil {
				klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
			} else {
				zone := utilnode.GetZoneKey(node)
				evictionsNumber.WithLabelValues(zone).Inc()
			}
			nodeUID, _ := value.UID.(string)
			remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
				return false, 0
			}
			if remaining {
				klog.Infof("Pods awaiting deletion due to Controller eviction")
			}
			return true, 0
		})
	}
}
```
The difference between the two: doNoExecuteTaintingPass only taints the node, while doEvictionPass performs the final deletion itself. The doEvictionPass approach can require deleting a large number of pods within one short window, generating heavy traffic. doNoExecuteTaintingPass instead taints the node and lets the TaintManager do the final deletions; since the TaintManager's deletion tasks are scheduled and spread out over time, the traffic spike is avoided. It is therefore recommended to enable this feature by adding --feature-gates=TaintBasedEvictions=true to the kube-controller-manager startup arguments.
monitorNodeHealth
While the goroutines above all revolve around taints, monitorNodeHealth periodically refreshes node information and produces the data they consume.
To be continued...