Kubernetes源码分析之Node Controller

栏目: 编程工具 · 发布时间: 5年前

内容简介:之前在分析controller-manager中说到,controller对于每个controller的控制格式基本一致,都是以。NewNodeLifecycleController负责创建资源对象,Run负责启动,完成任务的执行。NewNodeLifecycleController主要完成以下任务:

之前在分析controller-manager中说到,controller对于每个controller的控制格式基本一致,都是以 start***Controller 的方式封装成一个独立的方法,NodeController也不例外。在1.13.4的版本中,Node的控制器分成了两种(很早之前的版本只有一种),分别是 NodeIpamControllerNodeLifecycleController 。其中,NodeIpamController主要处理Node的IPAM地址相关,NodeLifecycleController处理Node的整个生命周期,本文主要分析NodeLifecycleController。

Kubernetes源码分析之Node Controller
如图,NodeLifecycleController以 startNodeLifecycleController 方法开始它的生命周期的管理流程。主要关注两个方法: NewNodeLifecycleControllerRun

。NewNodeLifecycleController负责创建资源对象,Run负责启动,完成任务的执行。

NewNodeLifecycleController分析

NewNodeLifecycleController主要完成以下任务:

1、根据给定的配置构造Controller大结构体,完成部分参数的配置任务;

2、为 podInformernodeInformerleaseInformer 以及 daemonSetInformer 配置相应的回调方法,包括 AddFuncUpdateFunc 以及 DeleteFunc 。这样,当相应的Node发生变化时,关联的controller能够及时监听到,并调用相应的处理方法;

3、返回构造完的结构体。

在配置的时候有几个需要注意的变量,后面会经常用到。

Kubernetes源码分析之Node Controller

runTaintManager:表示启动一个TaintManager去从Node上驱逐Pod;

useTaintBasedEvictions:通过给Node添加 TaintNodeNotReadyTaintNodeUnreachable 污点的方式替换之前的直接驱逐Pod的方式,通过流控删除Pod。主要为了防止Pod在某一时间点突然被大量驱逐;

taintNodeByCondition:通过Node的状态给Node添加相应的污点。

另外,与之前的版本不同的是,添加了leaseInformer,它的主要作用是用来判断Node的健康。

Kubernetes源码分析之Node Controller

Run方法分析

Run方法主要包含以下方法,每个方法都是以单独的goroutine运行:

1、 go nc.taintManager.Run(stopCh) :TaintManager,主要完成Pod的驱逐任务;

2、 doNoScheduleTaintingPassWorker :完成NoSchedule的污点更新任务;

3、 doNoExecuteTaintingPassdoEvictionPass :完成NoExecute的污点更新任务;

4、 monitorNodeHealth :检查Node的状态,并且处理Node的增删改查等任务,同时也会处理Pod的驱逐工作。

代码如下

// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	klog.Infof("Starting node controller")
	defer klog.Infof("Shutting down node controller")

	if !controller.WaitForCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
		return
	}

	if nc.runTaintManager {
		go nc.taintManager.Run(stopCh)
	}

	if nc.taintNodeByCondition {
		// Close node update queue to cleanup go routine.
		defer nc.nodeUpdateQueue.ShutDown()

		// Start workers to update NoSchedule taint for nodes.
		for i := 0; i < scheduler.UpdateWorkerSize; i++ {
			// Thanks to "workqueue", each worker just need to get item from queue, because
			// the item is flagged when got from queue: if new event come, the new item will
			// be re-queued until "Done", so no more than one worker handle the same item and
			// no event missed.
			go wait.Until(nc.doNoScheduleTaintingPassWorker, time.Second, stopCh)
		}
	}

	if nc.useTaintBasedEvictions {
		// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
		// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
		go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
	} else {
		// Managing eviction of nodes:
		// When we delete pods off a node, if the node was not empty at the time we then
		// queue an eviction watcher. If we hit an error, retry deletion.
		go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
	}

	// Incorporate the results of node health signal pushed from kubelet to master.
	go wait.Until(func() {
		if err := nc.monitorNodeHealth(); err != nil {
			klog.Errorf("Error monitoring node health: %v", err)
		}
	}, nc.nodeMonitorPeriod, stopCh)

	<-stopCh
}
复制代码

执行过程

NodeLifecycleController的执行过程主要就是各个goroutine对应的任务,一一分析。

TaintManager

TaintManager通过Run方法开始启动。在Run方法内,主要做了几个工作:

1、初始化 nodeUpdateChannelspodUpdateChannels ,大小为8个channel,后面可以并行处理;

2、启动两个goroutine,分别监听nodeUpdateQueue和podUpdateQueue的消息;

3、并行启动8个工作任务,处理监听到的nodeUpdate和podUpdate的消息。

// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	klog.V(0).Infof("Starting NoExecuteTaintManager")

	for i := 0; i < UpdateWorkerSize; i++ {
		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
	}

	// Functions that are responsible for taking work items out of the workqueues and putting them
	// into channels.
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(nodeUpdateItem)
			hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.nodeUpdateQueue.Done(item)
				return
			case tc.nodeUpdateChannels[hash] <- nodeUpdate:
				// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
			}
		}
	}(stopCh)

	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			podUpdate := item.(podUpdateItem)
			hash := hash(podUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.podUpdateQueue.Done(item)
				return
			case tc.podUpdateChannels[hash] <- podUpdate:
				// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
			}
		}
	}(stopCh)

	wg := sync.WaitGroup{}
	wg.Add(UpdateWorkerSize)
	for i := 0; i < UpdateWorkerSize; i++ {
		go tc.worker(i, wg.Done, stopCh)
	}
	wg.Wait()
}
复制代码

在并行启动的work任务中,优先处理nodeUpdate的事件,等到nodeUpdate处理完成之后,再去处理podUpdate。处理nodeUpdate的方法对应 handleNodeUpdate ,podUpdate对应 handlePodUpdate

handleNodeUpdate 主要的作用就是通过监听到的nodeName获取node信息,通过node信息获取该node上对应的taints。然后对该node上所有的pod,依次执行 processPodOnNode 方法。方法如下:

func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
	node, err := tc.getNode(nodeUpdate.nodeName)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
			tc.taintedNodesLock.Lock()
			defer tc.taintedNodesLock.Unlock()
			delete(tc.taintedNodes, nodeUpdate.nodeName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
		return
	}

	// Create or Update
	klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
	taints := getNoExecuteTaints(node.Spec.Taints)
	func() {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
		if len(taints) == 0 {
			delete(tc.taintedNodes, node.Name)
		} else {
			tc.taintedNodes[node.Name] = taints
		}
	}()
	pods, err := getPodsAssignedToNode(tc.client, node.Name)
	if err != nil {
		klog.Errorf(err.Error())
		return
	}
	if len(pods) == 0 {
		return
	}
	// Short circuit, to make this controller a bit faster.
	if len(taints) == 0 {
		klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
		for i := range pods {
			tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
		}
		return
	}

	now := time.Now()
	for i := range pods {
		pod := &pods[i]
		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
	}
}
复制代码

handlePodUpdate 通过获取到单一的pod信息与node信息,也是最终执行 processPodOnNode 方法。方法如下:

func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
	pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
			klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
			tc.cancelWorkWithEvent(podNamespacedName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
		return
	}

	// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
	if pod.Spec.NodeName != podUpdate.nodeName {
		return
	}

	// Create or Update
	podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
	klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
	nodeName := pod.Spec.NodeName
	if nodeName == "" {
		return
	}
	taints, ok := func() ([]v1.Taint, bool) {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		taints, ok := tc.taintedNodes[nodeName]
		return taints, ok
	}()
	// It's possible that Node was deleted, or Taints were removed before, which triggered
	// eviction cancelling if it was needed.
	if !ok {
		return
	}
	tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
复制代码

processPodOnNode 方法主要将需要删除的Pod按照预定好的格式添加到 taintEvictionQueue ,该queue内的任务都是设置好定时任务时间的,在相应的时间内调用 deletePodHandler 方法去删除pod,该方法位于 pkg/controller/nodelifecycle/scheduler/taint_manager.go 下。方法如下:

func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		ns := args.NamespacedName.Namespace
		name := args.NamespacedName.Name
		klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
		if emitEventFunc != nil {
			emitEventFunc(args.NamespacedName)
		}
		var err error
		for i := 0; i < retries; i++ {
			err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
			if err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
		return err
	}
}
复制代码

所以,TaintManager的主要作用就是将需要驱逐的Pod配置好定时删除的任务,然后从相应的Node上一一删除。

doNoScheduleTaintingPassWorker

当开启taintNodeByCondition特性的时候,则会调用 doNoScheduleTaintingPassWorker 去对Node做 NoSchedule的 污点更新。调用的是 doNoScheduleTaintingPass 方法。方法如下:

func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
	node, err := nc.nodeLister.Get(nodeName)
	if err != nil {
		// If node not found, just ignore it.
		if apierrors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// Map node's condition to Taints.
	var taints []v1.Taint
	for _, condition := range node.Status.Conditions {
		if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
			if taintKey, found := taintMap[condition.Status]; found {
				taints = append(taints, v1.Taint{
					Key:    taintKey,
					Effect: v1.TaintEffectNoSchedule,
				})
			}
		}
	}
	if node.Spec.Unschedulable {
		// If unschedulable, append related taint.
		taints = append(taints, v1.Taint{
			Key:    schedulerapi.TaintNodeUnschedulable,
			Effect: v1.TaintEffectNoSchedule,
		})
	}

	// Get exist taints of node.
	nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
		// only NoSchedule taints are candidates to be compared with "taints" later
		if t.Effect != v1.TaintEffectNoSchedule {
			return false
		}
		// Find unschedulable taint of node.
		if t.Key == schedulerapi.TaintNodeUnschedulable {
			return true
		}
		// Find node condition taints of node.
		_, found := taintKeyToNodeConditionMap[t.Key]
		return found
	})
	taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
	// If nothing to add not delete, return true directly.
	if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
		return nil
	}
	if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
		return fmt.Errorf("failed to swap taints of node %+v", node)
	}
	return nil
}
复制代码

