《k8s-1.13版本源码分析》- Informer 机制

栏目: 编程工具 · 发布时间: 6年前

内容简介:源码分析系列文章已经开源到github,地址如下:-----------------------------------------------------------------讲 Informer 还是比较有压力的,client-go 中的逻辑确实有点复杂,我甚至怀疑有“炫技”的成分。Informer 在很多组件的源码中可以看到,尤其是 kube-controller-manager (写这篇文章时我已经基本写完 kube-scheduler 的源码分析,准备着手写 kube-controller-ma

源码分析系列文章已经开源到github,地址如下:

  • github:
    https://github.com/farmer-hutao/k8s-source-code-analysis
  • gitbook:
    https://farmer-hutao.github.io/k8s-source-code-analysis

-----------------------------------------------------------------

Informer机制

1. 概述

讲 Informer 还是比较有压力的,client-go 中的逻辑确实有点复杂,我甚至怀疑有“炫技”的成分。Informer 在很多组件的源码中可以看到,尤其是 kube-controller-manager (写这篇文章时我已经基本写完 kube-scheduler 的源码分析,准备着手写 kube-controller-manager 了,鉴于 controlelr 和 client-go 关联太大,跳过来先讲讲 Informer).

Informer 是 client-go 中一个比较核心的工具,通过 Informer 我们可以轻松 List/Get 某个资源对象,可以监听资源对象的各种事件(比如创建和删除)然后触发回调函数,让我们能够在各种事件发生的时候能够作出相应的逻辑处理。举个例字,当 pod 数量变化的时候 deployment 是不是需要判断自己名下的 pod 数量是否还和预期的一样?如果少了是不是要考虑创建?

2. 架构概览

《k8s-1.13版本源码分析》- Informer 机制

如上图,Informer 可以 watch API Server,监听各种事件,然后回调事件 handler。这些事件 handler 可以做一些简单的过滤,最终要将 item 放到 workequeue 中,这个 workerqueue 也是 client-go 提供的工具。最终用户写的 controller 负责启动 worker 去消费这 workqueue 中的 item.

3. SharedInformerFactory

SharedInformerFactory 提供所有 API group 资源的 shared informers,也就是说通过这个 factory 可以使用 DeploymentInformer、ConfigMapInformer 等等各种 Informer,从而能够实现针对各种资源的逻辑处理。

informers/factory.go:185

type SharedInformerFactory interface {
    internalinterfaces.SharedInformerFactory
    ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
    WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

    Admissionregistration() admissionregistration.Interface
    Apps() apps.Interface
    Auditregistration() auditregistration.Interface
    Autoscaling() autoscaling.Interface
    Batch() batch.Interface
    Certificates() certificates.Interface
    Coordination() coordination.Interface
    Core() core.Interface
    // ……
}

这个 interface 我们关注3个点:

internalinterfaces.SharedInformerFactory
ForResource()

3.1. 同质的方法

我们先看第三点,找个特例,从这个接口的一个方法往里面看一下类型含义,比如 Apps() apps.Interface 吧:

informers/apps/interface.go:29

type Interface interface {
   // V1 provides access to shared informers for resources in V1.
   V1() v1.Interface
   // V1beta1 provides access to shared informers for resources in V1beta1.
   V1beta1() v1beta1.Interface
   // V1beta2 provides access to shared informers for resources in V1beta2.
   V1beta2() v1beta2.Interface
}

很自然我们想到要继续看 v1.Interface

informers/apps/v1/interface.go:26

type Interface interface {
   // ControllerRevisions returns a ControllerRevisionInformer.
   ControllerRevisions() ControllerRevisionInformer
   // DaemonSets returns a DaemonSetInformer.
   DaemonSets() DaemonSetInformer
   // Deployments returns a DeploymentInformer.
   Deployments() DeploymentInformer
   // ReplicaSets returns a ReplicaSetInformer.
   ReplicaSets() ReplicaSetInformer
   // StatefulSets returns a StatefulSetInformer.
   StatefulSets() StatefulSetInformer
}

DeploymentInformer 又是什么类型呢?

informers/apps/v1/deployment.go:36

type DeploymentInformer interface {
   Informer() cache.SharedIndexInformer
   Lister() v1.DeploymentLister
}

可以看到这个 interface 的两个方法的特点,这个接口要提供的是针对 Deployments 的 shared informer 和 lister. 我们先不纠结细节,到这里我们先理解 SharedInformerFactory 提供所有 API group 资源的 shared informers 这句话。

3.2. ForResource()方法

这个方法返回指定类型的 shared informer 的通用访问方式,从实现中可以看到一些端倪:

informers/generic.go:80

func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) {
   switch resource {
   // Group=admissionregistration.k8s.io, Version=v1alpha1
   case v1alpha1.SchemeGroupVersion.WithResource("initializerconfigurations"):
      return &genericInformer{resource: resource.GroupResource(), informer: f.Admissionregistration().V1alpha1().InitializerConfigurations().Informer()}, nil
   // ……
   }

这里的返回值是 GenericInformer 类型,很简洁:

informers/generic.go:58

type GenericInformer interface {
   Informer() cache.SharedIndexInformer
   Lister() cache.GenericLister
}

3.3. internalinterfaces.SharedInformerFactory

informers/internalinterfaces/factory_interfaces.go:34

type SharedInformerFactory interface {
   Start(stopCh <-chan struct{})
   InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}

这里的 InformerFor() 方法和前面的 ForResource() 有点像,这里的返回值是 SharedIndexInformer,GenericInformer 的 Informer() 方法返回值也是 SharedIndexInformer:

tools/cache/shared_informer.go:66

type SharedIndexInformer interface {
   SharedInformer
   // AddIndexers add indexers to the informer before it starts.
   AddIndexers(indexers Indexers) error
   GetIndexer() Indexer
}

3.4. sharedInformerFactory

sharedInformerFactory 对象是 SharedInformerFactory 接口的具体实现,从这个 struct 的属性中我们可以看到一些有用的信息:

informers/factory.go:53

type sharedInformerFactory struct {
   client           kubernetes.Interface
   namespace        string
   tweakListOptions internalinterfaces.TweakListOptionsFunc
   lock             sync.Mutex
   defaultResync    time.Duration
   customResync     map[reflect.Type]time.Duration
   informers map[reflect.Type]cache.SharedIndexInformer
   startedInformers map[reflect.Type]bool
}

这里主要注意 client 和 informers,client 先不细说,大家从字面理解,当作一个可以和 api server 交互(CURD)的 工具 先就行。 informers map[reflect.Type]cache.SharedIndexInformer 明显是存放了多个不同类型的 informers,这个 map 的 key 表达一种 obj 的类型,value 是 SharedIndexInformer,后面我们会讲。

4. SharedIndexInformer

看 client-go 的过程中我一直在想到底哪个对象最能代表 Informer,后来觉得 SharedIndexInformer 应该可以被认为就是广义的 Informer 了。

我们在前面看到 GenericInformer 的代码,再附加对应 struct 贴一份:

type GenericInformer interface {
   Informer() cache.SharedIndexInformer
   Lister() cache.GenericLister
}

type genericInformer struct {
   informer cache.SharedIndexInformer
   resource schema.GroupResource
}

我们编码的时候直接使用的都是 SharedInformerFactory,往里面跟可以认为 GenericInformer 是第一层,这个接口的方法很清晰表达了意图。这里涉及到 informer+lister,我们一一来看。

SharedIndexInformer 的定义如下:

tools/cache/shared_informer.go:66

type SharedIndexInformer interface {
   SharedInformer
   AddIndexers(indexers Indexers) error
   GetIndexer() Indexer
}

这里包了一个 Interface:

tools/cache/shared_informer.go:43

type SharedInformer interface {
    // 留意这个方法
   AddEventHandler(handler ResourceEventHandler)
   AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
   GetStore() Store
   GetController() Controller
   Run(stopCh <-chan struct{})
   HasSynced() bool
   LastSyncResourceVersion() string
}

从函数名得不到太多直观的信息,我们从 SharedIndexInformer 的实现 sharedIndexInformer 入手:

tools/cache/shared_informer.go:127

type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller
    processor             *sharedProcessor
    cacheMutationDetector CacheMutationDetector
    listerWatcher ListerWatcher
    objectType    runtime.Object
    resyncCheckPeriod time.Duration
    defaultEventHandlerResyncPeriod time.Duration
    clock clock.Clock
    started, stopped bool
    startedLock      sync.Mutex
    blockDeltas sync.Mutex
}

