内容简介:源码分析系列文章已经开源到github,地址如下:-----------------------------------------------------------------讲 Informer 还是比较有压力的,client-go 中的逻辑确实有点复杂,我甚至怀疑有“炫技”的成分。Informer 在很多组件的源码中可以看到,尤其是 kube-controller-manager (写这篇文章时我已经基本写完 kube-scheduler 的源码分析,准备着手写 kube-controller-ma
源码分析系列文章已经开源到github,地址如下:
- github:
https://github.com/farmer-hutao/k8s-source-code-analysis - gitbook:
https://farmer-hutao.github.io/k8s-source-code-analysis
-----------------------------------------------------------------
Informer机制
1. 概述
讲 Informer 还是比较有压力的,client-go 中的逻辑确实有点复杂,我甚至怀疑有“炫技”的成分。Informer 在很多组件的源码中可以看到,尤其是 kube-controller-manager (写这篇文章时我已经基本写完 kube-scheduler 的源码分析,准备着手写 kube-controller-manager 了,鉴于 controlelr 和 client-go 关联太大,跳过来先讲讲 Informer).
Informer 是 client-go 中一个比较核心的工具,通过 Informer 我们可以轻松 List/Get 某个资源对象,可以监听资源对象的各种事件(比如创建和删除)然后触发回调函数,让我们能够在各种事件发生的时候能够作出相应的逻辑处理。举个例字,当 pod 数量变化的时候 deployment 是不是需要判断自己名下的 pod 数量是否还和预期的一样?如果少了是不是要考虑创建?
2. 架构概览
如上图,Informer 可以 watch API Server,监听各种事件,然后回调事件 handler。这些事件 handler 可以做一些简单的过滤,最终要将 item 放到 workequeue 中,这个 workerqueue 也是 client-go 提供的工具。最终用户写的 controller 负责启动 worker 去消费这 workqueue 中的 item.
3. SharedInformerFactory
SharedInformerFactory 提供所有 API group 资源的 shared informers,也就是说通过这个 factory 可以使用 DeploymentInformer、ConfigMapInformer 等等各种 Informer,从而能够实现针对各种资源的逻辑处理。
informers/factory.go:185
type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Admissionregistration() admissionregistration.Interface Apps() apps.Interface Auditregistration() auditregistration.Interface Autoscaling() autoscaling.Interface Batch() batch.Interface Certificates() certificates.Interface Coordination() coordination.Interface Core() core.Interface // …… }
这个 interface 我们关注3个点:
internalinterfaces.SharedInformerFactory ForResource()
3.1. 同质的方法
我们先看第三点,找个特例,从这个接口的一个方法往里面看一下类型含义,比如 Apps() apps.Interface
吧:
informers/apps/interface.go:29
type Interface interface { // V1 provides access to shared informers for resources in V1. V1() v1.Interface // V1beta1 provides access to shared informers for resources in V1beta1. V1beta1() v1beta1.Interface // V1beta2 provides access to shared informers for resources in V1beta2. V1beta2() v1beta2.Interface }
很自然我们想到要继续看 v1.Interface
:
informers/apps/v1/interface.go:26
type Interface interface { // ControllerRevisions returns a ControllerRevisionInformer. ControllerRevisions() ControllerRevisionInformer // DaemonSets returns a DaemonSetInformer. DaemonSets() DaemonSetInformer // Deployments returns a DeploymentInformer. Deployments() DeploymentInformer // ReplicaSets returns a ReplicaSetInformer. ReplicaSets() ReplicaSetInformer // StatefulSets returns a StatefulSetInformer. StatefulSets() StatefulSetInformer }
DeploymentInformer 又是什么类型呢?
informers/apps/v1/deployment.go:36
type DeploymentInformer interface { Informer() cache.SharedIndexInformer Lister() v1.DeploymentLister }
可以看到这个 interface 的两个方法的特点,这个接口要提供的是针对 Deployments 的 shared informer 和 lister. 我们先不纠结细节,到这里我们先理解 SharedInformerFactory 提供所有 API group 资源的 shared informers 这句话。
3.2. ForResource()方法
这个方法返回指定类型的 shared informer 的通用访问方式,从实现中可以看到一些端倪:
informers/generic.go:80
func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) { switch resource { // Group=admissionregistration.k8s.io, Version=v1alpha1 case v1alpha1.SchemeGroupVersion.WithResource("initializerconfigurations"): return &genericInformer{resource: resource.GroupResource(), informer: f.Admissionregistration().V1alpha1().InitializerConfigurations().Informer()}, nil // …… }
这里的返回值是 GenericInformer 类型,很简洁:
informers/generic.go:58
type GenericInformer interface { Informer() cache.SharedIndexInformer Lister() cache.GenericLister }
3.3. internalinterfaces.SharedInformerFactory
informers/internalinterfaces/factory_interfaces.go:34
type SharedInformerFactory interface { Start(stopCh <-chan struct{}) InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer }
这里的 InformerFor() 方法和前面的 ForResource() 有点像,这里的返回值是 SharedIndexInformer,GenericInformer 的 Informer() 方法返回值也是 SharedIndexInformer:
tools/cache/shared_informer.go:66
type SharedIndexInformer interface { SharedInformer // AddIndexers add indexers to the informer before it starts. AddIndexers(indexers Indexers) error GetIndexer() Indexer }
3.4. sharedInformerFactory
sharedInformerFactory 对象是 SharedInformerFactory 接口的具体实现,从这个 struct 的属性中我们可以看到一些有用的信息:
informers/factory.go:53
type sharedInformerFactory struct { client kubernetes.Interface namespace string tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex defaultResync time.Duration customResync map[reflect.Type]time.Duration informers map[reflect.Type]cache.SharedIndexInformer startedInformers map[reflect.Type]bool }
这里主要注意 client 和 informers,client 先不细说,大家从字面理解,当作一个可以和 api server 交互(CURD)的 工具 先就行。 informers map[reflect.Type]cache.SharedIndexInformer
明显是存放了多个不同类型的 informers,这个 map 的 key 表达一种 obj 的类型,value 是 SharedIndexInformer,后面我们会讲。
4. SharedIndexInformer
看 client-go 的过程中我一直在想到底哪个对象最能代表 Informer,后来觉得 SharedIndexInformer 应该可以被认为就是广义的 Informer 了。
我们在前面看到 GenericInformer 的代码,再附加对应 struct 贴一份:
type GenericInformer interface { Informer() cache.SharedIndexInformer Lister() cache.GenericLister } type genericInformer struct { informer cache.SharedIndexInformer resource schema.GroupResource }
我们编码的时候直接使用的都是 SharedInformerFactory,往里面跟可以认为 GenericInformer 是第一层,这个接口的方法很清晰表达了意图。这里涉及到 informer+lister,我们一一来看。
SharedIndexInformer 的定义如下:
tools/cache/shared_informer.go:66
type SharedIndexInformer interface { SharedInformer AddIndexers(indexers Indexers) error GetIndexer() Indexer }
这里包了一个 Interface:
tools/cache/shared_informer.go:43
type SharedInformer interface { // 留意这个方法 AddEventHandler(handler ResourceEventHandler) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) GetStore() Store GetController() Controller Run(stopCh <-chan struct{}) HasSynced() bool LastSyncResourceVersion() string }
从函数名得不到太多直观的信息,我们从 SharedIndexInformer 的实现 sharedIndexInformer 入手:
tools/cache/shared_informer.go:127
type sharedIndexInformer struct { indexer Indexer controller Controller processor *sharedProcessor cacheMutationDetector CacheMutationDetector listerWatcher ListerWatcher objectType runtime.Object resyncCheckPeriod time.Duration defaultEventHandlerResyncPeriod time.Duration clock clock.Clock started, stopped bool startedLock sync.Mutex blockDeltas sync.Mutex }
从 sharedIndexInformer 的属性中可以看到几个实实在在的对象:
- indexer
- controller
- processor
- listerWatcher
4.1. indexer
Indexer 接口提供了各种 index 函数,让我们在 list 一个对象时可以使用这些索引函数:
tools/cache/index.go:27
type Indexer interface { Store Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexKey string) ([]string, error) ListIndexFuncValues(indexName string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }
这个接口的实现是 cache:
tools/cache/store.go:112
type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc }
另外我们注意到包了一个接口 Store:
type Store interface { Add(obj interface{}) error Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error) Replace([]interface{}, string) error Resync() error }
Store 是一个一般对象的存储接口,Reflector(后面介绍)知道怎样 watch server 然后更新 store. Reflector 能够将 store 当作一个本地缓存系统,进而以类似队列的方式工作(队列中存的是等待被处理的对象)。
我们来看 Store 接口的一个实现:
type DeltaFIFO struct { items map[string]Deltas queue []string //…… }
4.2. reflector
前面说到 Store 要给 Reflector 服务,我们看一下 Reflector 的定义:
tools/cache/reflector.go:47
type Reflector struct { name string metrics *reflectorMetrics expectedType reflect.Type // The destination to sync up with the watch source store Store // listerWatcher is used to perform lists and watches. listerWatcher ListerWatcher // …… }
Reflector 要做的事情是 watch 一个指定的资源,然后将这个资源的变化反射到给定的store中。很明显这里的两个属性 listerWatcher 和 store 就是这些逻辑的关键。
我们简单看一下往 store 中添加数据的代码:
tools/cache/reflector.go:324
switch event.Type { case watch.Added: err := r.store.Add(event.Object) // …… case watch.Modified: err := r.store.Update(event.Object) // …… case watch.Deleted: // …… err := r.store.Delete(event.Object)
这个 store 一般用的是 DeltaFIFO,到这里大概就知道 Refactor 从 API Server watch 资源,然后写入 DeltaFIFO 的过程了,大概长这个样子:
然后我们关注一下 DeltaFIFO 的 knownObjects 属性,在创建一个 DeltaFIFO 实例的时候有这样的逻辑:
tools/cache/delta_fifo.go:59
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: keyFunc, knownObjects: knownObjects, } f.cond.L = &f.lock return f }
这里接收了 KeyListerGetter 类型的 knownObjects,继续往前跟可以看到我们前面提到的 SharedIndexInformer 的初始化逻辑中将 indexer 对象当作了这里的 knownObjects 的实参:
tools/cache/shared_informer.go:192
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
s.indexer 来自于:NewSharedIndexInformer() 函数的逻辑:
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: objType, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), clock: realClock, } return sharedIndexInformer }
这里的 NewIndexer() 函数中就可以看到我们前面提到的 Indexer 接口的实现 cache 对象了:
!FILENMAE tools/cache/store.go:239
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } }
Ok,我们可以基于前面的图加一个框框了:
4.3. ResourceEventHandler
在 SharedInformer 接口中有一个方法 AddEventHandler(handler ResourceEventHandler)
,我们看一下这个方法的一些细节。先来看 ResourceEventHandler 接口的定义:
tools/cache/controller.go:177
type ResourceEventHandler interface { OnAdd(obj interface{}) OnUpdate(oldObj, newObj interface{}) OnDelete(obj interface{}) } // adaptor type ResourceEventHandlerFuncs struct { AddFunc func(obj interface{}) UpdateFunc func(oldObj, newObj interface{}) DeleteFunc func(obj interface{}) }
ResourceEventHandler 要做的事情是 handle 一个资源对象的事件通知,在这个资源对象发生增加、修改、删除的时候分别对应上面3个方法的逻辑。下面在 processor 部分我们继续看 ResourceEventHandler.
4.4. controller
controller 对应这里的 Controller 接口:
tools/cache/controller.go:82
type Controller interface { Run(stopCh <-chan struct{}) HasSynced() bool LastSyncResourceVersion() string }
这里有个 Run()
方法比较显眼,我们看一下 sharedIndexInformer 对 Run() 方法的实现:
tools/cache/shared_informer.go:189
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { // …… cfg := &Config{ Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, } func() { // …… s.controller = New(cfg) // …… }() // …… s.controller.Run(stopCh) }
关注这里基于 Config 创建了一个 Controller 赋值给 s.controller,然后调用了这个 s.controller.Run() 方法。我们看一下 New 里面是什么:
tools/cache/controller.go:89
// New makes a new Controller from the given Config. func New(c *Config) Controller { ctlr := &controller{ config: *c, clock: &clock.RealClock{}, } return ctlr }
这里的 controller 类型是:
tools/cache/controller.go:75
type controller struct { config Config reflector *Reflector reflectorMutex sync.RWMutex clock clock.Clock }
4.4.1. controller.Run()
我们接着关注这个 controller 是怎么实现 Run() 方法的:
tools/cache/controller.go:100
func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() go func() { <-stopCh c.config.Queue.Close() }() // listerWatcher 和 queue 等都用于创建这里的r eflector 了 r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock c.reflectorMutex.Lock() // reflector 是 controller 的一个关键属性 c.reflector = r c.reflectorMutex.Unlock() var wg wait.Group defer wg.Wait() wg.StartWithChannel(stopCh, r.Run) // 逻辑到了 processLoop 里面 wait.Until(c.processLoop, time.Second, stopCh) }
这个 loop 是用来消费 queue 的:
tools/cache/controller.go:148
func (c *controller) processLoop() { for { // 这里的 Pop() 明显是阻塞式的 // type PopProcessFunc func(interface{}) error // PopProcessFunc 用于处理 queue 中 pop 出来的 element obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == FIFOClosedError { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }
这里的 PopProcessFunc 可能会让人一时摸不着头脑,其实这这值是一个函数类型 func(interface{}) error
,这里 PopProcessFunc(c.config.Process)
也就是把 c.config.Process
转为了 PopProcessFunc
类型而已。
我们在前面有贴 sharedIndexInformer.Run() 这个函数,里面的 Process: s.HandleDeltas,
这一行其实就交代了这里的 PopProcessFunc 类型实例来源。
4.4.2. sharedIndexInformer.HandleDeltas()
tools/cache/shared_informer.go:344
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest // 循环处理这个对象的一系列状态 for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } // distribute s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } // distribute s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } // distribute s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }
先关注这里的 distribute 过程,注意到这个 distribute 的参数是 xxxNotification,下面 processor 部分会讲到这些信号被处理的逻辑。
tools/cache/shared_informer.go:400
func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { // add listener.add(obj) } } else { for _, listener := range p.listeners { // add listener.add(obj) } } }
tools/cache/shared_informer.go:506
func (p *processorListener) add(notification interface{}) { p.addCh <- notification }
这里的 p.addCh 接收到信号,也就是下面 processor 部分的逻辑 processorListener.pop()
逻辑的起点。
4.5. processor
在 sharedIndexInformer 对象中有一个属性 processor *sharedProcessor
,这个 sharedProcessor 类型定义如下:
tools/cache/shared_informer.go:375
type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex listeners []*processorListener syncingListeners []*processorListener clock clock.Clock wg wait.Group }
这里的重点明显是 listeners 属性了,我们继续看 listeners 的类型中 processorListener 的定义:
tools/cache/shared_informer.go:466
type processorListener struct { nextCh chan interface{} addCh chan interface{} handler ResourceEventHandler // …… }
这里有一个我们前面提到的 handler,下面结合在一起跟一下handler 方法调用逻辑。
4.5.1. sharedProcessor.run()
从 processor 的 run() 方法开始看:
tools/cache/shared_informer.go:415
func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() <-stopCh // …… }
撇开细节,可以看到这里调用了内部所有 listener 的 run() 和 pop() 方法。
4.5.2. sharedIndexInformer.Run()
我们前面写 controller 时提到过这个Run() ,现在只关注一点,sharedIndexInformer 的 run 会调用到 s.processor.run
,也就是上面写的 sharedProcessor.run()
.
4.5.3. processorListener.run()
sharedProcessor.run()
往里调到了 processorListener.run() 和 processorListener.pop()
,先看一下这个 run 做了什么:
tools/cache/shared_informer.go:540
func (p *processorListener) run() { stopCh := make(chan struct{}) wait.Until(func() { // 一分钟执行一次这个 func() // 一分钟内的又有几次重试 err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { // 等待信号 nextCh for next := range p.nextCh { // notification 是 next 的实际类型 switch notification := next.(type) { // update case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) // add case addNotification: p.handler.OnAdd(notification.newObj) // delete case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next)) } } return true, nil }) if err == nil { close(stopCh) } }, 1*time.Minute, stopCh) }
这个 run 过程不复杂,等待信号然后调用 handler 的增删改方法做对应的处理逻辑。case 里的 Notification 再看一眼:
tools/cache/shared_informer.go:176
type updateNotification struct { oldObj interface{} newObj interface{} } type addNotification struct { newObj interface{} } type deleteNotification struct { oldObj interface{} }
另外注意到 for next := range p.nextCh
是下面的 case 执行的前提,也就是说触发点是 p.nextCh,我们接着看 pop 过程( pod的代码花了我不少时间,这里的逻辑不简单):
tools/cache/shared_informer.go:510
func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop // 这个 chan 是没有初始化的 var nextCh chan<- interface{} // 可以接收任意类型,其实是对应前面提到的 addNotification 等 var notification interface{} // for 循环套 select 是比较常规的写法 for { select { //第一遍执行到这里的时候由于 nexth 没有初始化,所以这里会阻塞(和notification有没有值没有关系,notification哪怕是nil也可以写入 chan interface{} 类型的 channel) case nextCh <- notification: var ok bool // 第二次循环,下面一个case运行过之后才有这里的逻辑 notification, ok = p.pendingNotifications.ReadOne() if !ok { // 将 channel 指向 nil 相当于初始化的逆操作,会使得这个 case 条件阻塞 nextCh = nil } // 这里是 for 首次执行逻辑的入口 case notificationToAdd, ok := <-p.addCh: if !ok { return } // 如果是 nil,也就是第一个通知过来的时候,这时不需要用到缓存(和下面else相对) if notification == nil { // 赋值给 notification,这样上面一个 case 在接下来的一轮循化中就可以读到了 notification = notificationToAdd // 相当于复制引用,nextCh 就指向了 p.nextCh,使得上面 case 写 channel 的时候本质上操作了 p.nextCh,从而 run 能够读到 p.nextCh 中的信号 nextCh = p.nextCh } else { // 处理到这里的时候,其实第一个 case 已经有了首个 notification,这里的逻辑是一下子来了太多 notification 就往 pendingNotifications 缓存,在第一个 case 中 有对应的 ReadOne()操作 p.pendingNotifications.WriteOne(notificationToAdd) } } } }
这里的 pop 逻辑的入口是 <-p.addCh
,我们前面 controller 部分讲到了这个 addCh 的来源。继续看其他逻辑。
4.6. listerwatcher
ListerWatcher 的出镜率还是挺高的,大家应该在很多文章里都有看到过这个词。我们先看一下接口定义:
tools/cache/listwatch.go:31
type ListerWatcher interface { // List should return a list type object; List(options metav1.ListOptions) (runtime.Object, error) // Watch should begin a watch at the specified version. Watch(options metav1.ListOptions) (watch.Interface, error) } type ListFunc func(options metav1.ListOptions) (runtime.Object, error) type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) type ListWatch struct { ListFunc ListFunc WatchFunc WatchFunc // DisableChunking requests no chunking for this list watcher. DisableChunking bool }
从这些代码中我们能够体会到一些 ListerWatcher 的用意,但心里应该还是纠结的。我们看一下 deployment 的 list-watch.
我们是从 sharedIndexInformer 中看到有个属性 listerWatcher,DeploymentInformer 的创建代码如下:
informers/apps/v1beta2/deployment.go:50
// 注意到返回值类型是 SharedIndexInformer,也就是说这里的初始化肯定需要给 listerWatcher 属性赋值 func NewDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { return NewFilteredDeploymentInformer(client, namespace, resyncPeriod, indexers, nil) } func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( // 这里初始化一个 ListWatch 类型实例 &cache.ListWatch{ // ListFunc 和 WatchFunc 的赋值 ListFunc: func(options v1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } // 逻辑是通过client的 xxx 实现的,这个 client 其实就是 Clientset return client.AppsV1beta2().Deployments(namespace).List(options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.AppsV1beta2().Deployments(namespace).Watch(options) }, }, &appsv1beta2.Deployment{}, resyncPeriod, indexers, ) }
以 list 为例, client.AppsV1beta2().Deployments(namespace).List(options)
其实是 client 提供的逻辑了,我们可以看一下 List() 方法对应的接口:
// DeploymentInterface has methods to work with Deployment resources. type DeploymentInterface interface { Create(*v1beta2.Deployment) (*v1beta2.Deployment, error) Update(*v1beta2.Deployment) (*v1beta2.Deployment, error) UpdateStatus(*v1beta2.Deployment) (*v1beta2.Deployment, error) Delete(name string, options *v1.DeleteOptions) error DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error Get(name string, options v1.GetOptions) (*v1beta2.Deployment, error) List(opts v1.ListOptions) (*v1beta2.DeploymentList, error) Watch(opts v1.ListOptions) (watch.Interface, error) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta2.Deployment, err error) DeploymentExpansion }
顺着这个接口再往里跟很快就到 http 协议层了,要了然整个 list-watch 的原理还得结合 API Server 的代码,我们今天先不讲。
5. 小结
Informer 的实现还是有点复杂的,啃的过程中很容易一个不小心就被绕晕了。今天我们以开头的那张图结尾。以后讲Operator 的时候会基于这个图增加几个框框。
以上所述就是小编给大家介绍的《《k8s-1.13版本源码分析》- Informer 机制》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Timer机制源码浅析
- Handler机制源码分析
- View事件机制源码分析
- Checkpoint对齐机制源码分析
- SpringBoot事件监听机制源码分析(上) SpringBoot源码(九)
- vue源码解读-component机制
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。