原 荐 Kubernetes Endpoints Controller源码分析

栏目: 编程工具 · 发布时间: 7年前

内容简介:Author:摘要:最近我们在写自己的Kubernetes服务路由组件对接公司自研的负载均衡器,这其中涉及到非常核心的Endpoints相关的逻辑,因此对Endpoints Controller的深入分析是非常有必要的,比如Pod Label发生变更、孤立Pod、Pod HostName发生变更等情况下,Endpoints Controller的处理逻辑是否与我们想要的一致。启动两类go协程:

Author: xidianwangtao@gmail.com

摘要:最近我们在写自己的Kubernetes服务路由组件对接公司自研的负载均衡器,这其中涉及到非常核心的Endpoints相关的逻辑,因此对Endpoints Controller的深入分析是非常有必要的,比如Pod Label发生变更、孤立Pod、Pod HostName发生变更等情况下,Endpoints Controller的处理逻辑是否与我们想要的一致。

Endpoints Controller相关的配置项

  • --concurrent-endpoint-syncs int32 Default: 5 The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load.

  • --leader-elect-resource-lock endpoints Default: "endpoints" The type of resource object that is used for locking during leader election. Supported options are endpoints (default) and configmaps .

Endpoints Controller Watch的GVK

  • Core/V1/Pods
  • Core/V1/Services
  • Core/V1/Endpoints

Endpoints Controller Event Handler

  • Add Service Event --> enqueueService
  • Update Service Event --> enqueueService(new)
  • Delete Service Event --> enqueueService
  • Add Pod Event --> addPod
  • Update Pod Event --> updatePod
  • Delete Pod Event --> deletePod
  • Add/Update/Delete Endpoints Event --> nil

Run Endpoints Controller

启动两类 go 协程:

  • 一类协程数为--concurrent-endpoint-syncs配置值(default 5),每个worker负责从service queue中pop service进行syncService同步,完成一次sync后等待1s再从service queue中pop一个service进行sync,如此反复。
  • 另一类协程只有一个协程,负责checkLeftoverEndpoints,只有启动时会执行一次。
// Run will not return until stopCh is closed. workers determines how many
// endpoints will be handled in parallel.
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer e.queue.ShutDown()

	glog.Infof("Starting endpoint controller")
	defer glog.Infof("Shutting down endpoint controller")

	if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
		return
	}

	// workers = --concurrent-endpoint-syncs's value (default 5)
	for i := 0; i < workers; i++ {
		// workerLoopPeriod = 1s
		go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
	}

	go func() {
		defer utilruntime.HandleCrash()
		e.checkLeftoverEndpoints()
	}()

	<-stopCh
}

checkLeftoverEndpoints

checkLeftoverEndpoints负责List所有当前集群中的endpoints并将它们对应的services添加到queue中,由workers进行syncService同步。

这是为了防止在controller-manager发生重启时时,用户删除了某些Services或者某些Endpoints还没删除干净,Endpoints Controller没有处理的情况下,在Endpoints Controller再次启动时能通过checkLeftoverEndpoints检测到那些孤立的endpionts(没有对应services),将虚构的Services重新加入到队列进行syncService操作,从而完成这些孤立endpoint的清理工作。

上面提到的虚构Services其实是把Endpoints的Key(namespace/name)作为Services的Key,因此这就是为什么要求Endpiont和Service的名字要一致的原因之一。

func (e *EndpointController) checkLeftoverEndpoints() {
	list, err := e.endpointsLister.List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
		return
	}
	for _, ep := range list {
		if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {
			// when there are multiple controller-manager instances,
			// we observe that it will delete leader-election endpoints after 5min
			// and cause re-election
			// so skip the delete here
			// as leader-election only have endpoints without service
			continue
		}
		key, err := keyFunc(ep)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
			continue
		}
		e.queue.Add(key)
	}
}

另外,还需要注意一点,对于kube-controller-manager多实例HA部署时,各个contorller-manager endpoints是没有对应service的,这种情况下,我们不能把虚构的Service加入到队列触发这些“理应孤立”的endpoints被清理,因此我们给这些“理应孤立”的endpoints加上Annotation "control-plane.alpha.kubernetes.io/leader"以做跳过处理。

Endpoint Contoller的核心逻辑syncService