从 sharedIndexInformer 的属性中可以看到几个实实在在的对象:

  • indexer
  • controller
  • processor
  • listerWatcher

4.1. indexer

Indexer 接口提供了各种 index 函数,让我们在 list 一个对象时可以使用这些索引函数:

tools/cache/index.go:27

type Indexer interface {
   Store
   Index(indexName string, obj interface{}) ([]interface{}, error)
   IndexKeys(indexName, indexKey string) ([]string, error)
   ListIndexFuncValues(indexName string) []string
   ByIndex(indexName, indexKey string) ([]interface{}, error)
   GetIndexers() Indexers
   AddIndexers(newIndexers Indexers) error
}

这个接口的实现是 cache:

tools/cache/store.go:112

type cache struct {
   cacheStorage ThreadSafeStore
   keyFunc KeyFunc
}

另外我们注意到包了一个接口 Store:

type Store interface {
   Add(obj interface{}) error
   Update(obj interface{}) error
   Delete(obj interface{}) error
   List() []interface{}
   ListKeys() []string
   Get(obj interface{}) (item interface{}, exists bool, err error)
   GetByKey(key string) (item interface{}, exists bool, err error)

   Replace([]interface{}, string) error
   Resync() error
}

Store 是一个一般对象的存储接口,Reflector(后面介绍)知道怎样 watch server 然后更新 store. Reflector 能够将 store 当作一个本地缓存系统,进而以类似队列的方式工作(队列中存的是等待被处理的对象)。

