• 1. Dynamic Provisioner
    • 1.1. Provisioner Interface
    • 1.2. VolumeOptions
    • 1.3. ProvisionController
    • 1.4. 开发provisioner的步骤
  • 2. NFS Client Provisioner
    • 2.1. Main函数
      • 2.1.1. 读取环境变量
      • 2.1.2. 获取clientset对象
      • 2.1.3. 构造nfsProvisioner对象
      • 2.1.4. 构建并运行ProvisionController
    • 2.2. Provision 和 Delete 方法
      • 2.2.1. Provision方法
      • 2.2.2. Delete方法
  • 3. ProvisionController
    • 3.1. ProvisionController结构体
      • 3.1.1. 入参
      • 3.1.2. Controller和Informer
      • 3.1.3. workqueue
      • 3.1.4. 其他
    • 3.2. NewProvisionController 方法
      • 3.2.1. 初始化默认值
      • 3.2.2. 初始化任务队列
      • 3.2.3. ListWatch
      • 3.2.4. ResourceEventHandlerFuncs
      • 3.2.5. 构造Store和Controller
    • 3.3. ProvisionController.Run 方法
      • 3.3.1. prometheus数据收集
      • 3.3.2. Controller.Run
      • 3.3.3. Worker
    • 3.4. Operation
      • 3.4.1. provisionClaimOperation
      • 3.4.2. deleteVolumeOperation
  • 4. 总结

    如果要开发一个Dynamic Provisioner,需要使用到the helper library。

    1. Dynamic Provisioner

    1.1. Provisioner Interface

    开发Dynamic Provisioner需要实现Provisioner接口,该接口有两个方法,分别是:

    • Provision:创建存储资源,并且返回一个PV对象。
    • Delete:移除对应的存储资源,但并没有删除PV对象。

    Provisioner 接口源码如下:

    1. // Provisioner is an interface that creates templates for PersistentVolumes
    2. // and can create the volume as a new resource in the infrastructure provider.
    3. // It can also remove the volume it created from the underlying storage
    4. // provider.
    5. type Provisioner interface {
    6. // Provision creates a volume i.e. the storage asset and returns a PV object
    7. // for the volume
    8. Provision(VolumeOptions) (*v1.PersistentVolume, error)
    9. // Delete removes the storage asset that was created by Provision backing the
    10. // given PV. Does not delete the PV object itself.
    11. //
    12. // May return IgnoredError to indicate that the call has been ignored and no
    13. // action taken.
    14. Delete(*v1.PersistentVolume) error
    15. }

    1.2. VolumeOptions


    VolumeOptions 源码如下:

    1. // VolumeOptions contains option information about a volume
    2. // https://github.com/kubernetes/kubernetes/blob/release-1.4/pkg/volume/plugins.go
    3. type VolumeOptions struct {
    4. // Reclamation policy for a persistent volume
    5. PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy
    6. // PV.Name of the appropriate PersistentVolume. Used to generate cloud
    7. // volume name.
    8. PVName string
    9. // PV mount options. Not validated - mount of the PVs will simply fail if one is invalid.
    10. MountOptions []string
    11. // PVC is reference to the claim that lead to provisioning of a new PV.
    12. // Provisioners *must* create a PV that would be matched by this PVC,
    13. // i.e. with required capacity, accessMode, labels matching PVC.Selector and
    14. // so on.
    15. PVC *v1.PersistentVolumeClaim
    16. // Volume provisioning parameters from StorageClass
    17. Parameters map[string]string
    18. // Node selected by the scheduler for the volume.
    19. SelectedNode *v1.Node
    20. // Topology constraint parameter from StorageClass
    21. AllowedTopologies []v1.TopologySelectorTerm
    22. }

    1.3. ProvisionController


    1.4. 开发provisioner的步骤

    1. 写一个provisioner实现Provisioner接口(包含ProvisionDelete的方法)。
    2. 通过该provisioner构建ProvisionController
    3. 执行ProvisionControllerRun方法。

    2. NFS Client Provisioner

    nfs-client-provisioner是一个automatic provisioner,使用NFS作为存储,自动创建PV和对应的PVC,本身不提供NFS存储,需要外部先有一套NFS存储服务。

    • PV以 ${namespace}-${pvcName}-${pvName}的命名格式提供(在NFS服务器上)
    • PV回收的时候以 archieved-${namespace}-${pvcName}-${pvName} 的命名格式(在NFS服务器上)



    2.1. Main函数

    2.1.1. 读取环境变量


    1. func main() {
    2. flag.Parse()
    3. flag.Set("logtostderr", "true")
    4. server := os.Getenv("NFS_SERVER")
    5. if server == "" {
    6. glog.Fatal("NFS_SERVER not set")
    7. }
    8. path := os.Getenv("NFS_PATH")
    9. if path == "" {
    10. glog.Fatal("NFS_PATH not set")
    11. }
    12. provisionerName := os.Getenv(provisionerNameKey)
    13. if provisionerName == "" {
    14. glog.Fatalf("environment variable %s is not set! Please set it.", provisionerNameKey)
    15. }
    16. ...
    17. }


    • NFS_SERVER:NFS服务端的IP地址。
    • NFS_PATH:NFS服务端设置的共享目录
    • PROVISIONER_NAME:provisioner的名字,需要和StorageClass对象中的provisioner字段一致。


    1. apiVersion: storage.k8s.io/v1
    2. kind: StorageClass
    3. metadata:
    4. name: managed-nfs-storage
    5. provisioner: fuseim.pri/ifs # or choose another name, must match deployment's env PROVISIONER_NAME'
    6. parameters:
    7. archiveOnDelete: "false" # When set to "false" your PVs will not be archived by the provisioner upon deletion of the PVC.

    2.1.2. 获取clientset对象


    1. // Create an InClusterConfig and use it to create a client for the controller
    2. // to use to communicate with Kubernetes
    3. config, err := rest.InClusterConfig()
    4. if err != nil {
    5. glog.Fatalf("Failed to create config: %v", err)
    6. }
    7. clientset, err := kubernetes.NewForConfig(config)
    8. if err != nil {
    9. glog.Fatalf("Failed to create client: %v", err)
    10. }


    2.1.3. 构造nfsProvisioner对象


    1. // The controller needs to know what the server version is because out-of-tree
    2. // provisioners aren't officially supported until 1.5
    3. serverVersion, err := clientset.Discovery().ServerVersion()
    4. if err != nil {
    5. glog.Fatalf("Error getting server version: %v", err)
    6. }
    7. clientNFSProvisioner := &nfsProvisioner{
    8. client: clientset,
    9. server: server,
    10. path: path,
    11. }

    通过clientsetserverpath等值构造nfsProvisioner对象,同时还获取了k8s的版本信息,因为provisioners的功能在k8s 1.5及以上版本才支持。


    1. type nfsProvisioner struct {
    2. client kubernetes.Interface
    3. server string
    4. path string
    5. }
    6. var _ controller.Provisioner = &nfsProvisioner{}


    1. var _ controller.Provisioner = &nfsProvisioner{}


    2.1.4. 构建并运行ProvisionController


    1. // Start the provision controller which will dynamically provision efs NFS
    2. // PVs
    3. pc := controller.NewProvisionController(clientset, provisionerName, clientNFSProvisioner, serverVersion.GitVersion)
    4. pc.Run(wait.NeverStop)


    2.2. Provision和Delete方法

    2.2.1. Provision方法




    1. func (p *nfsProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
    2. if options.PVC.Spec.Selector != nil {
    3. return nil, fmt.Errorf("claim Selector is not supported")
    4. }
    5. glog.V(4).Infof("nfs provisioner: VolumeOptions %v", options)
    6. pvcNamespace := options.PVC.Namespace
    7. pvcName := options.PVC.Name
    8. pvName := strings.Join([]string{pvcNamespace, pvcName, options.PVName}, "-")
    9. fullPath := filepath.Join(mountPath, pvName)
    10. glog.V(4).Infof("creating path %s", fullPath)
    11. if err := os.MkdirAll(fullPath, 0777); err != nil {
    12. return nil, errors.New("unable to create directory to provision new pv: " + err.Error())
    13. }
    14. os.Chmod(fullPath, 0777)
    15. path := filepath.Join(p.path, pvName)
    16. ...
    17. }



    1. pv := &v1.PersistentVolume{
    2. ObjectMeta: metav1.ObjectMeta{
    3. Name: options.PVName,
    4. },
    5. Spec: v1.PersistentVolumeSpec{
    6. PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
    7. AccessModes: options.PVC.Spec.AccessModes,
    8. MountOptions: options.MountOptions,
    9. Capacity: v1.ResourceList{
    10. v1.ResourceName(v1.ResourceStorage): options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)],
    11. },
    12. PersistentVolumeSource: v1.PersistentVolumeSource{
    13. NFS: &v1.NFSVolumeSource{
    14. Server: p.server,
    15. Path: path,
    16. ReadOnly: false,
    17. },
    18. },
    19. },
    20. }
    21. return pv, nil



    2.2.2. Delete方法



    1. func (p *nfsProvisioner) Delete(volume *v1.PersistentVolume) error {
    2. path := volume.Spec.PersistentVolumeSource.NFS.Path
    3. pvName := filepath.Base(path)
    4. oldPath := filepath.Join(mountPath, pvName)
    5. if _, err := os.Stat(oldPath); os.IsNotExist(err) {
    6. glog.Warningf("path %s does not exist, deletion skipped", oldPath)
    7. return nil
    8. }
    9. ...
    10. }



    1. // Get the storage class for this volume.
    2. storageClass, err := p.getClassForVolume(volume)
    3. if err != nil {
    4. return err
    5. }
    6. // Determine if the "archiveOnDelete" parameter exists.
    7. // If it exists and has a falsey value, delete the directory.
    8. // Otherwise, archive it.
    9. archiveOnDelete, exists := storageClass.Parameters["archiveOnDelete"]
    10. if exists {
    11. archiveBool, err := strconv.ParseBool(archiveOnDelete)
    12. if err != nil {
    13. return err
    14. }
    15. if !archiveBool {
    16. return os.RemoveAll(oldPath)
    17. }
    18. }




    1. archivePath := filepath.Join(mountPath, "archived-"+pvName)
    2. glog.V(4).Infof("archiving path %s to %s", oldPath, archivePath)
    3. return os.Rename(oldPath, archivePath)


    3. ProvisionController

    3.1. ProvisionController结构体



    3.1.1. 入参

    1. // ProvisionController is a controller that provisions PersistentVolumes for
    2. // PersistentVolumeClaims.
    3. type ProvisionController struct {
    4. client kubernetes.Interface
    5. // The name of the provisioner for which this controller dynamically
    6. // provisions volumes. The value of annDynamicallyProvisioned and
    7. // annStorageProvisioner to set & watch for, respectively
    8. provisionerName string
    9. // The provisioner the controller will use to provision and delete volumes.
    10. // Presumably this implementer of Provisioner carries its own
    11. // volume-specific options and such that it needs in order to provision
    12. // volumes.
    13. provisioner Provisioner
    14. // Kubernetes cluster server version:
    15. // * 1.4: storage classes introduced as beta. Technically out-of-tree dynamic
    16. // provisioning is not officially supported, though it works
    17. // * 1.5: storage classes stay in beta. Out-of-tree dynamic provisioning is
    18. // officially supported
    19. // * 1.6: storage classes enter GA
    20. kubeVersion *utilversion.Version
    21. ...
    22. }


    • client:clientset客户端,用来调用k8s的API。
    • provisionerName:provisioner的名字,需要和StorageClass对象中的provisioner字段一致。
    • provisioner:具体的provisioner的实现者,本文为nfsProvisioner
    • kubeVersion:k8s的版本信息。

    3.1.2. Controller和Informer

    1. type ProvisionController struct {
    2. ...
    3. claimInformer cache.SharedInformer
    4. claims cache.Store
    5. claimController cache.Controller
    6. volumeInformer cache.SharedInformer
    7. volumes cache.Store
    8. volumeController cache.Controller
    9. classInformer cache.SharedInformer
    10. classes cache.Store
    11. classController cache.Controller
    12. ...
    13. }


    • Controller:通用的控制框架
    • Informer:消息通知器
    • Store:通用的对象存储接口

    3.1.3. workqueue

    1. type ProvisionController struct {
    2. ...
    3. claimQueue workqueue.RateLimitingInterface
    4. volumeQueue workqueue.RateLimitingInterface
    5. ...
    6. }


    3.1.4. 其他

    1. // Identity of this controller, generated at creation time and not persisted
    2. // across restarts. Useful only for debugging, for seeing the source of
    3. // events. controller.provisioner may have its own, different notion of
    4. // identity which may/may not persist across restarts
    5. id string
    6. component string
    7. eventRecorder record.EventRecorder
    8. resyncPeriod time.Duration
    9. exponentialBackOffOnError bool
    10. threadiness int
    11. createProvisionedPVRetryCount int
    12. createProvisionedPVInterval time.Duration
    13. failedProvisionThreshold, failedDeleteThreshold int
    14. // The port for metrics server to serve on.
    15. metricsPort int32
    16. // The IP address for metrics server to serve on.
    17. metricsAddress string
    18. // The path of metrics endpoint path.
    19. metricsPath string
    20. // Parameters of leaderelection.LeaderElectionConfig.
    21. leaseDuration, renewDeadline, retryPeriod time.Duration
    22. hasRun bool
    23. hasRunLock *sync.Mutex

    3.2. NewProvisionController方法



    3.2.1. 初始化默认值

    1. // NewProvisionController creates a new provision controller using
    2. // the given configuration parameters and with private (non-shared) informers.
    3. func NewProvisionController(
    4. client kubernetes.Interface,
    5. provisionerName string,
    6. provisioner Provisioner,
    7. kubeVersion string,
    8. options ...func(*ProvisionController) error,
    9. ) *ProvisionController {
    10. ...
    11. controller := &ProvisionController{
    12. client: client,
    13. provisionerName: provisionerName,
    14. provisioner: provisioner,
    15. kubeVersion: utilversion.MustParseSemantic(kubeVersion),
    16. id: id,
    17. component: component,
    18. eventRecorder: eventRecorder,
    19. resyncPeriod: DefaultResyncPeriod,
    20. exponentialBackOffOnError: DefaultExponentialBackOffOnError,
    21. threadiness: DefaultThreadiness,
    22. createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount,
    23. createProvisionedPVInterval: DefaultCreateProvisionedPVInterval,
    24. failedProvisionThreshold: DefaultFailedProvisionThreshold,
    25. failedDeleteThreshold: DefaultFailedDeleteThreshold,
    26. leaseDuration: DefaultLeaseDuration,
    27. renewDeadline: DefaultRenewDeadline,
    28. retryPeriod: DefaultRetryPeriod,
    29. metricsPort: DefaultMetricsPort,
    30. metricsAddress: DefaultMetricsAddress,
    31. metricsPath: DefaultMetricsPath,
    32. hasRun: false,
    33. hasRunLock: &sync.Mutex{},
    34. }
    35. ...
    36. }

    3.2.2. 初始化任务队列

    1. ratelimiter := workqueue.NewMaxOfRateLimiter(
    2. workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
    3. &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    4. )
    5. if !controller.exponentialBackOffOnError {
    6. ratelimiter = workqueue.NewMaxOfRateLimiter(
    7. workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second),
    8. &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    9. )
    10. }
    11. controller.claimQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "claims")
    12. controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "volumes")

    3.2.3. ListWatch

    1. // PVC
    2. claimSource := &cache.ListWatch{
    3. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
    4. return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
    5. },
    6. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
    7. return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
    8. },
    9. }
    10. // PV
    11. volumeSource := &cache.ListWatch{
    12. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
    13. return client.CoreV1().PersistentVolumes().List(options)
    14. },
    15. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
    16. return client.CoreV1().PersistentVolumes().Watch(options)
    17. },
    18. }
    19. // StorageClass
    20. classSource = &cache.ListWatch{
    21. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
    22. return client.StorageV1().StorageClasses().List(options)
    23. },
    24. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
    25. return client.StorageV1().StorageClasses().Watch(options)
    26. },
    27. }




    1. // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
    2. type ListerWatcher interface {
    3. // List should return a list type object; the Items field will be extracted, and the
    4. // ResourceVersion field will be used to start the watch in the right place.
    5. List(options metav1.ListOptions) (runtime.Object, error)
    6. // Watch should begin a watch at the specified version.
    7. Watch(options metav1.ListOptions) (watch.Interface, error)
    8. }
    9. // ListFunc knows how to list resources
    10. type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
    11. // WatchFunc knows how to watch resources
    12. type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
    13. // ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
    14. // It is a convenience function for users of NewReflector, etc.
    15. // ListFunc and WatchFunc must not be nil
    16. type ListWatch struct {
    17. ListFunc ListFunc
    18. WatchFunc WatchFunc
    19. // DisableChunking requests no chunking for this list watcher.
    20. DisableChunking bool
    21. }

    3.2.4. ResourceEventHandlerFuncs

    1. // PVC
    2. claimHandler := cache.ResourceEventHandlerFuncs{
    3. AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
    4. UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
    5. DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.claimQueue, obj) },
    6. }
    7. // PV
    8. volumeHandler := cache.ResourceEventHandlerFuncs{
    9. AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
    10. UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
    11. DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) },
    12. }
    13. // StorageClass
    14. classHandler := cache.ResourceEventHandlerFuncs{
    15. // We don't need an actual event handler for StorageClasses,
    16. // but we must pass a non-nil one to cache.NewInformer()
    17. AddFunc: nil,
    18. UpdateFunc: nil,
    19. DeleteFunc: nil,
    20. }



    1. // ResourceEventHandler can handle notifications for events that happen to a
    2. // resource. The events are informational only, so you can't return an
    3. // error.
    4. // * OnAdd is called when an object is added.
    5. // * OnUpdate is called when an object is modified. Note that oldObj is the
    6. // last known state of the object-- it is possible that several changes
    7. // were combined together, so you can't use this to see every single
    8. // change. OnUpdate is also called when a re-list happens, and it will
    9. // get called even if nothing changed. This is useful for periodically
    10. // evaluating or syncing something.
    11. // * OnDelete will get the final state of the item if it is known, otherwise
    12. // it will get an object of type DeletedFinalStateUnknown. This can
    13. // happen if the watch is closed and misses the delete event and we don't
    14. // notice the deletion until the subsequent re-list.
    15. type ResourceEventHandler interface {
    16. OnAdd(obj interface{})
    17. OnUpdate(oldObj, newObj interface{})
    18. OnDelete(obj interface{})
    19. }
    20. // ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
    21. // as few of the notification functions as you want while still implementing
    22. // ResourceEventHandler.
    23. type ResourceEventHandlerFuncs struct {
    24. AddFunc func(obj interface{})
    25. UpdateFunc func(oldObj, newObj interface{})
    26. DeleteFunc func(obj interface{})
    27. }

    3.2.5. 构造Store和Controller


    1. if controller.claimInformer != nil {
    2. controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
    3. controller.claims, controller.claimController =
    4. controller.claimInformer.GetStore(),
    5. controller.claimInformer.GetController()
    6. } else {
    7. controller.claims, controller.claimController =
    8. cache.NewInformer(
    9. claimSource,
    10. &v1.PersistentVolumeClaim{},
    11. controller.resyncPeriod,
    12. claimHandler,
    13. )
    14. }


    1. if controller.volumeInformer != nil {
    2. controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
    3. controller.volumes, controller.volumeController =
    4. controller.volumeInformer.GetStore(),
    5. controller.volumeInformer.GetController()
    6. } else {
    7. controller.volumes, controller.volumeController =
    8. cache.NewInformer(
    9. volumeSource,
    10. &v1.PersistentVolume{},
    11. controller.resyncPeriod,
    12. volumeHandler,
    13. )
    14. }


    1. if controller.classInformer != nil {
    2. // no resource event handler needed for StorageClasses
    3. controller.classes, controller.classController =
    4. controller.classInformer.GetStore(),
    5. controller.classInformer.GetController()
    6. } else {
    7. controller.classes, controller.classController = cache.NewInformer(
    8. classSource,
    9. versionedClassType,
    10. controller.resyncPeriod,
    11. classHandler,
    12. )
    13. }



    3.3. ProvisionController.Run方法


    3.3.1. prometheus数据收集

    1. // Run starts all of this controller's control loops
    2. func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
    3. run := func(stopCh <-chan struct{}) {
    4. ...
    5. if ctrl.metricsPort > 0 {
    6. prometheus.MustRegister([]prometheus.Collector{
    7. metrics.PersistentVolumeClaimProvisionTotal,
    8. metrics.PersistentVolumeClaimProvisionFailedTotal,
    9. metrics.PersistentVolumeClaimProvisionDurationSeconds,
    10. metrics.PersistentVolumeDeleteTotal,
    11. metrics.PersistentVolumeDeleteFailedTotal,
    12. metrics.PersistentVolumeDeleteDurationSeconds,
    13. }...)
    14. http.Handle(ctrl.metricsPath, promhttp.Handler())
    15. address := net.JoinHostPort(ctrl.metricsAddress, strconv.FormatInt(int64(ctrl.metricsPort), 10))
    16. glog.Infof("Starting metrics server at %s\n", address)
    17. go wait.Forever(func() {
    18. err := http.ListenAndServe(address, nil)
    19. if err != nil {
    20. glog.Errorf("Failed to listen on %s: %v", address, err)
    21. }
    22. }, 5*time.Second)
    23. }
    24. ...
    25. }

    3.3.2. Controller.Run

    1. // If a SharedInformer has been passed in, this controller should not
    2. // call Run again
    3. if ctrl.claimInformer == nil {
    4. go ctrl.claimController.Run(stopCh)
    5. }
    6. if ctrl.volumeInformer == nil {
    7. go ctrl.volumeController.Run(stopCh)
    8. }
    9. if ctrl.classInformer == nil {
    10. go ctrl.classController.Run(stopCh)
    11. }


    3.3.3. Worker

    1. for i := 0; i < ctrl.threadiness; i++ {
    2. go wait.Until(ctrl.runClaimWorker, time.Second, stopCh)
    3. go wait.Until(ctrl.runVolumeWorker, time.Second, stopCh)
    4. }




    1. runClaimWorkerprocessNextClaimWorkItemsyncClaimHandlersyncClaimprovisionClaimOperation


    1. runVolumeWorkerprocessNextVolumeWorkItemsyncVolumeHandlersyncVolumedeleteVolumeOperation


    3.4. Operation

    3.4.1. provisionClaimOperation


    1. // provisionClaimOperation attempts to provision a volume for the given claim.
    2. // Returns error, which indicates whether provisioning should be retried
    3. // (requeue the claim) or not
    4. func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error {
    5. // Most code here is identical to that found in controller.go of kube's PV controller...
    6. claimClass := helper.GetPersistentVolumeClaimClass(claim)
    7. operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass)
    8. glog.Infof(logOperation(operation, "started"))
    9. // A previous doProvisionClaim may just have finished while we were waiting for
    10. // the locks. Check that PV (with deterministic name) hasn't been provisioned
    11. // yet.
    12. pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
    13. volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
    14. if err == nil && volume != nil {
    15. // Volume has been already provisioned, nothing to do.
    16. glog.Infof(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))
    17. return nil
    18. }
    19. ...
    20. }


    1. provisioner, parameters, err := ctrl.getStorageClassFields(claimClass)
    2. if err != nil {
    3. glog.Errorf(logOperation(operation, "error getting claim's StorageClass's fields: %v", err))
    4. return nil
    5. }
    6. if provisioner != ctrl.provisionerName {
    7. // class.Provisioner has either changed since shouldProvision() or
    8. // annDynamicallyProvisioned contains different provisioner than
    9. // class.Provisioner.
    10. glog.Errorf(logOperation(operation, "unknown provisioner %q requested in claim's StorageClass", provisioner))
    11. return nil
    12. }
    13. // Check if this provisioner can provision this claim.
    14. if err = ctrl.canProvision(claim); err != nil {
    15. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
    16. glog.Errorf(logOperation(operation, "failed to provision volume: %v", err))
    17. return nil
    18. }
    19. reclaimPolicy := v1.PersistentVolumeReclaimDelete
    20. if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.8.0")) {
    21. reclaimPolicy, err = ctrl.fetchReclaimPolicy(claimClass)
    22. if err != nil {
    23. return err
    24. }
    25. }


    1. options := VolumeOptions{
    2. PersistentVolumeReclaimPolicy: reclaimPolicy,
    3. PVName: pvName,
    4. PVC: claim,
    5. MountOptions: mountOptions,
    6. Parameters: parameters,
    7. SelectedNode: selectedNode,
    8. AllowedTopologies: allowedTopologies,
    9. }
    10. ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))
    11. volume, err = ctrl.provisioner.Provision(options)
    12. if err != nil {
    13. if ierr, ok := err.(*IgnoredError); ok {
    14. // Provision ignored, do nothing and hope another provisioner will provision it.
    15. glog.Infof(logOperation(operation, "volume provision ignored: %v", ierr))
    16. return nil
    17. }
    18. err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
    19. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
    20. return err
    21. }


    1. // Try to create the PV object several times
    2. for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
    3. glog.Infof(logOperation(operation, "trying to save persistentvvolume %q", volume.Name))
    4. if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
    5. // Save succeeded.
    6. if err != nil {
    7. glog.Infof(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
    8. err = nil
    9. } else {
    10. glog.Infof(logOperation(operation, "persistentvolume %q saved", volume.Name))
    11. }
    12. break
    13. }
    14. // Save failed, try again after a while.
    15. glog.Infof(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
    16. time.Sleep(ctrl.createProvisionedPVInterval)
    17. }


    1. if err != nil {
    2. // Save failed. Now we have a storage asset outside of Kubernetes,
    3. // but we don't have appropriate PV object for it.
    4. // Emit some event here and try to delete the storage asset several
    5. // times.
    6. ...
    7. for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
    8. if err = ctrl.provisioner.Delete(volume); err == nil {
    9. // Delete succeeded
    10. glog.Infof(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
    11. break
    12. }
    13. // Delete failed, try again after a while.
    14. glog.Infof(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
    15. time.Sleep(ctrl.createProvisionedPVInterval)
    16. }
    17. if err != nil {
    18. // Delete failed several times. There is an orphaned volume and there
    19. // is nothing we can do about it.
    20. strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
    21. glog.Error(logOperation(operation, strerr))
    22. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
    23. }
    24. }


    3.4.2. deleteVolumeOperation


    1. // deleteVolumeOperation attempts to delete the volume backing the given
    2. // volume. Returns error, which indicates whether deletion should be retried
    3. // (requeue the volume) or not
    4. func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolume) error {
    5. ...
    6. // This method may have been waiting for a volume lock for some time.
    7. // Our check does not have to be as sophisticated as PV controller's, we can
    8. // trust that the PV controller has set the PV to Released/Failed and it's
    9. // ours to delete
    10. newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
    11. if err != nil {
    12. return nil
    13. }
    14. if !ctrl.shouldDelete(newVolume) {
    15. glog.Infof(logOperation(operation, "persistentvolume no longer needs deletion, skipping"))
    16. return nil
    17. }
    18. ...
    19. }


    1. err = ctrl.provisioner.Delete(volume)
    2. if err != nil {
    3. if ierr, ok := err.(*IgnoredError); ok {
    4. // Delete ignored, do nothing and hope another provisioner will delete it.
    5. glog.Infof(logOperation(operation, "volume deletion ignored: %v", ierr))
    6. return nil
    7. }
    8. // Delete failed, emit an event.
    9. glog.Errorf(logOperation(operation, "volume deletion failed: %v", err))
    10. ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, "VolumeFailedDelete", err.Error())
    11. return err
    12. }


    1. // Delete the volume
    2. if err = ctrl.client.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
    3. // Oops, could not delete the volume and therefore the controller will
    4. // try to delete the volume again on next update.
    5. glog.Infof(logOperation(operation, "failed to delete persistentvolume: %v", err))
    6. return err
    7. }

    4. 总结

    1. Provisioner接口包含ProvisionDelete两个方法,自定义的provisioner需要实现这两个方法,这两个方法只是处理了跟存储类型相关的事项,并没有针对PVPVC对象的增删等操作。
    2. Provision方法主要用来构造PV对象,不同类型的Provisioner的,一般是PersistentVolumeSource类型和参数不同,例如nfs-provisioner对应的PersistentVolumeSourceNFS,并且需要传入NFS相关的参数:ServerPath等。
    3. Delete方法主要针对对应的存储类型,做数据存档(备份)或删除的处理。
    4. StorageClass对象需要单独创建,用来指定具体的provisioner来执行相关逻辑。
    5. provisionClaimOperationdeleteVolumeOperation具体执行了k8s中PV对象的创建和删除操作,同时调用了具体provisionerProvisionDelete两个方法来对存储数据做处理。


    • https://github.com/kubernetes-incubator/external-storage/tree/master/docs/demo/hostpath-provisioner
    • https://github.com/kubernetes-incubator/external-storage/tree/master/nfs-client
    • https://github.com/kubernetes-incubator/external-storage/blob/master/lib/controller/controller.go
    • https://github.com/kubernetes-incubator/external-storage/blob/master/lib/controller/volume.go