图解kubernetes容器运行时状态缓存数据结构

栏目: IT技术 · 发布时间: 4年前

内容简介:缓存和发布订阅都是后端开发中常用的手段,其中缓存主要是用于可丢失数据的暂存,发布订阅主要是用于消息传递,今天给大家介绍一个k8s中带有发布订阅的缓存实现,其目标是给定一个时间,只关注该时间后续的事件,主要是用于近实时状态数据的获取

缓存和发布订阅都是后端开发中常用的手段,其中缓存主要是用于可丢失数据的暂存,发布订阅主要是用于消息传递,今天给大家介绍一个k8s中带有发布订阅的缓存实现,其目标是给定一个时间,只关注该时间后续的事件,主要是用于近实时状态数据的获取

1. 业务背景

图解kubernetes容器运行时状态缓存数据结构 在k8s中的kubelet中支持不同的容器运行时,为了缓存容器运行时当前所有可见的Pod/Container就构造了一个Cache结构,当一个事件发生后,kubelet接收到事件后,此时需要获取当前Pod的状态,此时要获取的状态,就必须要求是在事件产生后的最新的状态,而不能是之前的状态,

2. 核心实现

图解kubernetes容器运行时状态缓存数据结构

2.1 数据与订阅记录

2.1.1 状态数据

状态数据主要是存储一个pod的状态数据

type data struct {
    // 存储Pod的状态
    status *PodStatus
    // 试图检测Pod状态出错信息
    err error
    // 上次数据的修改时间
    modified time.Time
}
复制代码

2.1.2 订阅记录

订阅记录其实指的是一个订阅需求,其通过一个chan来进行数据通知,其中time字段是过滤条件,即只有时间大于time的记录才允许被加入到chan中

type subRecord struct {
    time time.Time
    ch   chan *data
}复制代码

2.2 Cache实现

2.2.1 核心成员结构

cache里面的数据在kubelet每次进行PLEG更新的时候,都会更新timestamp,并且会重新获取最新的Pod状态进行填充cache,所以这里会更新timestamp,寓意着让之前旧的状态都过期,并且会针对旧的订阅的进行数据的返回

// cache implements Cache.
type cache struct {
    // 读写锁
    lock sync.RWMutex
    // 存储Pod的状态数据,用于满足不带时间戳的状态获取
    pods map[types.UID]*data
    // 全局时间戳,即当前缓存中的数据,至少都要比该时间戳新
    timestamp *time.Time
    //存储对应Pod的定语记录列表
    subscribers map[types.UID][]*subRecord
}复制代码

2.2.3 普通状态数据获取

普通状态获取即直接通过Map来进行数据的返回

func (c *cache) Get(id types.UID) (*PodStatus, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()
    d := c.get(id)
    return d.status, d.err
}

复制代码

2.2.4 默认状态构造器

当发现当前的cahce中并不存在对应的数据,则是直接根据ID来生成一个默认的状态数据

func (c *cache) get(id types.UID) *data {
    d, ok := c.pods[id]
    if !ok {
        return makeDefaultData(id)
    }
    return d
}
// 默认状态构造器
func makeDefaultData(id types.UID) *data {
    return &data{status: &PodStatus{ID: id}, err: nil}
}复制代码

2.2.5 最新状态数据获取

会给定一个时间戳,只有当当前缓存的数据的时间在该时间戳之后,才有效,否则返回nil,这里有个关键点就是timestamp的相关设计,因为在每个PLEG周期中,都会更新timestamp

如果minTime <="" p="">

func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
    // 获取当前的状态
    d, ok := c.pods[id]

    // 如果全局时间戳大于给定的时间,则会直接返回
    globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
    if !ok && globalTimestampIsNewer {
        // 状态没有缓存,但是全局时间比最小时间新,就直接返回
        return makeDefaultData(id)
    }
    // 如果之前数据的时间在获取时间之后,或者全局时间已经更新
    if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
        return d
    }
    // The pod status is not ready.
    return nil
}复制代码

2.2.6 订阅状态管道构造

订阅管道最终会返回一个状态的管道,同时会进行检查,如果发现当前有可用数据,则会直接丢进管道中,否则则创建一个subRecords订阅记录,并保存

func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
    ch := make(chan *data, 1)
    c.lock.Lock()
    defer c.lock.Unlock()
    // 获取状态数据
    d := c.getIfNewerThan(id, timestamp)
    if d != nil {
        // 如果已经有状态数据,则立即返回
        ch <- d
        return ch
    }
    // 否则添加一个订阅记录到subscribers中对应的列表中
    c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
    return ch
}复制代码

2.2.7 通知清理过期管道

通知的时候回根据subRecord的订阅时间进行检测,如果订阅时间已经超过当前的 timestamp则直接获取数据进行返回,最后只会保留那些还未过期的订阅记录

func (c *cache) notify(id types.UID, timestamp time.Time) {
    // 获取事件的ID列表
    list, ok := c.subscribers[id]
    if !ok {
        // No one to notify.
        return
    }
    newList := []*subRecord{}
    // 遍历所有的订阅记录subRecords
    for i, r := range list {
        // 如果这些订阅记录的时间在timestamp之前,就不进行操作, 即当前管道时间>timestamp
        if timestamp.Before(r.time) {
            newList = append(newList, list[i])
            continue
        }
        // 获取一个数据返回, 同时关闭管道
        r.ch <- c.get(id)
        close(r.ch)
    }
    if len(newList) == 0 {
        // 如果不存在订阅记录,则就删除对应的key
        delete(c.subscribers, id)
    } else {
        // 剩余的订阅列表
        c.subscribers[id] = newList
    }
}复制代码

2.2.8 全局时间戳更新

全局时间戳更新,则会遍历所有的订阅,以最新的全局时间戳作为时间,进行通知

func (c *cache) UpdateTime(timestamp time.Time) {
    c.lock.Lock()
    defer c.lock.Unlock()
    c.timestamp = ×tamp
    // Notify all the subscribers if the condition is met.
    for id := range c.subscribers {
        c.notify(id, *c.timestamp)
    }
}
复制代码

2.2.9 Pod事件更新通知函数

更新的时候,则会调用notify来进行通知

func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
    c.lock.Lock()
    defer c.lock.Unlock()
    // 进行事件的通知
    defer c.notify(id, timestamp)
    // 保存最新的状态数据 
    c.pods[id] = &data{status: status, err: err, modified: timestamp}
}复制代码

今天就到这里,这些数据结构和设计有很多值得学习地方,希望大家能多多交流,一起学习云原生相关的设计与关键实现

微信号:baxiaoshi2020 图解kubernetes容器运行时状态缓存数据结构

关注公告号阅读更多源码分析文章 图解kubernetes容器运行时状态缓存数据结构

更多文章关注 www.sreguide.com

本文由博客一文多发平台 OpenWrite 发布


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

付费:互联网知识经济的兴起

付费:互联网知识经济的兴起

方军 / 机械工业出版社 / 2017-6-1 / CNY 59.00

关于互联网知识付费的首部作品 知识工作正在被重塑,知识经济正在开启互联网时代下半场 为你展现互联网知识经济全景大图,解读新物种的前世今生 内容简介 一个产业解读 三个分析工具 一组知识卡片 书是最早的知识载体,已有2000多年的付费历史,随着移动互联网的普及,新的知识经 济在今天爆发,知识的创造者和传播者从书后走到了书前,互联网知识经济正在拉开帷幕。知识的......一起来看看 《付费:互联网知识经济的兴起》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试