doNoScheduleTaintingPass 主要做了以下工作:

1、根据nodeName获取node信息;

2、根据node.Status.Conditions字段,判断node是否需要添加NoSchedule污点,判断的标准如下:

Kubernetes源码分析之Node Controller

只要处于任一状态,即需要添加NoSchedule污点;

3、如果Node为Unschedulable状态,同样添加NoSchedule的污点;

4、根据Node上已有的污点,判断之前添加的哪些污点是需要添加的,哪些是需要删除的;

5、调用 SwapNodeControllerTaint

对Node进行污点的状态更新。

doNoExecuteTaintingPass与doEvictionPass

doNoExecuteTaintingPassdoEvictionPass 两者只会执行其一。

Kubernetes源码分析之Node Controller

当开启useTaintBasedEvictions特性的时候,调用 doNoExecuteTaintingPass 方法为Node添加 NoExecute 污点;而 doEvictionPass 则是直接判断哪些Pod需要驱逐,直接去做删除工作。

doNoExecuteTaintingPass 方法中,通过获取 zoneNoExecuteTainter 内的数据对Node状态进行判断,如果需要则添加上NoExecute污点,并调用 SwapNodeControllerTaint 方法更新该Node上的污点。zoneNoExecuteTainter的信息是通过 monitorNodeHealth 方法获取到的,后面再分析。 doNoExecuteTaintingPass

的方法如下:

func (nc *Controller) doNoExecuteTaintingPass() {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	for k := range nc.zoneNoExecuteTainter {
		// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
		nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
			node, err := nc.nodeLister.Get(value.Value)
			if apierrors.IsNotFound(err) {
				klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
				return true, 0
			} else if err != nil {
				klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
				// retry in 50 millisecond
				return false, 50 * time.Millisecond
			} else {
				zone := utilnode.GetZoneKey(node)
				evictionsNumber.WithLabelValues(zone).Inc()
			}
			_, condition := v1node.GetNodeCondition(&node.Status, v1.NodeReady)
			// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
			taintToAdd := v1.Taint{}
			oppositeTaint := v1.Taint{}
			if condition.Status == v1.ConditionFalse {
				taintToAdd = *NotReadyTaintTemplate
				oppositeTaint = *UnreachableTaintTemplate
			} else if condition.Status == v1.ConditionUnknown {
				taintToAdd = *UnreachableTaintTemplate
				oppositeTaint = *NotReadyTaintTemplate
			} else {
				// It seems that the Node is ready again, so there's no need to taint it.
				klog.V(4).Infof("Node %v was in a taint queue, but it's ready now. Ignoring taint request.", value.Value)
				return true, 0
			}

			return nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node), 0
		})
	}
}
复制代码

doEvictionPass 则是直接通过获取 zonePodEvictor 内的数据,判断哪些Pod需要被驱除,则直接调用Pod的DELETE接口,完成Pod的驱逐任务。zonePodEvictor的信息也是通过 monitorNodeHealth 方法获取到的。 doEvictionPass 方法如下:

func (nc *Controller) doEvictionPass() {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	for k := range nc.zonePodEvictor {
		// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
		nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
			node, err := nc.nodeLister.Get(value.Value)
			if apierrors.IsNotFound(err) {
				klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
			} else if err != nil {
				klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
			} else {
				zone := utilnode.GetZoneKey(node)
				evictionsNumber.WithLabelValues(zone).Inc()
			}
			nodeUID, _ := value.UID.(string)
			remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
				return false, 0
			}
			if remaining {
				klog.Infof("Pods awaiting deletion due to Controller eviction")
			}
			return true, 0
		})
	}
}
复制代码

两种方法的不同在于, doNoExecuteTaintingPass 只是对Node打上污点,而 doEvictionPass 则是完成了最终的删除工作。 doEvictionPass 的这种方式会导致某一个时间段内,大量的Pod需要被删除,会产生很大的流量;而 doNoExecuteTaintingPass 通过给Node打上污点,让TaintManager去做最终的Pod删除工作,TaintManager的删除任务是分时间段定时执行的,所以不会产生这种大流量的问题。因此建议开启这个特性,在kube-controller-manager的启动参数加上 --feature-gates=TaintBasedEvictions=true 即可。

monitorNodeHealth

前面几个goroutine的任务主要围绕着Taint来展开,而 monitorNodeHealth 则是定时更新Node的信息,并产生数据的来源。

未完待续。。。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

JavaScript精粹

JavaScript精粹

爱德华兹 / 高铁军 / 人民邮电出版社 / 2007-6 / 49.00元

《JavaScript 精粹》主要介绍JavaScript应用中一些常见的问题及其解决方法,从最基础的数字、字符串、数组到进阶的DOM、表单验证、cookie,再到较为高级的Ajax,书中均有涉及。《JavaScript 精粹》覆盖现在非常流行和通用的技术,提出很多出现频率较高的Web开发常见问题,并提供了大量的技巧和解决方案,具有很强的实用性和通用性,书中的代码也具有很强的兼容性。《JavaSc......一起来看看 《JavaScript精粹》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具