Service的Add/Update/Delete Event Handler都是将Service Key加入到Queue中,等待worker进行syncService处理:

  1. 根据queue中得到的service key(namespace/name)去indexer中获取对应的Service Object,如果没获取到,则调api删除同Key(namespace/name)的Endpoints Object进行清理工作,这对应到checkLeftoverEndpoints中描述到的那些孤立endpoints清理工作。

  2. 因为Service是通过LabelSelector进行Pod匹配,将匹配的Pods构建对应的Endpoints Subsets加入到Endpoints中,因此这里会先过滤掉那些没有LabelSelector的Services。

  3. 然后用Service的LabelSelector获取同namespace下的所有Pods。

  4. 检查service.Spec.PublishNotReadyAddresses是否为true,或者Service Annotations "service.alpha.kubernetes.io/tolerate-unready-endpoints"是否为true(/t/T/True/TRUE/1),如果为true,则表示tolerate Unready Endpoints,即Unready的Pods信息也会被加入该Service对应的Endpoints中。

    注意,Annotations "service.alpha.kubernetes.io/tolerate-unready-endpoints"在Kubernetes 1.13中将被弃用,后续只使用.Spec.PublishNotReadyAddresses Field。

  5. 接下来就是遍历前面获取到的Pods,用各个Pod的IP、ContainerPorts、HostName及Service的Port去构建Endpoints的Subsets,注意如下特殊处理:

    1. 跳过没有pod.Status.PodIP为空的pod;

    2. 当tolerate Unready Endpoints为false时,跳过那些被标记删除(DeletionTimestamp != nil)的Pods;

    3. 对于Headless Service,因为没有Service Port,因此构建EndpointSubset时对应的Ports内容为空;

    4)当tolerate Unready Endpoints为true(即使Pod not Ready)或者Pod isReady时,Pod对应的EndpointAddress也会被加入到(Ready)Addresses中。

    5)tolerate Unready Endpoints为false且Pod isNotReady情况下:

    - 当pod.Spec.RestartPolicy为Never,Pod Status.Phase为非结束状态(非Failed/Successed)时,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。
     - 当pod.Spec.RestartPolicy为OnFailure, Pod Status.Phase为非Successed时,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。
     - 其他情况下,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。
  6. 从indexer中获取service对应的Endpoints Object(currentEndpoints),如果从indexer中没有返回对应的Endpoints Object,则构建一个与该Service同名、同Labels的Endpoints对象(newEndpoints)。

  7. 如果currentEndpoints的ResourceVersion不为空,则对比currentEndpoints.Subsets、Labels与前面构建的Subsets、Service.Labels是否DeepEqual,如果是则说明不需要update,流程结束。

  8. 否则,就像currentEndpoints DeepCopy给newEndpoints,并用前面构建的Subsets和Services.Labels替换newEndpoints中对应内容。

  9. 如果currentEndpoints的ResourceVersion为空,则调用Create API去创建上一步的newEndpoints Object。如果currentEndpoints的ResourceVersion不为空,表示已经存在对应的Endpoints,则调用Update API用newEndpoints去更新该Endpoints。

  10. 流程结束。

Pod Event Hanlder

Add Pod

  1. 通过Services LabeleSelector与Pod Labels进行匹配的方法,将该Pod能匹配上的所有Services都找出来,然后将它们的Key(namespace/name)都加入到queue等待sync。
// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *v1.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	services, err := e.getPodServiceMemberships(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
		return
	}
	for key := range services {
		e.queue.Add(key)
	}
}

Update Pod

  • 如果newPod.ResourceVersion等于oldPod.ResourceVersion,则跳过,不进行任何update。

  • 检查新老Pod的DeletionTimestamp、Ready Condition以及由PodIP,Hostname等建构的EndpointAddress是否发生变更,只要其中之一发生变更,podChangedFlag就为true。

  • 检查新老Pod Spec的Labels、HostName、Subdomain是否发生变更,只要其中之一发生变更,labelChangedFlag就为true。

  • 如果podChangedFlag和labelChangedFlag都为false,则跳过,不做任何update。

  • 通过Services LabeleSelector与Pod Labels进行匹配的方法,将newPod能匹配上的所有Services都找出来(services记录),如果labelChangedFlag为true,则根据LabelSelector匹配找出oldPod对应的oldServices:

    • 如果podChangedFlag为true,则将services和oldServices进行union集合,将集合内的所有Services Key都加入到queue中等待sync;
    • 如果podChangedFlag为false,则将services和oldServices的互相差值进行union集合,将集合内的所有Services Key都加入到queue中等待sync;

    互相差值进行union集合的含义: services.Difference(oldServices).Union(oldServices.Difference(services))

Delete Pod

  • 如果该pod还是个完整记录的pod,则跟addPod逻辑一样:通过Services LabeleSelector与Pod Labels进行匹配的方法,将该Pod能匹配上的所有Services都找出来,然后将它们的Key(namespace/name)都加入到queue等待sync。
  • 如果该pod是tombstone object(final state is unrecorded),则将其转换成v1.pod后,再调用addPod。相比正常的Pod,就是多了一步:从tombstone到v1.pod的转换。
// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) deletePod(obj interface{}) {
	if _, ok := obj.(*v1.Pod); ok {
		// Enqueue all the services that the pod used to be a member
		// of. This happens to be exactly the same thing we do when a
		// pod is added.
		e.addPod(obj)
		return
	}
	// If we reached here it means the pod was deleted but its final state is unrecorded.
	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
		return
	}
	pod, ok := tombstone.Obj.(*v1.Pod)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
		return
	}
	glog.V(4).Infof("Enqueuing services of deleted pod %s/%s having final state unrecorded", pod.Namespace, pod.Name)
	e.addPod(pod)
}

核心Struct

里面有几个struct,挺容易混淆的,简单用图表示下,方便比对:

原 荐 Kubernetes Endpoints Controller源码分析

总结

通过对Endpoints Controller的源码分析,我们了解了其中很多细节,比如对Service和Pod事件处理逻辑、对孤立Pod的处理方法、Pod Labels变更带来的影响等等,这对我们通过Watch Endpoints去写自己的Ingress组件对接公司内部的路由组件时是有帮助的。

原 荐 Kubernetes Endpoints Controller源码分析

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Visual Thinking

Visual Thinking

Colin Ware / Morgan Kaufmann / 2008-4-18 / USD 49.95

Increasingly, designers need to present information in ways that aid their audiences thinking process. Fortunately, results from the relatively new science of human visual perception provide valuable ......一起来看看 《Visual Thinking》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

在线进制转换器
在线进制转换器

各进制数互转换器