图解kubernetes中etcd增删改查的工业实现

栏目: IT技术 · 发布时间: 4年前

内容简介:kubernetes中基于etcd实现集中的数据存储,今天来学习下基于etcd如何实现数据读取一致性、更新一致性、事务的具体实现

kubernetes中基于etcd实现集中的数据存储,今天来学习下基于etcd如何实现数据读取一致性、更新一致性、事务的具体实现

1. 数据的存储与版本

1.1 数据存储的转换

图解kubernetes中etcd增删改查的工业实现 在k8s中有部分数据的存储是需要经过处理之后才能存储的,比如secret这种加密的数据,既然要存储就至少包含两个操作,加密存储,解密读取,transformer就是为了完成该操作而实现的,其在进行etcd数据存储的时候回对数据进行加密,而在读取的时候,则会进行解密

1.2 资源版本revision

图解kubernetes中etcd增删改查的工业实现 在etcd中进行修改(增删改)操作的时候,都会递增revision,而在k8s中也通过该值来作为k8s资源的ResourceVersion,该机制也是实现watch的关键机制,在操作etcd解码从etcd获取的数据的时候,会通过versioner组件来为资源动态的修改该值

1.3 数据模型的映射

图解kubernetes中etcd增删改查的工业实现 将数据从etcd中读取后,数据本身就是一个字节数组,如何将对应的数据转换成我们真正的运行时对象呢?还记得我们之前的scheme与codec么,在这里我们知道对应的数据编码格式,也知道资源对象的类型,则通过codec、字节数组、目标类型,我们就可以完成对应数据的反射

2. 查询接口一致性

图解kubernetes中etcd增删改查的工业实现 etcd中的数据写入是基于leader单点写入和集群quorum机制实现的,并不是一个强一致性的数据写入,则如果如果我们访问的节点不存在quorum的半数节点内,则可能造成短暂的数据不一致,针对一些强一致的场景,我们可以通过其revision机制来进行数据的读取, 保证我们读取到更新之后的数据

// 省略非核心代码
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
    // 获取key
    getResp, err := s.client.KV.Get(ctx, key, s.getOps...)

    // 检测当前版本,是否达到最小版本的
    if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
        return err
    }

    // 执行数据转换
    data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
    if err != nil {
        return storage.NewInternalError(err.Error())
    }
    // 解码数据
    return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
复制代码

3. 创建接口实现 图解kubernetes中etcd增删改查的工业实现

创建一个接口数据则会首先进行资源对象的检查,避免重复创建对象,此时会先通过资源对象的version字段来进行初步检查,然后在利用etcd的事务机制来保证资源创建的原子性操作

// 省略非核心代码
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
    if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
        return errors.New("resourceVersion should not be set on objects to be created")
    }
    if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
        return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
    }
    // 将数据编码
    data, err := runtime.Encode(s.codec, obj)
    if err != nil {
        return err
    }
    
    // 转换数据
    newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
    if err != nil {
        return storage.NewInternalError(err.Error())
    }

    startTime := time.Now()
    // 事务操作
    txnResp, err := s.client.KV.Txn(ctx).If(
        notFound(key), // 如果之前不存在 这里是利用的etcd的ModRevision即修改版本为0, 寓意着对应的key不存在
    ).Then(
        clientv3.OpPut(key, string(newData), opts...), // put修改数据
    ).Commit()
    metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
    if err != nil {
        return err
    }
    if !txnResp.Succeeded {
        return storage.NewKeyExistsError(key, 0)
    }

    if out != nil {
        // 获取对应的Revision
        putResp := txnResp.Responses[0].GetResponsePut()
        return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
    }
    return nil
}

func notFound(key string) clientv3.Cmp {
    return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}复制代码

4. 删除接口的实现

图解kubernetes中etcd增删改查的工业实现 删除接口主要是通过CAS和事务机制来共同实现,确保在etcd不发生异常的情况,即使并发对同个资源来进行删除操作也能保证至少有一个节点成功

// 省略非核心代码
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
    startTime := time.Now()
    // 获取当前的key的数据
    getResp, err := s.client.KV.Get(ctx, key)
    for {
        // 获取当前的状态
        origState, err := s.getState(getResp, key, v, false)
        if err != nil {
            return err
        }
        txnResp, err := s.client.KV.Txn(ctx).If(
            clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), // 如果修改版本等于当前状态,就尝试删除
        ).Then(
            clientv3.OpDelete(key), // 删除
        ).Else(
            clientv3.OpGet(key),    // 获取
        ).Commit()
        if !txnResp.Succeeded {
            // 获取最新的数据重试事务操作
            getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
            klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
            continue
        }
        // 将最后一个版本的数据解码到out里面,然后返回
        return decode(s.codec, s.versioner, origState.data, out, origState.rev)
    }
}复制代码

5. 更新接口的实现