我们来看 Store 接口的一个实现:

type DeltaFIFO struct {
    items map[string]Deltas
    queue []string
    //……
}

4.2. reflector

前面说到 Store 要给 Reflector 服务,我们看一下 Reflector 的定义:

tools/cache/reflector.go:47

type Reflector struct {
   name string
   metrics *reflectorMetrics
   expectedType reflect.Type
   // The destination to sync up with the watch source
   store Store
   // listerWatcher is used to perform lists and watches.
   listerWatcher ListerWatcher
  // ……
}

Reflector 要做的事情是 watch 一个指定的资源,然后将这个资源的变化反射到给定的store中。很明显这里的两个属性 listerWatcher 和 store 就是这些逻辑的关键。

我们简单看一下往 store 中添加数据的代码:

tools/cache/reflector.go:324

switch event.Type {
case watch.Added:
   err := r.store.Add(event.Object)
   // ……
case watch.Modified:
   err := r.store.Update(event.Object)
   // ……
case watch.Deleted:
   // ……
   err := r.store.Delete(event.Object)

这个 store 一般用的是 DeltaFIFO,到这里大概就知道 Refactor 从 API Server watch 资源,然后写入 DeltaFIFO 的过程了,大概长这个样子:

《k8s-1.13版本源码分析》- Informer 机制

然后我们关注一下 DeltaFIFO 的 knownObjects 属性,在创建一个 DeltaFIFO 实例的时候有这样的逻辑:

tools/cache/delta_fifo.go:59

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
   f := &DeltaFIFO{
      items:        map[string]Deltas{},
      queue:        []string{},
      keyFunc:      keyFunc,
      knownObjects: knownObjects,
   }
   f.cond.L = &f.lock
   return f
}

