Overview: developing a Dynamic Provisioner requires using the helper library.
1. Dynamic Provisioner
1.1. Provisioner Interface
Developing a Dynamic Provisioner requires implementing the Provisioner interface, which has two methods:
- Provision: creates the storage asset and returns a PV object for it.
- Delete: removes the corresponding storage asset, but does not delete the PV object itself.
The Provisioner interface source is as follows:
```go
// Provisioner is an interface that creates templates for PersistentVolumes
// and can create the volume as a new resource in the infrastructure provider.
// It can also remove the volume it created from the underlying storage
// provider.
type Provisioner interface {
	// Provision creates a volume i.e. the storage asset and returns a PV object
	// for the volume
	Provision(VolumeOptions) (*v1.PersistentVolume, error)
	// Delete removes the storage asset that was created by Provision backing the
	// given PV. Does not delete the PV object itself.
	//
	// May return IgnoredError to indicate that the call has been ignored and no
	// action taken.
	Delete(*v1.PersistentVolume) error
}
```
1.2. VolumeOptions
The parameter of the Provisioner interface's Provision method is a VolumeOptions object. VolumeOptions carries the information needed to create the PV object, such as the PV's reclaim policy, the PV's name, the PVC that triggered the provisioning, and the parameters from the PVC's StorageClass object.
The VolumeOptions source is as follows:
```go
// VolumeOptions contains option information about a volume
// https://github.com/kubernetes/kubernetes/blob/release-1.4/pkg/volume/plugins.go
type VolumeOptions struct {
	// Reclamation policy for a persistent volume
	PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy
	// PV.Name of the appropriate PersistentVolume. Used to generate cloud
	// volume name.
	PVName string
	// PV mount options. Not validated - mount of the PVs will simply fail if one is invalid.
	MountOptions []string
	// PVC is reference to the claim that lead to provisioning of a new PV.
	// Provisioners *must* create a PV that would be matched by this PVC,
	// i.e. with required capacity, accessMode, labels matching PVC.Selector and
	// so on.
	PVC *v1.PersistentVolumeClaim
	// Volume provisioning parameters from StorageClass
	Parameters map[string]string
	// Node selected by the scheduler for the volume.
	SelectedNode *v1.Node
	// Topology constraint parameter from StorageClass
	AllowedTopologies []v1.TopologySelectorTerm
}
```
1.3. ProvisionController
ProvisionController is a controller that provisions PVs for PVCs; it carries all of the logic around invoking the Provisioner interface's Provision and Delete methods.
1.4. Steps to develop a provisioner
- Write a provisioner that implements the Provisioner interface (with its Provision and Delete methods).
- Build a ProvisionController from that provisioner.
- Call the ProvisionController's Run method (a skeleton tying these steps together is sketched below).
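The following minimal skeleton ties the three steps together. It is a sketch based on the interface and constructor calls quoted in this article; the import path is the one used by nfs-client-provisioner, and myProvisioner and the provisioner name example.com/my-provisioner are illustrative placeholders:

```go
package main

import (
	"errors"

	"github.com/kubernetes-incubator/external-storage/lib/controller"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// myProvisioner is a placeholder; a real provisioner carries the
// backend-specific fields it needs (server addresses, paths, and so on).
type myProvisioner struct {
	client kubernetes.Interface
}

// Compile-time check that myProvisioner implements the interface (step 1).
var _ controller.Provisioner = &myProvisioner{}

// Provision should create the storage asset and return a PV object for it.
func (p *myProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
	return nil, errors.New("not implemented")
}

// Delete should remove the storage asset backing the given PV.
func (p *myProvisioner) Delete(volume *v1.PersistentVolume) error {
	return errors.New("not implemented")
}

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	serverVersion, err := clientset.Discovery().ServerVersion()
	if err != nil {
		panic(err)
	}

	// Step 2: build a ProvisionController from the provisioner.
	pc := controller.NewProvisionController(clientset, "example.com/my-provisioner",
		&myProvisioner{client: clientset}, serverVersion.GitVersion)

	// Step 3: run it as a long-lived loop.
	pc.Run(wait.NeverStop)
}
```

With this skeleton in place, all that remains is to fill in Provision and Delete with backend-specific logic, which is exactly what the nfs-client-provisioner sections below demonstrate.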
2. NFS Client Provisioner
nfs-client-provisioner is an automatic provisioner that uses NFS as its backing storage and automatically creates PVs for PVCs. It does not provide NFS storage itself; an external NFS server must already exist.
Provisioned directories are named ${namespace}-${pvcName}-${pvName}; archived directories are named archived-${namespace}-${pvcName}-${pvName}.
The following walks through the nfs-client-provisioner source to illustrate the whole process of developing a custom provisioner. The main code of nfs-client-provisioner lives in the provisioner.go file.
nfs-client-provisioner source: https://github.com/kubernetes-incubator/external-storage/tree/master/nfs-client
2.1. The main function
2.1.1. Reading environment variables
Source:
```go
func main() {
	flag.Parse()
	flag.Set("logtostderr", "true")
	server := os.Getenv("NFS_SERVER")
	if server == "" {
		glog.Fatal("NFS_SERVER not set")
	}
	path := os.Getenv("NFS_PATH")
	if path == "" {
		glog.Fatal("NFS_PATH not set")
	}
	provisionerName := os.Getenv(provisionerNameKey)
	if provisionerName == "" {
		glog.Fatalf("environment variable %s is not set! Please set it.", provisionerNameKey)
	}
	...
}
```
The main function first reads the three environment variables NFS_SERVER, NFS_PATH, and PROVISIONER_NAME, so their values must be passed in when deploying nfs-client-provisioner.
- NFS_SERVER: the IP address of the NFS server.
- NFS_PATH: the directory exported by the NFS server.
- PROVISIONER_NAME: the name of the provisioner, which must match the provisioner field of the StorageClass object.
For example, a StorageClass object's YAML file looks like this:
```yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: managed-nfs-storage
provisioner: fuseim.pri/ifs # or choose another name, must match deployment's env PROVISIONER_NAME
parameters:
  archiveOnDelete: "false" # When set to "false" your PVs will not be archived by the provisioner upon deletion of the PVC.
```
2.1.2. Creating the clientset object
Source:
```go
// Create an InClusterConfig and use it to create a client for the controller
// to use to communicate with Kubernetes
config, err := rest.InClusterConfig()
if err != nil {
	glog.Fatalf("Failed to create config: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
	glog.Fatalf("Failed to create client: %v", err)
}
```
Using the in-cluster Kubernetes configuration, a clientset object is created. It is used to call the Kubernetes API, mainly to create and delete objects such as PVs and PVCs.
2.1.3. Constructing the nfsProvisioner object
Source:
```go
// The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
	glog.Fatalf("Error getting server version: %v", err)
}
clientNFSProvisioner := &nfsProvisioner{
	client: clientset,
	server: server,
	path:   path,
}
```
An nfsProvisioner object is constructed from clientset, server, and path. The Kubernetes server version is also fetched, because out-of-tree provisioners are only officially supported on Kubernetes 1.5 and above.
The nfsProvisioner type is defined as follows:
```go
type nfsProvisioner struct {
	client kubernetes.Interface
	server string
	path   string
}

var _ controller.Provisioner = &nfsProvisioner{}
```
nfsProvisioner is a custom provisioner that implements the Provisioner interface. Besides the two NFS-related fields server and path, it holds a client used to call the Kubernetes API.
```go
var _ controller.Provisioner = &nfsProvisioner{}
```
This compile-time assertion checks that nfsProvisioner implements the Provisioner interface.
2.1.4. Building and running the ProvisionController
Source:
```go
// Start the provision controller which will dynamically provision efs NFS
// PVs
pc := controller.NewProvisionController(clientset, provisionerName, clientNFSProvisioner, serverVersion.GitVersion)
pc.Run(wait.NeverStop)
```
A ProvisionController object is constructed from the nfsProvisioner and its Run method is invoked. ProvisionController implements the concrete PV and PVC logic, and Run executes as a long-lived loop.
2.2. The Provision and Delete methods
2.2.1. The Provision method
The full source of nfsProvisioner's Provision method: https://github.com/kubernetes-incubator/external-storage/blob/master/nfs-client/cmd/nfs-client-provisioner/provisioner.go#L56
The Provision method creates the storage asset and returns a PV object. Its parameter is a VolumeOptions, which specifies the attributes of the PV object.
1. Build the PV name and directory path
```go
func (p *nfsProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
	if options.PVC.Spec.Selector != nil {
		return nil, fmt.Errorf("claim Selector is not supported")
	}
	glog.V(4).Infof("nfs provisioner: VolumeOptions %v", options)

	pvcNamespace := options.PVC.Namespace
	pvcName := options.PVC.Name

	pvName := strings.Join([]string{pvcNamespace, pvcName, options.PVName}, "-")

	fullPath := filepath.Join(mountPath, pvName)
	glog.V(4).Infof("creating path %s", fullPath)
	if err := os.MkdirAll(fullPath, 0777); err != nil {
		return nil, errors.New("unable to create directory to provision new pv: " + err.Error())
	}
	os.Chmod(fullPath, 0777)

	path := filepath.Join(p.path, pvName)
	...
}
```
From the VolumeOptions parameter, the PV directory name is assembled from the PVC namespace, the PVC name, and the PV name, and the corresponding path is created.
2. Construct the PV object
```go
pv := &v1.PersistentVolume{
	ObjectMeta: metav1.ObjectMeta{
		Name: options.PVName,
	},
	Spec: v1.PersistentVolumeSpec{
		PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
		AccessModes:                   options.PVC.Spec.AccessModes,
		MountOptions:                  options.MountOptions,
		Capacity: v1.ResourceList{
			v1.ResourceName(v1.ResourceStorage): options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)],
		},
		PersistentVolumeSource: v1.PersistentVolumeSource{
			NFS: &v1.NFSVolumeSource{
				Server:   p.server,
				Path:     path,
				ReadOnly: false,
			},
		},
	},
}
return pv, nil
```
As shown above, the Provision method only builds a PV object from the VolumeOptions parameter; it does not itself perform the creation or deletion of the PV in Kubernetes.
Different kinds of Provisioner generally differ in the PersistentVolumeSource type and its parameters. For example, nfs-provisioner's PersistentVolumeSource is NFS, which requires NFS-specific parameters such as Server and Path.
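As an illustration of that point, a hypothetical hostPath-based provisioner would build the same PV skeleton and swap only the source block (a sketch, not taken from any real provisioner):

```go
// Hypothetical hostPath variant of the PV built in Provision; everything
// except PersistentVolumeSource is the same as in the NFS version above.
pv := &v1.PersistentVolume{
	ObjectMeta: metav1.ObjectMeta{
		Name: options.PVName,
	},
	Spec: v1.PersistentVolumeSpec{
		PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
		AccessModes:                   options.PVC.Spec.AccessModes,
		Capacity: v1.ResourceList{
			v1.ResourceName(v1.ResourceStorage): options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)],
		},
		PersistentVolumeSource: v1.PersistentVolumeSource{
			HostPath: &v1.HostPathVolumeSource{
				Path: path, // a directory prepared by this hypothetical provisioner
			},
		},
	},
}
```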
2.2.2. The Delete method
The full source of nfsProvisioner's Delete method: https://github.com/kubernetes-incubator/external-storage/blob/master/nfs-client/cmd/nfs-client-provisioner/provisioner.go#L99
1. Get pvName, path, and related parameters
```go
func (p *nfsProvisioner) Delete(volume *v1.PersistentVolume) error {
	path := volume.Spec.PersistentVolumeSource.NFS.Path
	pvName := filepath.Base(path)
	oldPath := filepath.Join(mountPath, pvName)
	if _, err := os.Stat(oldPath); os.IsNotExist(err) {
		glog.Warningf("path %s does not exist, deletion skipped", oldPath)
		return nil
	}
	...
}
```
oldPath is derived from path and pvName; it is the directory on the NFS server where the pod's data was persisted.
2. Read the archiveOnDelete parameter and delete the data
```go
// Get the storage class for this volume.
storageClass, err := p.getClassForVolume(volume)
if err != nil {
	return err
}
// Determine if the "archiveOnDelete" parameter exists.
// If it exists and has a falsey value, delete the directory.
// Otherwise, archive it.
archiveOnDelete, exists := storageClass.Parameters["archiveOnDelete"]
if exists {
	archiveBool, err := strconv.ParseBool(archiveOnDelete)
	if err != nil {
		return err
	}
	if !archiveBool {
		return os.RemoveAll(oldPath)
	}
}
```
If the StorageClass object specifies the archiveOnDelete parameter with the value false, all data under oldPath is deleted, i.e. the pod's persisted data is removed.
archiveOnDelete literally means "archive on delete": false means do not archive (delete the data); true means archive (rename the path).
3. Rename the old data path
```go
archivePath := filepath.Join(mountPath, "archived-"+pvName)
glog.V(4).Infof("archiving path %s to %s", oldPath, archivePath)
return os.Rename(oldPath, archivePath)
```
If the StorageClass object does not specify the archiveOnDelete parameter, or its value is true, the data is archived on deletion: oldPath is renamed with an archived- prefix.
3. ProvisionController
3.1. The ProvisionController struct
ProvisionController is a controller that provisions PVs for PVCs; it carries all of the logic around invoking the Provisioner interface's Provision and Delete methods.
3.1.1. Constructor parameters
```go
// ProvisionController is a controller that provisions PersistentVolumes for
// PersistentVolumeClaims.
type ProvisionController struct {
	client kubernetes.Interface

	// The name of the provisioner for which this controller dynamically
	// provisions volumes. The value of annDynamicallyProvisioned and
	// annStorageProvisioner to set & watch for, respectively
	provisionerName string

	// The provisioner the controller will use to provision and delete volumes.
	// Presumably this implementer of Provisioner carries its own
	// volume-specific options and such that it needs in order to provision
	// volumes.
	provisioner Provisioner

	// Kubernetes cluster server version:
	// * 1.4: storage classes introduced as beta. Technically out-of-tree dynamic
	//   provisioning is not officially supported, though it works
	// * 1.5: storage classes stay in beta. Out-of-tree dynamic provisioning is
	//   officially supported
	// * 1.6: storage classes enter GA
	kubeVersion *utilversion.Version
	...
}
```
The fields client, provisionerName, provisioner, and kubeVersion are the parameters of NewProvisionController:
- client: the clientset used to call the Kubernetes API.
- provisionerName: the name of the provisioner; it must match the provisioner field of the StorageClass object.
- provisioner: the concrete Provisioner implementation, nfsProvisioner in this article.
- kubeVersion: the Kubernetes version.
3.1.2. Controller and Informer
```go
type ProvisionController struct {
	...
	claimInformer   cache.SharedInformer
	claims          cache.Store
	claimController cache.Controller

	volumeInformer   cache.SharedInformer
	volumes          cache.Store
	volumeController cache.Controller

	classInformer   cache.SharedInformer
	classes         cache.Store
	classController cache.Controller
	...
}
```
The ProvisionController struct contains a Controller, an Informer, and a Store for each of the PV, PVC, and StorageClass objects, used to operate on these three object types:
- Controller: the generic control-loop framework.
- Informer: the event notifier.
- Store: the generic object storage interface.
3.1.3. workqueue
```go
type ProvisionController struct {
	...
	claimQueue  workqueue.RateLimitingInterface
	volumeQueue workqueue.RateLimitingInterface
	...
}
```
claimQueue and volumeQueue are the work queues for PVCs and PVs, respectively.
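For background, these rate-limited queues follow client-go's standard consumer pattern: take an item, process it, re-enqueue with backoff on failure, and clear the failure history on success. A minimal illustrative sketch of that pattern (the controller's real loops are the processNext*WorkItem functions covered in section 3.3.3):

```go
import "k8s.io/client-go/util/workqueue"

// processNext drains one item from a rate-limited work queue.
func processNext(queue workqueue.RateLimitingInterface, sync func(key string) error) bool {
	item, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(item)

	if err := sync(item.(string)); err != nil {
		queue.AddRateLimited(item) // retry later, with backoff
		return true
	}
	queue.Forget(item) // success: reset this item's rate-limit history
	return true
}
```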
3.1.4. Other fields
```go
// Identity of this controller, generated at creation time and not persisted
// across restarts. Useful only for debugging, for seeing the source of
// events. controller.provisioner may have its own, different notion of
// identity which may/may not persist across restarts
id string

component     string
eventRecorder record.EventRecorder

resyncPeriod time.Duration

exponentialBackOffOnError bool
threadiness               int

createProvisionedPVRetryCount int
createProvisionedPVInterval   time.Duration

failedProvisionThreshold, failedDeleteThreshold int

// The port for metrics server to serve on.
metricsPort int32
// The IP address for metrics server to serve on.
metricsAddress string
// The path of metrics endpoint path.
metricsPath string

// Parameters of leaderelection.LeaderElectionConfig.
leaseDuration, renewDeadline, retryPeriod time.Duration

hasRun     bool
hasRunLock *sync.Mutex
```
3.2. The NewProvisionController method
The NewProvisionController method constructs a ProvisionController.
3.2.1. Initializing default values
```go
// NewProvisionController creates a new provision controller using
// the given configuration parameters and with private (non-shared) informers.
func NewProvisionController(
	client kubernetes.Interface,
	provisionerName string,
	provisioner Provisioner,
	kubeVersion string,
	options ...func(*ProvisionController) error,
) *ProvisionController {
	...
	controller := &ProvisionController{
		client:                        client,
		provisionerName:               provisionerName,
		provisioner:                   provisioner,
		kubeVersion:                   utilversion.MustParseSemantic(kubeVersion),
		id:                            id,
		component:                     component,
		eventRecorder:                 eventRecorder,
		resyncPeriod:                  DefaultResyncPeriod,
		exponentialBackOffOnError:     DefaultExponentialBackOffOnError,
		threadiness:                   DefaultThreadiness,
		createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount,
		createProvisionedPVInterval:   DefaultCreateProvisionedPVInterval,
		failedProvisionThreshold:      DefaultFailedProvisionThreshold,
		failedDeleteThreshold:         DefaultFailedDeleteThreshold,
		leaseDuration:                 DefaultLeaseDuration,
		renewDeadline:                 DefaultRenewDeadline,
		retryPeriod:                   DefaultRetryPeriod,
		metricsPort:                   DefaultMetricsPort,
		metricsAddress:                DefaultMetricsAddress,
		metricsPath:                   DefaultMetricsPath,
		hasRun:                        false,
		hasRunLock:                    &sync.Mutex{},
	}
	...
}
```
3.2.2. Initializing the work queues
```go
ratelimiter := workqueue.NewMaxOfRateLimiter(
	workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
	&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
if !controller.exponentialBackOffOnError {
	ratelimiter = workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}
controller.claimQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "claims")
controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "volumes")
```
3.2.3. ListWatch
```go
// PVC
claimSource := &cache.ListWatch{
	ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
		return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
	},
	WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
		return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
	},
}

// PV
volumeSource := &cache.ListWatch{
	ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
		return client.CoreV1().PersistentVolumes().List(options)
	},
	WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
		return client.CoreV1().PersistentVolumes().Watch(options)
	},
}

// StorageClass
classSource = &cache.ListWatch{
	ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
		return client.StorageV1().StorageClasses().List(options)
	},
	WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
		return client.StorageV1().StorageClasses().Watch(options)
	},
}
```
The list-watch mechanism is the core mechanism Kubernetes uses to watch for object changes. A ListWatch contains two functions, ListFunc and WatchFunc, neither of which may be nil. The code above builds the ListWatch structs for the PV, PVC, and StorageClass objects. The mechanism is implemented in client-go's cache package; see https://godoc.org/k8s.io/client-go/tools/cache.
The related ListWatch code:
```go
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options metav1.ListOptions) (runtime.Object, error)
	// Watch should begin a watch at the specified version.
	Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
	// DisableChunking requests no chunking for this list watcher.
	DisableChunking bool
}
```
3.2.4. ResourceEventHandlerFuncs
```go
// PVC
claimHandler := cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
	UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
	DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.claimQueue, obj) },
}

// PV
volumeHandler := cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
	UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
	DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) },
}

// StorageClass
classHandler := cache.ResourceEventHandlerFuncs{
	// We don't need an actual event handler for StorageClasses,
	// but we must pass a non-nil one to cache.NewInformer()
	AddFunc:    nil,
	UpdateFunc: nil,
	DeleteFunc: nil,
}
```
ResourceEventHandlerFuncs holds the resource event handler functions, used to receive notifications when Kubernetes resource objects are added, updated, or deleted; the type implements the ResourceEventHandler interface. The code lives in client-go's cache package.
For reference, the ResourceEventHandler code:
```go
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
//  * OnAdd is called when an object is added.
//  * OnUpdate is called when an object is modified. Note that oldObj is the
//    last known state of the object-- it is possible that several changes
//    were combined together, so you can't use this to see every single
//    change. OnUpdate is also called when a re-list happens, and it will
//    get called even if nothing changed. This is useful for periodically
//    evaluating or syncing something.
//  * OnDelete will get the final state of the item if it is known, otherwise
//    it will get an object of type DeletedFinalStateUnknown. This can
//    happen if the watch is closed and misses the delete event and we don't
//    notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}
```
3.2.5. Constructing the Stores and Controllers
1. PVC
```go
if controller.claimInformer != nil {
	controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
	controller.claims, controller.claimController =
		controller.claimInformer.GetStore(), controller.claimInformer.GetController()
} else {
	controller.claims, controller.claimController = cache.NewInformer(
		claimSource,
		&v1.PersistentVolumeClaim{},
		controller.resyncPeriod,
		claimHandler,
	)
}
```
2. PV
```go
if controller.volumeInformer != nil {
	controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
	controller.volumes, controller.volumeController =
		controller.volumeInformer.GetStore(), controller.volumeInformer.GetController()
} else {
	controller.volumes, controller.volumeController = cache.NewInformer(
		volumeSource,
		&v1.PersistentVolume{},
		controller.resyncPeriod,
		volumeHandler,
	)
}
```
3. StorageClass
```go
if controller.classInformer != nil {
	// no resource event handler needed for StorageClasses
	controller.classes, controller.classController =
		controller.classInformer.GetStore(), controller.classInformer.GetController()
} else {
	controller.classes, controller.classController = cache.NewInformer(
		classSource,
		versionedClassType,
		controller.resyncPeriod,
		classHandler,
	)
}
```
Each pair is built with cache.NewInformer, whose parameters include the ListWatch struct and the ResourceEventHandlerFuncs; it returns a Store and a Controller.
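As a usage note, the returned Store caches objects that can later be looked up by their namespace/name key. A small illustrative sketch (the key value is hypothetical; claims is the Store built above):

```go
// Look up a cached PVC by its "namespace/name" key.
obj, exists, err := controller.claims.GetByKey("default/my-claim")
if err == nil && exists {
	claim := obj.(*v1.PersistentVolumeClaim)
	glog.Infof("cached claim: %s/%s", claim.Namespace, claim.Name)
}
```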
With all of these pieces constructed, a complete ProvisionController object is returned.
3.3. The ProvisionController.Run method
The ProvisionController's Run method executes as a long-lived loop, and internally runs the other controllers.
3.3.1. Prometheus metrics collection
```go
// Run starts all of this controller's control loops
func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
	run := func(stopCh <-chan struct{}) {
		...
		if ctrl.metricsPort > 0 {
			prometheus.MustRegister([]prometheus.Collector{
				metrics.PersistentVolumeClaimProvisionTotal,
				metrics.PersistentVolumeClaimProvisionFailedTotal,
				metrics.PersistentVolumeClaimProvisionDurationSeconds,
				metrics.PersistentVolumeDeleteTotal,
				metrics.PersistentVolumeDeleteFailedTotal,
				metrics.PersistentVolumeDeleteDurationSeconds,
			}...)
			http.Handle(ctrl.metricsPath, promhttp.Handler())
			address := net.JoinHostPort(ctrl.metricsAddress, strconv.FormatInt(int64(ctrl.metricsPort), 10))
			glog.Infof("Starting metrics server at %s\n", address)
			go wait.Forever(func() {
				err := http.ListenAndServe(address, nil)
				if err != nil {
					glog.Errorf("Failed to listen on %s: %v", address, err)
				}
			}, 5*time.Second)
		}
		...
	}
	...
}
```
3.3.2. Controller.Run
```go
// If a SharedInformer has been passed in, this controller should not
// call Run again
if ctrl.claimInformer == nil {
	go ctrl.claimController.Run(stopCh)
}
if ctrl.volumeInformer == nil {
	go ctrl.volumeController.Run(stopCh)
}
if ctrl.classInformer == nil {
	go ctrl.classController.Run(stopCh)
}
```
This runs the Informers (the event notifiers).
3.3.3. Worker
```go
for i := 0; i < ctrl.threadiness; i++ {
	go wait.Until(ctrl.runClaimWorker, time.Second, stopCh)
	go wait.Until(ctrl.runVolumeWorker, time.Second, stopCh)
}
```
runClaimWorker and runVolumeWorker are the PVC and PV workers; their actual work is done by processNextClaimWorkItem and processNextVolumeWorkItem, respectively.
The execution flow is as follows:
PVC call chain:
runClaimWorker → processNextClaimWorkItem → syncClaimHandler → syncClaim → provisionClaimOperation
PV call chain:
runVolumeWorker → processNextVolumeWorkItem → syncVolumeHandler → syncVolume → deleteVolumeOperation
So the functions ultimately executed are provisionClaimOperation and deleteVolumeOperation.
3.4. Operation
3.4.1. provisionClaimOperation
1. provisionClaimOperation takes a PVC as its parameter, derives the PV name from it, and checks whether that PV object already exists; if it does, the remaining steps are skipped.
```go
// provisionClaimOperation attempts to provision a volume for the given claim.
// Returns error, which indicates whether provisioning should be retried
// (requeue the claim) or not
func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error {
	// Most code here is identical to that found in controller.go of kube's PV controller...
	claimClass := helper.GetPersistentVolumeClaimClass(claim)
	operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass)
	glog.Infof(logOperation(operation, "started"))

	// A previous doProvisionClaim may just have finished while we were waiting for
	// the locks. Check that PV (with deterministic name) hasn't been provisioned
	// yet.
	pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
	volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
	if err == nil && volume != nil {
		// Volume has been already provisioned, nothing to do.
		glog.Infof(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))
		return nil
	}
	...
}
```
2. Get the Provisioner and ReclaimPolicy parameters from the StorageClass object; if provisionerName does not match the provisioner field of the StorageClass object, log an error and stop.
```go
provisioner, parameters, err := ctrl.getStorageClassFields(claimClass)
if err != nil {
	glog.Errorf(logOperation(operation, "error getting claim's StorageClass's fields: %v", err))
	return nil
}
if provisioner != ctrl.provisionerName {
	// class.Provisioner has either changed since shouldProvision() or
	// annDynamicallyProvisioned contains different provisioner than
	// class.Provisioner.
	glog.Errorf(logOperation(operation, "unknown provisioner %q requested in claim's StorageClass", provisioner))
	return nil
}

// Check if this provisioner can provision this claim.
if err = ctrl.canProvision(claim); err != nil {
	ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
	glog.Errorf(logOperation(operation, "failed to provision volume: %v", err))
	return nil
}

reclaimPolicy := v1.PersistentVolumeReclaimDelete
if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.8.0")) {
	reclaimPolicy, err = ctrl.fetchReclaimPolicy(claimClass)
	if err != nil {
		return err
	}
}
```
3. Invoke the concrete provisioner.Provision method to build the PV object; in this article the provisioner is nfs-provisioner.
```go
options := VolumeOptions{
	PersistentVolumeReclaimPolicy: reclaimPolicy,
	PVName:                        pvName,
	PVC:                           claim,
	MountOptions:                  mountOptions,
	Parameters:                    parameters,
	SelectedNode:                  selectedNode,
	AllowedTopologies:             allowedTopologies,
}

ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))

volume, err = ctrl.provisioner.Provision(options)
if err != nil {
	if ierr, ok := err.(*IgnoredError); ok {
		// Provision ignored, do nothing and hope another provisioner will provision it.
		glog.Infof(logOperation(operation, "volume provision ignored: %v", ierr))
		return nil
	}
	err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
	ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
	return err
}
```
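The IgnoredError branch above lets a provisioner decline a claim without the controller treating it as a failure, leaving the claim for some other provisioner. A hypothetical Provision implementation might use it like this (an illustrative sketch; nfs-client-provisioner itself returns a plain error for unsupported selectors):

```go
// Inside a hypothetical Provision implementation: decline work this
// provisioner does not handle, so another provisioner may pick it up.
if options.PVC.Spec.Selector != nil {
	return nil, &controller.IgnoredError{
		Reason: "claim selectors are not supported by this provisioner",
	}
}
```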
4. Create the PV object in Kubernetes.
```go
// Try to create the PV object several times
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
	glog.Infof(logOperation(operation, "trying to save persistentvvolume %q", volume.Name))
	if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
		// Save succeeded.
		if err != nil {
			glog.Infof(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
			err = nil
		} else {
			glog.Infof(logOperation(operation, "persistentvolume %q saved", volume.Name))
		}
		break
	}
	// Save failed, try again after a while.
	glog.Infof(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
	time.Sleep(ctrl.createProvisionedPVInterval)
}
```
5. If creating the PV fails, clean up the storage asset.
```go
if err != nil {
	// Save failed. Now we have a storage asset outside of Kubernetes,
	// but we don't have appropriate PV object for it.
	// Emit some event here and try to delete the storage asset several
	// times.
	...
	for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
		if err = ctrl.provisioner.Delete(volume); err == nil {
			// Delete succeeded
			glog.Infof(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
			break
		}
		// Delete failed, try again after a while.
		glog.Infof(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
		time.Sleep(ctrl.createProvisionedPVInterval)
	}
	if err != nil {
		// Delete failed several times. There is an orphaned volume and there
		// is nothing we can do about it.
		strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
		glog.Error(logOperation(operation, strerr))
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
	}
}
```
If the creation succeeds, a success message is logged and nil is returned.
3.4.2. deleteVolumeOperation
1. deleteVolumeOperation takes a PV as its parameter; it first re-fetches the PV object and checks whether it still needs to be deleted.
```go
// deleteVolumeOperation attempts to delete the volume backing the given
// volume. Returns error, which indicates whether deletion should be retried
// (requeue the volume) or not
func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolume) error {
	...
	// This method may have been waiting for a volume lock for some time.
	// Our check does not have to be as sophisticated as PV controller's, we can
	// trust that the PV controller has set the PV to Released/Failed and it's
	// ours to delete
	newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
	if err != nil {
		return nil
	}
	if !ctrl.shouldDelete(newVolume) {
		glog.Infof(logOperation(operation, "persistentvolume no longer needs deletion, skipping"))
		return nil
	}
	...
}
```
2. Call the concrete provisioner's Delete method; for example, with nfs-provisioner this calls nfs-provisioner's Delete method.
```go
err = ctrl.provisioner.Delete(volume)
if err != nil {
	if ierr, ok := err.(*IgnoredError); ok {
		// Delete ignored, do nothing and hope another provisioner will delete it.
		glog.Infof(logOperation(operation, "volume deletion ignored: %v", ierr))
		return nil
	}
	// Delete failed, emit an event.
	glog.Errorf(logOperation(operation, "volume deletion failed: %v", err))
	ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, "VolumeFailedDelete", err.Error())
	return err
}
```
3. Delete the PV object in Kubernetes.
```go
// Delete the volume
if err = ctrl.client.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
	// Oops, could not delete the volume and therefore the controller will
	// try to delete the volume again on next update.
	glog.Infof(logOperation(operation, "failed to delete persistentvolume: %v", err))
	return err
}
```
4. Summary
- The Provisioner interface contains two methods, Provision and Delete, and a custom provisioner must implement both. These methods only handle storage-type-specific work; they do not create or delete the PV and PVC objects themselves.
- The Provision method mainly constructs the PV object. Different kinds of Provisioner generally differ in the PersistentVolumeSource type and its parameters; for example, nfs-provisioner's PersistentVolumeSource is NFS, which requires NFS-specific parameters such as Server and Path.
- The Delete method archives (backs up) or deletes the data, depending on the storage type.
- The StorageClass object must be created separately and specifies the concrete provisioner that executes the logic.
- provisionClaimOperation and deleteVolumeOperation perform the actual creation and deletion of the PV objects in Kubernetes, and call the concrete provisioner's Provision and Delete methods to handle the stored data.