图解kubernetes中etcd增删改查的工业实现 更新接口实现上与删除接口并无本质上的差别,但是如果多个节点同时进行更新,CAS并发操作必然会有一个节点成功,当发现已经有节点操作成功,则当前节点其实并不需要再做过多的操作,直接返回即可

// 省略非核心代码
func (s *store) GuaranteedUpdate(
    ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
    preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
    // 获取当前key的最新数据
    getCurrentState := func() (*objState, error) {
        startTime := time.Now()
        getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
        metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
        if err != nil {
            return nil, err
        }
        return s.getState(getResp, key, v, ignoreNotFound)
    }

    // 获取当前数据
    var origState *objState
    var mustCheckData bool
    if len(suggestion) == 1 && suggestion[0] != nil {
        // 如果提供了建议的数据,则会使用,
        origState, err = s.getStateFromObject(suggestion[0])
        if err != nil {
            return err
        }
        //但是需要检测数据
        mustCheckData = true
    } else {
        // 尝试重新获取数据
        origState, err = getCurrentState()
        if err != nil {
            return err
        }
    }

    transformContext := authenticatedDataString(key)
    for {
        // 检查对象是否已经更新, 主要是通过检测uuid/revision来实现
        if err := preconditions.Check(key, origState.obj); err != nil {
            // If our data is already up to date, return the error
            if !mustCheckData {
                return err
            }
            // 如果检查数据一致性错误,则需要重新获取
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
            mustCheckData = false
            // Retry
            continue
        }

        // 删除当前的版本数据revision
        ret, ttl, err := s.updateState(origState, tryUpdate)
        if err != nil {
            // If our data is already up to date, return the error
            if !mustCheckData {
                return err
            }

            // It's possible we were working with stale data
            // Actually fetch
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
            mustCheckData = false
            // Retry
            continue
        }

        // 编码数据
        data, err := runtime.Encode(s.codec, ret)
        if err != nil {
            return err
        }
        if !origState.stale && bytes.Equal(data, origState.data) {
            // 如果我们发现我们当前的数据与获取到的数据一致,则会直接跳过
            if mustCheckData {
                origState, err = getCurrentState()
                if err != nil {
                    return err
                }
                mustCheckData = false
                if !bytes.Equal(data, origState.data) {
                    // original data changed, restart loop
                    continue
                }
            }
            if !origState.stale {
                // 直接返回数据
                return decode(s.codec, s.versioner, origState.data, out, origState.rev)
            }
        }

        // 砖汉数据
        newData, err := s.transformer.TransformToStorage(data, transformContext)
        if err != nil {
            return storage.NewInternalError(err.Error())
        }

        opts, err := s.ttlOpts(ctx, int64(ttl))
        if err != nil {
            return err
        }
        trace.Step("Transaction prepared")

        startTime := time.Now()
        // 事务更新数据
        txnResp, err := s.client.KV.Txn(ctx).If(
            clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
        ).Then(
            clientv3.OpPut(key, string(newData), opts...),
        ).Else(
            clientv3.OpGet(key),
        ).Commit()
        metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
        if err != nil {
            return err
        }
        trace.Step("Transaction committed")
        if !txnResp.Succeeded {
            // 重新获取数据
            getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
            klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
            origState, err = s.getState(getResp, key, v, ignoreNotFound)
            if err != nil {
                return err
            }
            trace.Step("Retry value restored")
            mustCheckData = false
            continue
        }
        // 获取put响应
        putResp := txnResp.Responses[0].GetResponsePut()

        return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
    }
}复制代码

6. 未曾讲到的地方

transformer的实现和注册地方我并没有找到,只看到了几个覆盖资源类型的地方,还有list/watch接口,后续再继续学习,今天就先到这里,下次再见

微信号:baxiaoshi2020 图解kubernetes中etcd增删改查的工业实现

关注公告号阅读更多源码分析文章 图解kubernetes中etcd增删改查的工业实现

更多文章关注 www.sreguide.com

本文由博客一文多发平台 OpenWrite 发布


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

颠覆式创新:移动互联网时代的生存法则

颠覆式创新:移动互联网时代的生存法则

李善友 / 机械工业出版社 / 2015-3-1

为什么把每件事情都做对了,仍有可能错失城池?为什么无人可敌的领先企业,却在一夜之间虎落平阳?短短三年间诺基亚陨落,摩托罗拉以区区29亿美元出售给联想,芯片业霸主英特尔在移动芯片领域份额几乎为零,风光无限的巨头转眼成为被颠覆的恐龙,默默无闻的小公司一战成名迅速崛起,令人瞠目结舌的现象几乎都能被“颠覆式创新”法则所解释。 颠覆式创新教你在新的商业竞争中“换操作系统”而不是“打补丁”,小公司用破坏......一起来看看 《颠覆式创新:移动互联网时代的生存法则》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具