这里接收了 KeyListerGetter 类型的 knownObjects,继续往前跟可以看到我们前面提到的 SharedIndexInformer 的初始化逻辑中将 indexer 对象当作了这里的 knownObjects 的实参:

tools/cache/shared_informer.go:192

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

s.indexer 来自于:NewSharedIndexInformer() 函数的逻辑:

func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
   realClock := &clock.RealClock{}
   sharedIndexInformer := &sharedIndexInformer{
      processor:                       &sharedProcessor{clock: realClock},
      indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
      listerWatcher:                   lw,
      objectType:                      objType,
      resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
      defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
      cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
      clock:                           realClock,
   }
   return sharedIndexInformer
}

这里的 NewIndexer() 函数中就可以看到我们前面提到的 Indexer 接口的实现 cache 对象了:

!FILENMAE tools/cache/store.go:239

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
   return &cache{
      cacheStorage: NewThreadSafeStore(indexers, Indices{}),
      keyFunc:      keyFunc,
   }
}

Ok,我们可以基于前面的图加一个框框了:

《k8s-1.13版本源码分析》- Informer 机制

4.3. ResourceEventHandler

在 SharedInformer 接口中有一个方法 AddEventHandler(handler ResourceEventHandler) ,我们看一下这个方法的一些细节。先来看 ResourceEventHandler 接口的定义:

tools/cache/controller.go:177

type ResourceEventHandler interface {
   OnAdd(obj interface{})
   OnUpdate(oldObj, newObj interface{})
   OnDelete(obj interface{})
}

// adaptor
type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

ResourceEventHandler 要做的事情是 handle 一个资源对象的事件通知,在这个资源对象发生增加、修改、删除的时候分别对应上面3个方法的逻辑。下面在 processor 部分我们继续看 ResourceEventHandler.

4.4. controller

controller 对应这里的 Controller 接口:

tools/cache/controller.go:82

type Controller interface {
   Run(stopCh <-chan struct{})
   HasSynced() bool
   LastSyncResourceVersion() string
}

这里有个 Run() 方法比较显眼,我们看一下 sharedIndexInformer 对 Run() 方法的实现:

tools/cache/shared_informer.go:189

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
   // ……
   cfg := &Config{
      Queue:            fifo,
      ListerWatcher:    s.listerWatcher,
      ObjectType:       s.objectType,
      FullResyncPeriod: s.resyncCheckPeriod,
      RetryOnError:     false,
      ShouldResync:     s.processor.shouldResync,

      Process: s.HandleDeltas,
   }

   func() {
      // ……
      s.controller = New(cfg)
      // ……
   }()
   // ……
   s.controller.Run(stopCh)
}

关注这里基于 Config 创建了一个 Controller 赋值给 s.controller,然后调用了这个 s.controller.Run() 方法。我们看一下 New 里面是什么:

tools/cache/controller.go:89

// New makes a new Controller from the given Config.
func New(c *Config) Controller {
   ctlr := &controller{
      config: *c,
      clock:  &clock.RealClock{},
   }
   return ctlr
}

这里的 controller 类型是:

tools/cache/controller.go:75

type controller struct {
   config         Config
   reflector      *Reflector
   reflectorMutex sync.RWMutex
   clock          clock.Clock
}

4.4.1. controller.Run()

我们接着关注这个 controller 是怎么实现 Run() 方法的:

tools/cache/controller.go:100

func (c *controller) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   go func() {
      <-stopCh
      c.config.Queue.Close()
   }()
    // listerWatcher 和 queue 等都用于创建这里的r eflector 了
   r := NewReflector(
      c.config.ListerWatcher,
      c.config.ObjectType,
      c.config.Queue,
      c.config.FullResyncPeriod,
   )
   r.ShouldResync = c.config.ShouldResync
   r.clock = c.clock

   c.reflectorMutex.Lock()
    // reflector 是 controller 的一个关键属性
   c.reflector = r
   c.reflectorMutex.Unlock()

   var wg wait.Group
   defer wg.Wait()

   wg.StartWithChannel(stopCh, r.Run)
   // 逻辑到了 processLoop 里面
   wait.Until(c.processLoop, time.Second, stopCh)
}

