内容简介:如果要开发一个开发
如果要开发一个 Dynamic Provisioner
,需要使用到 the helper library
。
1. Dynamic Provisioner
1.1. Provisioner Interface
开发 Dynamic Provisioner
需要实现 Provisioner
接口,该接口有两个方法,分别是:
- Provision:创建存储资源,并且返回一个PV对象。
- Delete:移除对应的存储资源,但并没有删除PV对象。
Provisioner 接口源码如下:
// Provisioner is an interface that creates templates for PersistentVolumes // and can create the volume as a new resource in the infrastructure provider. // It can also remove the volume it created from the underlying storage // provider. type Provisioner interface { // Provision creates a volume i.e. the storage asset and returns a PV object // for the volume Provision(VolumeOptions) (*v1.PersistentVolume, error) // Delete removes the storage asset that was created by Provision backing the // given PV. Does not delete the PV object itself. // // May return IgnoredError to indicate that the call has been ignored and no // action taken. Delete(*v1.PersistentVolume) error }
1.2. VolumeOptions
Provisioner
接口的 Provision
方法的入参是一个 VolumeOptions
对象。 VolumeOptions
对象包含了创建PV对象所需要的信息,例如:PV的回收策略,PV的名字,PV所对应的PVC对象以及PVC的 StorageClass
对象使用的参数等。
VolumeOptions 源码如下:
// VolumeOptions contains option information about a volume // https://github.com/kubernetes/kubernetes/blob/release-1.4/pkg/volume/plugins.go type VolumeOptions struct { // Reclamation policy for a persistent volume PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy // PV.Name of the appropriate PersistentVolume. Used to generate cloud // volume name. PVName string // PV mount options. Not validated - mount of the PVs will simply fail if one is invalid. MountOptions []string // PVC is reference to the claim that lead to provisioning of a new PV. // Provisioners *must* create a PV that would be matched by this PVC, // i.e. with required capacity, accessMode, labels matching PVC.Selector and // so on. PVC *v1.PersistentVolumeClaim // Volume provisioning parameters from StorageClass Parameters map[string]string // Node selected by the scheduler for the volume. SelectedNode *v1.Node // Topology constraint parameter from StorageClass AllowedTopologies []v1.TopologySelectorTerm }
1.3. ProvisionController
ProvisionController
是一个给PVC提供PV的控制器,具体执行 Provisioner
接口的 Provision
和 Delete
的方法的所有逻辑。
1.4. 开发provisioner的步骤
-
写一个
provisioner
实现Provisioner
接口(包含Provision
和Delete
的方法)。 -
通过该
provisioner
构建ProvisionController
。 -
执行
ProvisionController
的Run
方法。
2. NFS Client Provisioner
nfs-client-provisioner
是一个 automatic provisioner
,使用NFS作为存储,自动创建PV和对应的PVC,本身不提供NFS存储,需要外部先有一套NFS存储服务。
${namespace}-${pvcName}-${pvName} archieved-${namespace}-${pvcName}-${pvName}
以下通过 nfs-client-provisioner
的源码分析来说明开发自定义 provisioner
整个过程。 nfs-client-provisioner
的主要代码都在 provisioner.go
的文件中。
nfs-client-provisioner
源码地址: https://github.com/kubernetes-incubator/external-storage/tree/master/nfs-client
2.1. Main函数
2.1.1. 读取环境变量
源码如下:
func main() { flag.Parse() flag.Set("logtostderr", "true") server := os.Getenv("NFS_SERVER") if server == "" { glog.Fatal("NFS_SERVER not set") } path := os.Getenv("NFS_PATH") if path == "" { glog.Fatal("NFS_PATH not set") } provisionerName := os.Getenv(provisionerNameKey) if provisionerName == "" { glog.Fatalf("environment variable %s is not set! Please set it.", provisionerNameKey) } ... }
main函数先获取 NFS_SERVER
、 NFS_PATH
、 PROVISIONER_NAME
三个环境变量的值,因此在部署nfs-client-provisioner的时候,需要将这三个环境变量的值传入。
-
NFS_SERVER
:NFS服务端的IP地址。 -
NFS_PATH
:NFS服务端设置的共享目录 -
PROVISIONER_NAME
:provisioner的名字,需要和StorageClass
对象中的provisioner
字段一致。
例如 StorageClass
对象的yaml文件如下:
apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: managed-nfs-storage provisioner: fuseim.pri/ifs # or choose another name, must match deployment's env PROVISIONER_NAME' parameters: archiveOnDelete: "false" # When set to "false" your PVs will not be archived by the provisioner upon deletion of the PVC.
2.1.2. 获取clientset对象
源码如下:
// Create an InClusterConfig and use it to create a client for the controller // to use to communicate with Kubernetes config, err := rest.InClusterConfig() if err != nil { glog.Fatalf("Failed to create config: %v", err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { glog.Fatalf("Failed to create client: %v", err) }
通过读取对应的k8s的配置,创建 clientset
对象,用来执行k8s对应的API,其中主要包括对PV和PVC等对象的创建删除等操作。
2.1.3. 构造nfsProvisioner对象
源码如下:
// The controller needs to know what the server version is because out-of-tree // provisioners aren't officially supported until 1.5 serverVersion, err := clientset.Discovery().ServerVersion() if err != nil { glog.Fatalf("Error getting server version: %v", err) } clientNFSProvisioner := &nfsProvisioner{ client: clientset, server: server, path: path, }
通过 clientset
、 server
、 path
等值构造 nfsProvisioner
对象,同时还获取了k8s的版本信息,因为provisioners的功能在k8s 1.5及以上版本才支持。
nfsProvisioner
类型定义如下:
type nfsProvisioner struct { client kubernetes.Interface server string path string } var _ controller.Provisioner = &nfsProvisioner{}
nfsProvisioner
是一个自定义的 provisioner
,用来实现 Provisioner
的接口,其中的属性除了 server
、 path
这两个关于NFS相关的参数,还包含了 client
,主要用来调用k8s的API。
var _ controller.Provisioner = &nfsProvisioner{}
以上用法用来检测 nfsProvisioner
是否实现了 Provisioner
的接口。
2.1.4. 构建并运行ProvisionController
源码如下:
// Start the provision controller which will dynamically provision efs NFS // PVs pc := controller.NewProvisionController(clientset, provisionerName, clientNFSProvisioner, serverVersion.GitVersion) pc.Run(wait.NeverStop)
通过 nfsProvisioner
构造 ProvisionController
对象并执行 Run
方法, ProvisionController
实现了具体的PV和PVC的相关逻辑, Run
方法以常驻进程的方式运行。
2.2. Provision 和 Delete 方法
2.2.1. Provision方法
nfsProvisioner
的 Provision
方法具体源码参考: https://github.com/kubernetes-incubator/external-storage/blob/master/nfs-client/cmd/nfs-client-provisioner/provisioner.go#L56
Provision
方法用来创建存储资源,并且返回一个 PV
对象。其中入参是 VolumeOptions
,用来指定 PV
对象的相关属性。
1、构建PV和PVC的名称
func (p *nfsProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) { if options.PVC.Spec.Selector != nil { return nil, fmt.Errorf("claim Selector is not supported") } glog.V(4).Infof("nfs provisioner: VolumeOptions %v", options) pvcNamespace := options.PVC.Namespace pvcName := options.PVC.Name pvName := strings.Join([]string{pvcNamespace, pvcName, options.PVName}, "-") fullPath := filepath.Join(mountPath, pvName) glog.V(4).Infof("creating path %s", fullPath) if err := os.MkdirAll(fullPath, 0777); err != nil { return nil, errors.New("unable to create directory to provision new pv: " + err.Error()) } os.Chmod(fullPath, 0777) path := filepath.Join(p.path, pvName) ... }
通过 VolumeOptions
的入参,构建PV和PVC的名称,以及创建路径path。
2、构造PV对象
pv := &v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{ Name: options.PVName, }, Spec: v1.PersistentVolumeSpec{ PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy, AccessModes: options.PVC.Spec.AccessModes, MountOptions: options.MountOptions, Capacity: v1.ResourceList{ v1.ResourceName(v1.ResourceStorage): options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)], }, PersistentVolumeSource: v1.PersistentVolumeSource{ NFS: &v1.NFSVolumeSource{ Server: p.server, Path: path, ReadOnly: false, }, }, }, } return pv, nil
综上可以看出, Provision
方法只是通过 VolumeOptions
参数来构建 PV
对象,并没有执行具体 PV
的创建或删除的操作。
不同类型的 Provisioner
的,一般是 PersistentVolumeSource
类型和参数不同,例如 nfs-provisioner
对应的 PersistentVolumeSource
为 NFS
,并且需要传入 NFS
相关的参数: Server
, Path
等。
2.2.2. Delete方法
nfsProvisioner
的 delete
方法具体源码参考: https://github.com/kubernetes-incubator/external-storage/blob/master/nfs-client/cmd/nfs-client-provisioner/provisioner.go#L99
1、获取pvName和path等相关参数
func (p *nfsProvisioner) Delete(volume *v1.PersistentVolume) error { path := volume.Spec.PersistentVolumeSource.NFS.Path pvName := filepath.Base(path) oldPath := filepath.Join(mountPath, pvName) if _, err := os.Stat(oldPath); os.IsNotExist(err) { glog.Warningf("path %s does not exist, deletion skipped", oldPath) return nil } ... }
通过 path
和 pvName
生成 oldPath
,其中 oldPath
是原先NFS服务器上 pod
对应的数据持久化存储路径。
2、获取archiveOnDelete参数并删除数据
// Get the storage class for this volume. storageClass, err := p.getClassForVolume(volume) if err != nil { return err } // Determine if the "archiveOnDelete" parameter exists. // If it exists and has a falsey value, delete the directory. // Otherwise, archive it. archiveOnDelete, exists := storageClass.Parameters["archiveOnDelete"] if exists { archiveBool, err := strconv.ParseBool(archiveOnDelete) if err != nil { return err } if !archiveBool { return os.RemoveAll(oldPath) } }
如果 storageClass
对象中指定 archiveOnDelete
参数并且值为 false
,则会自动删除 oldPath
下的所有数据,即 pod
对应的数据持久化存储数据。
archiveOnDelete
字面意思为删除时是否存档,false表示不存档,即删除数据,true表示存档,即重命名路径。
3、重命名旧数据路径
archivePath := filepath.Join(mountPath, "archived-"+pvName) glog.V(4).Infof("archiving path %s to %s", oldPath, archivePath) return os.Rename(oldPath, archivePath)
如果 storageClass
对象中没有指定 archiveOnDelete
参数或者值为 true
,表明需要删除时存档,即将 oldPath
重命名,命名格式为 oldPath
前面增加 archived-
的前缀。
3. ProvisionController
3.1. ProvisionController结构体
ProvisionController
是一个给PVC提供PV的控制器,具体执行 Provisioner
接口的 Provision
和 Delete
的方法的所有逻辑。
3.1.1. 入参
// ProvisionController is a controller that provisions PersistentVolumes for // PersistentVolumeClaims. type ProvisionController struct { client kubernetes.Interface // The name of the provisioner for which this controller dynamically // provisions volumes. The value of annDynamicallyProvisioned and // annStorageProvisioner to set & watch for, respectively provisionerName string // The provisioner the controller will use to provision and delete volumes. // Presumably this implementer of Provisioner carries its own // volume-specific options and such that it needs in order to provision // volumes. provisioner Provisioner // Kubernetes cluster server version: // * 1.4: storage classes introduced as beta. Technically out-of-tree dynamic // provisioning is not officially supported, though it works // * 1.5: storage classes stay in beta. Out-of-tree dynamic provisioning is // officially supported // * 1.6: storage classes enter GA kubeVersion *utilversion.Version ... }
client
、 provisionerName
、 provisioner
、 kubeVersion
等属性作为 NewProvisionController
的入参。
-
client
:clientset客户端,用来调用k8s的API。 -
provisionerName
:provisioner的名字,需要和StorageClass
对象中的provisioner
字段一致。 -
provisioner
:具体的provisioner的实现者,本文为nfsProvisioner
。 -
kubeVersion
:k8s的版本信息。
3.1.2. Controller和Informer
type ProvisionController struct { ... claimInformer cache.SharedInformer claims cache.Store claimController cache.Controller volumeInformer cache.SharedInformer volumes cache.Store volumeController cache.Controller classInformer cache.SharedInformer classes cache.Store classController cache.Controller ... }
ProvisionController
结构体中包含了 PV
、 PVC
、 StorageClass
三个对象的 Controller
、 Informer
和 Store
,主要用来执行这三个对象的相关操作。
- Controller:通用的控制框架
- Informer:消息通知器
- Store:通用的对象存储接口
3.1.3. workqueue
type ProvisionController struct { ... claimQueue workqueue.RateLimitingInterface volumeQueue workqueue.RateLimitingInterface ... }
claimQueue
和 volumeQueue
分别是 PV
和 PVC
的任务队列。
3.1.4. 其他
// Identity of this controller, generated at creation time and not persisted // across restarts. Useful only for debugging, for seeing the source of // events. controller.provisioner may have its own, different notion of // identity which may/may not persist across restarts id string component string eventRecorder record.EventRecorder resyncPeriod time.Duration exponentialBackOffOnError bool threadiness int createProvisionedPVRetryCount int createProvisionedPVInterval time.Duration failedProvisionThreshold, failedDeleteThreshold int // The port for metrics server to serve on. metricsPort int32 // The IP address for metrics server to serve on. metricsAddress string // The path of metrics endpoint path. metricsPath string // Parameters of leaderelection.LeaderElectionConfig. leaseDuration, renewDeadline, retryPeriod time.Duration hasRun bool hasRunLock *sync.Mutex
3.2. NewProvisionController方法
NewProvisionController
方法主要用来构造 ProvisionController
。
3.2.1. 初始化默认值
// NewProvisionController creates a new provision controller using // the given configuration parameters and with private (non-shared) informers. func NewProvisionController( client kubernetes.Interface, provisionerName string, provisioner Provisioner, kubeVersion string, options ...func(*ProvisionController) error, ) *ProvisionController { ... controller := &ProvisionController{ client: client, provisionerName: provisionerName, provisioner: provisioner, kubeVersion: utilversion.MustParseSemantic(kubeVersion), id: id, component: component, eventRecorder: eventRecorder, resyncPeriod: DefaultResyncPeriod, exponentialBackOffOnError: DefaultExponentialBackOffOnError, threadiness: DefaultThreadiness, createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount, createProvisionedPVInterval: DefaultCreateProvisionedPVInterval, failedProvisionThreshold: DefaultFailedProvisionThreshold, failedDeleteThreshold: DefaultFailedDeleteThreshold, leaseDuration: DefaultLeaseDuration, renewDeadline: DefaultRenewDeadline, retryPeriod: DefaultRetryPeriod, metricsPort: DefaultMetricsPort, metricsAddress: DefaultMetricsAddress, metricsPath: DefaultMetricsPath, hasRun: false, hasRunLock: &sync.Mutex{}, } ... }
3.2.2. 初始化任务队列
ratelimiter := workqueue.NewMaxOfRateLimiter( workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second), &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, ) if !controller.exponentialBackOffOnError { ratelimiter = workqueue.NewMaxOfRateLimiter( workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second), &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, ) } controller.claimQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "claims") controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "volumes")
3.2.3. ListWatch
// PVC claimSource := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options) }, } // PV volumeSource := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return client.CoreV1().PersistentVolumes().List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return client.CoreV1().PersistentVolumes().Watch(options) }, } // StorageClass classSource = &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return client.StorageV1().StorageClasses().List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return client.StorageV1().StorageClasses().Watch(options) }, }
list-watch
机制是k8s中用来监听对象变化的核心机制, ListWatch
包含 ListFunc
和 WatchFunc
两个函数,且不能为空,以上代码分别构造了PV、PVC、StorageClass三个对象的 ListWatch
结构体。该机制的实现在 client-go
的 cache
包中,具体参考: https://godoc.org/k8s.io/client-go/tools/cache。
更多 ListWatch
代码如下:
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. type ListerWatcher interface { // List should return a list type object; the Items field will be extracted, and the // ResourceVersion field will be used to start the watch in the right place. List(options metav1.ListOptions) (runtime.Object, error) // Watch should begin a watch at the specified version. Watch(options metav1.ListOptions) (watch.Interface, error) } // ListFunc knows how to list resources type ListFunc func(options metav1.ListOptions) (runtime.Object, error) // WatchFunc knows how to watch resources type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) // ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface. // It is a convenience function for users of NewReflector, etc. // ListFunc and WatchFunc must not be nil type ListWatch struct { ListFunc ListFunc WatchFunc WatchFunc // DisableChunking requests no chunking for this list watcher. DisableChunking bool }
3.2.4. ResourceEventHandlerFuncs
// PVC claimHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) }, DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.claimQueue, obj) }, } // PV volumeHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) }, DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) }, } // StorageClass classHandler := cache.ResourceEventHandlerFuncs{ // We don't need an actual event handler for StorageClasses, // but we must pass a non-nil one to cache.NewInformer() AddFunc: nil, UpdateFunc: nil, DeleteFunc: nil, }
ResourceEventHandlerFuncs
是资源事件处理函数,主要用来对k8s资源对象 增删改
变化的事件进行消息通知,该函数实现了 ResourceEventHandler
的接口。具体代码逻辑在 client-go
的cache包中。
更多 ResourceEventHandlerFuncs
代码可参考:
// ResourceEventHandler can handle notifications for events that happen to a // resource. The events are informational only, so you can't return an // error. // * OnAdd is called when an object is added. // * OnUpdate is called when an object is modified. Note that oldObj is the // last known state of the object-- it is possible that several changes // were combined together, so you can't use this to see every single // change. OnUpdate is also called when a re-list happens, and it will // get called even if nothing changed. This is useful for periodically // evaluating or syncing something. // * OnDelete will get the final state of the item if it is known, otherwise // it will get an object of type DeletedFinalStateUnknown. This can // happen if the watch is closed and misses the delete event and we don't // notice the deletion until the subsequent re-list. type ResourceEventHandler interface { OnAdd(obj interface{}) OnUpdate(oldObj, newObj interface{}) OnDelete(obj interface{}) } // ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or // as few of the notification functions as you want while still implementing // ResourceEventHandler. type ResourceEventHandlerFuncs struct { AddFunc func(obj interface{}) UpdateFunc func(oldObj, newObj interface{}) DeleteFunc func(obj interface{}) }
3.2.5. 构造Store和Controller
1、PVC
if controller.claimInformer != nil { controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod) controller.claims, controller.claimController = controller.claimInformer.GetStore(), controller.claimInformer.GetController() } else { controller.claims, controller.claimController = cache.NewInformer( claimSource, &v1.PersistentVolumeClaim{}, controller.resyncPeriod, claimHandler, ) }
2、PV
if controller.volumeInformer != nil { controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod) controller.volumes, controller.volumeController = controller.volumeInformer.GetStore(), controller.volumeInformer.GetController() } else { controller.volumes, controller.volumeController = cache.NewInformer( volumeSource, &v1.PersistentVolume{}, controller.resyncPeriod, volumeHandler, ) }
3、StorageClass
if controller.classInformer != nil { // no resource event handler needed for StorageClasses controller.classes, controller.classController = controller.classInformer.GetStore(), controller.classInformer.GetController() } else { controller.classes, controller.classController = cache.NewInformer( classSource, versionedClassType, controller.resyncPeriod, classHandler, ) }
通过 cache.NewInformer
的方法构造,入参是 ListWatch
结构体和 ResourceEventHandlerFuncs
函数等,返回值是 Store
和 Controller
。
通过以上各个部分的构造,最后返回一个具体的 ProvisionController
对象。
3.3. ProvisionController.Run方法
ProvisionController
的 Run
方法是以常驻进程的方式运行,函数内部再运行其他的controller。
3.3.1. prometheus数据收集
// Run starts all of this controller's control loops func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) { run := func(stopCh <-chan struct{}) { ... if ctrl.metricsPort > 0 { prometheus.MustRegister([]prometheus.Collector{ metrics.PersistentVolumeClaimProvisionTotal, metrics.PersistentVolumeClaimProvisionFailedTotal, metrics.PersistentVolumeClaimProvisionDurationSeconds, metrics.PersistentVolumeDeleteTotal, metrics.PersistentVolumeDeleteFailedTotal, metrics.PersistentVolumeDeleteDurationSeconds, }...) http.Handle(ctrl.metricsPath, promhttp.Handler()) address := net.JoinHostPort(ctrl.metricsAddress, strconv.FormatInt(int64(ctrl.metricsPort), 10)) glog.Infof("Starting metrics server at %s\n", address) go wait.Forever(func() { err := http.ListenAndServe(address, nil) if err != nil { glog.Errorf("Failed to listen on %s: %v", address, err) } }, 5*time.Second) } ... }
3.3.2. Controller.Run
// If a SharedInformer has been passed in, this controller should not // call Run again if ctrl.claimInformer == nil { go ctrl.claimController.Run(stopCh) } if ctrl.volumeInformer == nil { go ctrl.volumeController.Run(stopCh) } if ctrl.classInformer == nil { go ctrl.classController.Run(stopCh) }
运行消息通知器Informer。
3.3.3. Worker
for i := 0; i < ctrl.threadiness; i++ { go wait.Until(ctrl.runClaimWorker, time.Second, stopCh) go wait.Until(ctrl.runVolumeWorker, time.Second, stopCh) }
runClaimWorker
和 runVolumeWorker
分别为PVC和PV的worker,这两个的具体执行体分别是 processNextClaimWorkItem
和 processNextVolumeWorkItem
。
执行流程如下:
PVC的函数调用流程
runClaimWorker→processNextClaimWorkItem→syncClaimHandler→syncClaim→provisionClaimOperation
PV的函数调用流程
runVolumeWorker→processNextVolumeWorkItem→syncVolumeHandler→syncVolume→deleteVolumeOperation
可见最后执行的函数分别是 provisionClaimOperation
和 deleteVolumeOperation
。
3.4. Operation
3.4.1. provisionClaimOperation
1、 provisionClaimOperation
入参是PVC,通过PVC获得PV对象,并判断PV对象是否存在,如果存在则退出后续操作。
// provisionClaimOperation attempts to provision a volume for the given claim. // Returns error, which indicates whether provisioning should be retried // (requeue the claim) or not func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error { // Most code here is identical to that found in controller.go of kube's PV controller... claimClass := helper.GetPersistentVolumeClaimClass(claim) operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass) glog.Infof(logOperation(operation, "started")) // A previous doProvisionClaim may just have finished while we were waiting for // the locks. Check that PV (with deterministic name) hasn't been provisioned // yet. pvName := ctrl.getProvisionedVolumeNameForClaim(claim) volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{}) if err == nil && volume != nil { // Volume has been already provisioned, nothing to do. glog.Infof(logOperation(operation, "persistentvolume %q already exists, skipping", pvName)) return nil } ... }
2、获取StorageClass对象中的 Provisioner
和 ReclaimPolicy
参数,如果 provisionerName
和 StorageClass
对象中的 provisioner
字段不一致则报错并退出执行。
provisioner, parameters, err := ctrl.getStorageClassFields(claimClass) if err != nil { glog.Errorf(logOperation(operation, "error getting claim's StorageClass's fields: %v", err)) return nil } if provisioner != ctrl.provisionerName { // class.Provisioner has either changed since shouldProvision() or // annDynamicallyProvisioned contains different provisioner than // class.Provisioner. glog.Errorf(logOperation(operation, "unknown provisioner %q requested in claim's StorageClass", provisioner)) return nil } // Check if this provisioner can provision this claim. if err = ctrl.canProvision(claim); err != nil { ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error()) glog.Errorf(logOperation(operation, "failed to provision volume: %v", err)) return nil } reclaimPolicy := v1.PersistentVolumeReclaimDelete if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.8.0")) { reclaimPolicy, err = ctrl.fetchReclaimPolicy(claimClass) if err != nil { return err } }
3、执行具体的 provisioner.Provision
方法,构建PV对象,例如本文中的 provisioner
是 nfs-provisioner
。
options := VolumeOptions{ PersistentVolumeReclaimPolicy: reclaimPolicy, PVName: pvName, PVC: claim, MountOptions: mountOptions, Parameters: parameters, SelectedNode: selectedNode, AllowedTopologies: allowedTopologies, } ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim))) volume, err = ctrl.provisioner.Provision(options) if err != nil { if ierr, ok := err.(*IgnoredError); ok { // Provision ignored, do nothing and hope another provisioner will provision it. glog.Infof(logOperation(operation, "volume provision ignored: %v", ierr)) return nil } err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error()) return err }
4、创建k8s的PV对象。
// Try to create the PV object several times for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ { glog.Infof(logOperation(operation, "trying to save persistentvvolume %q", volume.Name)) if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) { // Save succeeded. if err != nil { glog.Infof(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name)) err = nil } else { glog.Infof(logOperation(operation, "persistentvolume %q saved", volume.Name)) } break } // Save failed, try again after a while. glog.Infof(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err)) time.Sleep(ctrl.createProvisionedPVInterval) }
5、创建PV失败,清理存储资源。
if err != nil { // Save failed. Now we have a storage asset outside of Kubernetes, // but we don't have appropriate PV object for it. // Emit some event here and try to delete the storage asset several // times. ... for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ { if err = ctrl.provisioner.Delete(volume); err == nil { // Delete succeeded glog.Infof(logOperation(operation, "cleaning volume %q succeeded", volume.Name)) break } // Delete failed, try again after a while. glog.Infof(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err)) time.Sleep(ctrl.createProvisionedPVInterval) } if err != nil { // Delete failed several times. There is an orphaned volume and there // is nothing we can do about it. strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err) glog.Error(logOperation(operation, strerr)) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr) } }
如果创建成功,则打印成功的日志,并返回 nil
。
3.4.2. deleteVolumeOperation
1、 deleteVolumeOperation
入参是PV,先获得PV对象,并判断是否需要删除。
// deleteVolumeOperation attempts to delete the volume backing the given // volume. Returns error, which indicates whether deletion should be retried // (requeue the volume) or not func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolume) error { ... // This method may have been waiting for a volume lock for some time. // Our check does not have to be as sophisticated as PV controller's, we can // trust that the PV controller has set the PV to Released/Failed and it's // ours to delete newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{}) if err != nil { return nil } if !ctrl.shouldDelete(newVolume) { glog.Infof(logOperation(operation, "persistentvolume no longer needs deletion, skipping")) return nil } ... }
2、调用具体的 provisioner
的 Delete
方法,例如,如果是nfs-provisioner,则是调用nfs-provisioner的Delete方法。
err = ctrl.provisioner.Delete(volume) if err != nil { if ierr, ok := err.(*IgnoredError); ok { // Delete ignored, do nothing and hope another provisioner will delete it. glog.Infof(logOperation(operation, "volume deletion ignored: %v", ierr)) return nil } // Delete failed, emit an event. glog.Errorf(logOperation(operation, "volume deletion failed: %v", err)) ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, "VolumeFailedDelete", err.Error()) return err }
3、删除k8s中的PV对象。
// Delete the volume if err = ctrl.client.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil { // Oops, could not delete the volume and therefore the controller will // try to delete the volume again on next update. glog.Infof(logOperation(operation, "failed to delete persistentvolume: %v", err)) return err }
4. 总结
-
Provisioner
接口包含Provision
和Delete
两个方法,自定义的provisioner
需要实现这两个方法,这两个方法只是处理了跟存储类型相关的事项,并没有针对PV
、PVC
对象的增删等操作。 -
Provision
方法主要用来构造PV对象,不同类型的Provisioner
的,一般是PersistentVolumeSource
类型和参数不同,例如nfs-provisioner
对应的PersistentVolumeSource
为NFS
,并且需要传入NFS
相关的参数:Server
,Path
等。 -
Delete
方法主要针对对应的存储类型,做数据存档(备份)或删除的处理。 -
StorageClass
对象需要单独创建,用来指定具体的provisioner
来执行相关逻辑。 -
provisionClaimOperation
和deleteVolumeOperation
具体执行了k8s中PV
对象的创建和删除操作,同时调用了具体provisioner
的Provision
和Delete
两个方法来对存储数据做处理。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
- [源码分析] kube-scheduler源码分析(五)之 PrioritizeNodes
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Ruby on Rails电子商务实战
Christian Hellsten、Jarkko Laine / 曹维远 / 人民邮电出版社 / 2008-4 / 49.00元
《Ruby on Rails电子商务实战》全面讲解了使用Ruby on Rails创建产品级应用程序的过程。书中通过演示构建网上书店的全过程,先后介绍如何使用如TDD的敏捷实践,启动一个项目并建立良好稳定的基础,如何深入Ruby on Rails,实现诸如将应用程序翻译成各种语言对产品进行调试等的普遍需求。其中用到的主要技术包括Ajax、聚合、设置标签和国际化等,还介绍了如何使用ActiveRec......一起来看看 《Ruby on Rails电子商务实战》 这本书的介绍吧!