这个 loop 是用来消费 queue 的:

tools/cache/controller.go:148

func (c *controller) processLoop() {
   for {
       // 这里的 Pop() 明显是阻塞式的
       // type PopProcessFunc func(interface{}) error
       // PopProcessFunc 用于处理 queue 中 pop 出来的 element
      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
      if err != nil {
         if err == FIFOClosedError {
            return
         }
         if c.config.RetryOnError {
            // This is the safe way to re-enqueue.
            c.config.Queue.AddIfNotPresent(obj)
         }
      }
   }
}

这里的 PopProcessFunc 可能会让人一时摸不着头脑,其实这这值是一个函数类型 func(interface{}) error ,这里 PopProcessFunc(c.config.Process) 也就是把 c.config.Process 转为了 PopProcessFunc 类型而已。

我们在前面有贴 sharedIndexInformer.Run() 这个函数,里面的 Process: s.HandleDeltas, 这一行其实就交代了这里的 PopProcessFunc 类型实例来源。

4.4.2. sharedIndexInformer.HandleDeltas()

tools/cache/shared_informer.go:344

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
   s.blockDeltas.Lock()
   defer s.blockDeltas.Unlock()
    // from oldest to newest
    // 循环处理这个对象的一系列状态
   for _, d := range obj.(Deltas) {
      switch d.Type {
      case Sync, Added, Updated:
         isSync := d.Type == Sync
         s.cacheMutationDetector.AddObject(d.Object)
         if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
            if err := s.indexer.Update(d.Object); err != nil {
               return err
            }
             // distribute
            s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
         } else {
            if err := s.indexer.Add(d.Object); err != nil {
               return err
            }
             // distribute
            s.processor.distribute(addNotification{newObj: d.Object}, isSync)
         }
      case Deleted:
         if err := s.indexer.Delete(d.Object); err != nil {
            return err
         }
          // distribute
         s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
      }
   }
   return nil
}

先关注这里的 distribute 过程,注意到这个 distribute 的参数是 xxxNotification,下面 processor 部分会讲到这些信号被处理的逻辑。

tools/cache/shared_informer.go:400

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
   p.listenersLock.RLock()
   defer p.listenersLock.RUnlock()

   if sync {
      for _, listener := range p.syncingListeners {
          // add
         listener.add(obj)
      }
   } else {
      for _, listener := range p.listeners {
          // add
         listener.add(obj)
      }
   }
}

tools/cache/shared_informer.go:506

func (p *processorListener) add(notification interface{}) {
   p.addCh <- notification
}

这里的 p.addCh 接收到信号,也就是下面 processor 部分的逻辑 processorListener.pop() 逻辑的起点。

4.5. processor

在 sharedIndexInformer 对象中有一个属性 processor *sharedProcessor ,这个 sharedProcessor 类型定义如下:

tools/cache/shared_informer.go:375

type sharedProcessor struct {
   listenersStarted bool
   listenersLock    sync.RWMutex
   listeners        []*processorListener
   syncingListeners []*processorListener
   clock            clock.Clock
   wg               wait.Group
}

这里的重点明显是 listeners 属性了,我们继续看 listeners 的类型中 processorListener 的定义:

tools/cache/shared_informer.go:466

type processorListener struct {
   nextCh chan interface{}
   addCh  chan interface{}

   handler ResourceEventHandler
   // ……
}

这里有一个我们前面提到的 handler,下面结合在一起跟一下handler 方法调用逻辑。

4.5.1. sharedProcessor.run()

从 processor 的 run() 方法开始看:

tools/cache/shared_informer.go:415

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
   func() {
      p.listenersLock.RLock()
      defer p.listenersLock.RUnlock()
      for _, listener := range p.listeners {
         p.wg.Start(listener.run)
         p.wg.Start(listener.pop)
      }
      p.listenersStarted = true
   }()
   <-stopCh
  // ……
}

撇开细节,可以看到这里调用了内部所有 listener 的 run() 和 pop() 方法。

4.5.2. sharedIndexInformer.Run()

我们前面写 controller 时提到过这个Run() ,现在只关注一点,sharedIndexInformer 的 run 会调用到 s.processor.run ,也就是上面写的  sharedProcessor.run() .

4.5.3. processorListener.run()

sharedProcessor.run() 往里调到了 processorListener.run() 和  processorListener.pop() ,先看一下这个 run 做了什么:

tools/cache/shared_informer.go:540

func (p *processorListener) run() {
   stopCh := make(chan struct{})
    wait.Until(func() { // 一分钟执行一次这个 func()
        // 一分钟内的又有几次重试
      err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
          // 等待信号 nextCh
         for next := range p.nextCh {
             // notification 是 next 的实际类型
            switch notification := next.(type) {
                // update
            case updateNotification:
               p.handler.OnUpdate(notification.oldObj, notification.newObj)
                // add
            case addNotification:
               p.handler.OnAdd(notification.newObj)
                // delete
            case deleteNotification:
               p.handler.OnDelete(notification.oldObj)
            default:
               utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
            }
         }
         return true, nil
      })

      if err == nil {
         close(stopCh)
      }
   }, 1*time.Minute, stopCh)
}

这个 run 过程不复杂,等待信号然后调用 handler 的增删改方法做对应的处理逻辑。case 里的 Notification 再看一眼:

tools/cache/shared_informer.go:176

type updateNotification struct {
   oldObj interface{}
   newObj interface{}
}

type addNotification struct {
   newObj interface{}
}

type deleteNotification struct {
   oldObj interface{}
}

另外注意到 for next := range p.nextCh 是下面的 case 执行的前提,也就是说触发点是 p.nextCh,我们接着看 pop 过程( pod的代码花了我不少时间,这里的逻辑不简单):

tools/cache/shared_informer.go:510

func (p *processorListener) pop() {
   defer utilruntime.HandleCrash()
   defer close(p.nextCh) // Tell .run() to stop
    // 这个 chan 是没有初始化的
   var nextCh chan<- interface{}
    // 可以接收任意类型,其实是对应前面提到的 addNotification 等
   var notification interface{}
    // for 循环套 select 是比较常规的写法
   for {
      select {
          //第一遍执行到这里的时候由于 nexth 没有初始化,所以这里会阻塞(和notification有没有值没有关系,notification哪怕是nil也可以写入 chan interface{} 类型的 channel)
      case nextCh <- notification:
         var ok bool
          // 第二次循环,下面一个case运行过之后才有这里的逻辑
         notification, ok = p.pendingNotifications.ReadOne()
         if !ok { 
             // 将 channel 指向 nil 相当于初始化的逆操作,会使得这个 case 条件阻塞
            nextCh = nil 
         }
          // 这里是 for 首次执行逻辑的入口
      case notificationToAdd, ok := <-p.addCh:
         if !ok {
            return
         }
          // 如果是 nil,也就是第一个通知过来的时候,这时不需要用到缓存(和下面else相对)
         if notification == nil { 
             // 赋值给 notification,这样上面一个 case 在接下来的一轮循化中就可以读到了
            notification = notificationToAdd
             // 相当于复制引用,nextCh 就指向了 p.nextCh,使得上面 case 写 channel 的时候本质上操作了 p.nextCh,从而 run 能够读到 p.nextCh 中的信号
            nextCh = p.nextCh
         } else { 
             // 处理到这里的时候,其实第一个 case 已经有了首个 notification,这里的逻辑是一下子来了太多 notification 就往 pendingNotifications 缓存,在第一个 case 中 有对应的 ReadOne()操作
            p.pendingNotifications.WriteOne(notificationToAdd)
         }
      }
   }
}

这里的 pop 逻辑的入口是 <-p.addCh ,我们前面 controller 部分讲到了这个 addCh 的来源。继续看其他逻辑。

4.6. listerwatcher

ListerWatcher 的出镜率还是挺高的,大家应该在很多文章里都有看到过这个词。我们先看一下接口定义:

tools/cache/listwatch.go:31

type ListerWatcher interface {
   // List should return a list type object; 
   List(options metav1.ListOptions) (runtime.Object, error)
   // Watch should begin a watch at the specified version.
   Watch(options metav1.ListOptions) (watch.Interface, error)
}

type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

type ListWatch struct {
    ListFunc  ListFunc
    WatchFunc WatchFunc
    // DisableChunking requests no chunking for this list watcher.
    DisableChunking bool
}

从这些代码中我们能够体会到一些 ListerWatcher 的用意,但心里应该还是纠结的。我们看一下 deployment 的 list-watch.

我们是从 sharedIndexInformer 中看到有个属性 listerWatcher,DeploymentInformer 的创建代码如下:

informers/apps/v1beta2/deployment.go:50

// 注意到返回值类型是 SharedIndexInformer,也就是说这里的初始化肯定需要给 listerWatcher 属性赋值
func NewDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
   return NewFilteredDeploymentInformer(client, namespace, resyncPeriod, indexers, nil)
}

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
   return cache.NewSharedIndexInformer(
       // 这里初始化一个 ListWatch 类型实例
      &cache.ListWatch{
          // ListFunc 和 WatchFunc 的赋值
         ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
            if tweakListOptions != nil {
               tweakListOptions(&options)
            }
             // 逻辑是通过client的 xxx 实现的,这个 client 其实就是 Clientset
            return client.AppsV1beta2().Deployments(namespace).List(options)
         },
         WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
            if tweakListOptions != nil {
               tweakListOptions(&options)
            }
            return client.AppsV1beta2().Deployments(namespace).Watch(options)
         },
      },
      &appsv1beta2.Deployment{},
      resyncPeriod,
      indexers,
   )
}

以 list 为例, client.AppsV1beta2().Deployments(namespace).List(options) 其实是 client 提供的逻辑了,我们可以看一下 List() 方法对应的接口:

// DeploymentInterface has methods to work with Deployment resources.
type DeploymentInterface interface {
   Create(*v1beta2.Deployment) (*v1beta2.Deployment, error)
   Update(*v1beta2.Deployment) (*v1beta2.Deployment, error)
   UpdateStatus(*v1beta2.Deployment) (*v1beta2.Deployment, error)
   Delete(name string, options *v1.DeleteOptions) error
   DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error
   Get(name string, options v1.GetOptions) (*v1beta2.Deployment, error)
   List(opts v1.ListOptions) (*v1beta2.DeploymentList, error)
   Watch(opts v1.ListOptions) (watch.Interface, error)
   Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta2.Deployment, err error)
   DeploymentExpansion
}

顺着这个接口再往里跟很快就到 http 协议层了,要了然整个 list-watch 的原理还得结合 API Server 的代码,我们今天先不讲。

5. 小结

Informer 的实现还是有点复杂的,啃的过程中很容易一个不小心就被绕晕了。今天我们以开头的那张图结尾。以后讲Operator 的时候会基于这个图增加几个框框。

《k8s-1.13版本源码分析》- Informer 机制

《k8s-1.13版本源码分析》- Informer 机制


以上所述就是小编给大家介绍的《《k8s-1.13版本源码分析》- Informer 机制》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Fluent Python

Fluent Python

Luciano Ramalho / O'Reilly Media / 2015-8-20 / USD 39.99

Learn how to write idiomatic, effective Python code by leveraging its best features. Python's simplicity quickly lets you become productive with it, but this often means you aren’t using everything th......一起来看看 《Fluent Python》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码