  • 1. Dynamic Provisioner
    • 1.1. Provisioner Interface
    • 1.2. Steps for developing a provisioner
  • 2. CSI Provisioner
    • 2.1. The main function
      • 2.1.1. Reading command-line flags
      • 2.1.2. Obtaining the clientset object
      • 2.1.3. Kubernetes version check
      • 2.1.4. Connecting to the CSI socket
      • 2.1.5. Constructing the csi-Provisioner object
      • 2.1.6. Running the ProvisionController
    • 2.2. The Provision and Delete methods
      • 2.2.1. The Provision method
      • 2.2.2. The Delete method
    • 2.3. Summary
  • 3. csi-client
    • 3.1. Constructing the csi-client
      • 3.1.1. Constructing the grpcClient
      • 3.1.2. Constructing the csi-client
    • 3.2. csiClient.CreateVolume
    • 3.3. csiClient.DeleteVolume
  • 4. ProvisionController.Run

    This article analyzes the source code of csi-provisioner. For how to develop a Dynamic Provisioner in general, see the source-code analysis of nfs-client-provisioner.

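    1. Dynamic Provisioner

    1.1. Provisioner Interface

    Developing a Dynamic Provisioner means implementing the Provisioner interface, which has two methods:

    • Provision: creates the storage resource and returns a PV object.
    • Delete: removes the corresponding storage resource, but does not delete the PV object.

    1.2. Steps for developing a provisioner

    1. Write a provisioner that implements the Provisioner interface (the Provision and Delete methods).
    2. Build a ProvisionController from that provisioner.
    3. Call the Run method of the ProvisionController.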
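
    The three steps above can be sketched with simplified local types. This is only an illustration under stated assumptions: VolumeOptions, PersistentVolume, memProvisioner and miniController are hypothetical stand-ins defined here, not the real Kubernetes or external-provisioner types, and the real interface signatures carry more fields.

```go
package main

import (
	"errors"
	"fmt"
)

// VolumeOptions and PersistentVolume are simplified stand-ins for the real
// Kubernetes types; only the fields needed for this sketch are kept.
type VolumeOptions struct {
	PVName    string
	SizeBytes int64
}

type PersistentVolume struct {
	Name      string
	SizeBytes int64
}

// Provisioner mirrors the two-method interface described above.
type Provisioner interface {
	Provision(opts VolumeOptions) (*PersistentVolume, error)
	Delete(pv *PersistentVolume) error
}

// memProvisioner is a hypothetical backend that keeps "volumes" in a map.
type memProvisioner struct {
	volumes map[string]int64
}

func newMemProvisioner() *memProvisioner {
	return &memProvisioner{volumes: map[string]int64{}}
}

// Provision allocates backing storage and returns a PV description.
func (p *memProvisioner) Provision(opts VolumeOptions) (*PersistentVolume, error) {
	if opts.PVName == "" {
		return nil, errors.New("volume name must not be empty")
	}
	p.volumes[opts.PVName] = opts.SizeBytes
	return &PersistentVolume{Name: opts.PVName, SizeBytes: opts.SizeBytes}, nil
}

// Delete releases the backing storage; it does not touch any PV object.
func (p *memProvisioner) Delete(pv *PersistentVolume) error {
	if pv == nil {
		return errors.New("nil PV")
	}
	if _, ok := p.volumes[pv.Name]; !ok {
		return fmt.Errorf("volume %q not found", pv.Name)
	}
	delete(p.volumes, pv.Name)
	return nil
}

// miniController is a toy replacement for ProvisionController.
type miniController struct {
	provisioner Provisioner
}

// Run provisions one volume per pending claim and returns the resulting PVs.
func (c *miniController) Run(claims []VolumeOptions) ([]*PersistentVolume, error) {
	var pvs []*PersistentVolume
	for _, claim := range claims {
		pv, err := c.provisioner.Provision(claim)
		if err != nil {
			return pvs, err
		}
		pvs = append(pvs, pv)
	}
	return pvs, nil
}

func main() {
	p := newMemProvisioner()                // step 1: implement the interface
	ctrl := &miniController{provisioner: p} // step 2: build the controller
	// step 3: run the controller over the pending claims
	pvs, err := ctrl.Run([]VolumeOptions{{PVName: "pvc-demo", SizeBytes: 1024 * 1024 * 1024}})
	fmt.Println(len(pvs), err)
}
```

    The real ProvisionController watches PVC and PV objects through the API server instead of taking a slice of claims; the slice is used here only to keep the sketch runnable on its own.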

    2. CSI Provisioner

    For the CSI Provisioner source code, see: https://github.com/kubernetes-csi/external-provisioner.

    2.1. The main function

    2.1.1. Reading command-line flags

    The source is as follows:

    var (
        provisioner          = flag.String("provisioner", "", "Name of the provisioner. The provisioner will only provision volumes for claims that request a StorageClass with a provisioner field set equal to this name.")
        master               = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
        kubeconfig           = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
        csiEndpoint          = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume")
        connectionTimeout    = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
        volumeNamePrefix     = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume")
        volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
        showVersion          = flag.Bool("version", false, "Show version.")

        provisionController *controller.ProvisionController
        version             = "unknown"
    )

    func init() {
        var config *rest.Config
        var err error
        flag.Parse()
        flag.Set("logtostderr", "true")
        if *showVersion {
            fmt.Println(os.Args[0], version)
            os.Exit(0)
        }
        glog.Infof("Version: %s", version)
        ...
    }

    The init function parses these flags. Among them, provisioner is the name of the provisioner that supplies PVs for PVCs; it must match the provisioner field of the StorageClass object.
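
    As a small illustration of how the flag defaults and the provisioner name interact, the following sketch parses a subset of the flags with the standard flag package. The options struct, parseArgs, servesClass and the name example.com/csi are hypothetical helpers introduced for this example; only the flag names and default values are taken from the source above.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// options holds a subset of the flags shown above.
type options struct {
	provisioner       string
	csiAddress        string
	connectionTimeout time.Duration
}

// parseArgs parses args into options using the same flag names and defaults
// as the excerpt. It uses its own FlagSet so it can be called repeatedly.
func parseArgs(args []string) (options, error) {
	fs := flag.NewFlagSet("csi-provisioner", flag.ContinueOnError)
	var o options
	fs.StringVar(&o.provisioner, "provisioner", "", "Name of the provisioner.")
	fs.StringVar(&o.csiAddress, "csi-address", "/run/csi/socket", "The gRPC endpoint of the CSI driver.")
	fs.DurationVar(&o.connectionTimeout, "connection-timeout", 10*time.Second, "Timeout for waiting for the CSI driver socket.")
	err := fs.Parse(args)
	return o, err
}

// servesClass reports whether a provisioner started with these options should
// handle claims for a StorageClass whose provisioner field has the given value.
func servesClass(o options, storageClassProvisioner string) bool {
	return o.provisioner != "" && o.provisioner == storageClassProvisioner
}

func main() {
	o, err := parseArgs([]string{"-provisioner=example.com/csi"})
	fmt.Println(o.provisioner, o.csiAddress, o.connectionTimeout, err)
	fmt.Println(servesClass(o, "example.com/csi"))
}
```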

    2.1.2. Obtaining the clientset object

    The source is as follows:

    // get the KUBECONFIG from env if specified (useful for local/debug cluster)
    kubeconfigEnv := os.Getenv("KUBECONFIG")
    if kubeconfigEnv != "" {
        glog.Infof("Found KUBECONFIG environment variable set, using that..")
        kubeconfig = &kubeconfigEnv
    }
    if *master != "" || *kubeconfig != "" {
        glog.Infof("Either master or kubeconfig specified. building kube config from that..")
        config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
    } else {
        glog.Infof("Building kube configs for running in cluster...")
        config, err = rest.InClusterConfig()
    }
    if err != nil {
        glog.Fatalf("Failed to create config: %v", err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        glog.Fatalf("Failed to create client: %v", err)
    }
    // snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1alpha1Client
    snapClient, err := snapclientset.NewForConfig(config)
    if err != nil {
        glog.Fatalf("Failed to create snapshot client: %v", err)
    }
    csiAPIClient, err := csiclientset.NewForConfig(config)
    if err != nil {
        glog.Fatalf("Failed to create CSI API client: %v", err)
    }

    The Kubernetes configuration is read (from the KUBECONFIG environment variable, from the master/kubeconfig flags, or from the in-cluster configuration) and used to create the clientset object. The clientset calls the Kubernetes API, mainly to create and delete objects such as PVs and PVCs. The snapshot client and the CSI API client are built from the same configuration.
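
    The precedence between the environment variable, the flags and the in-cluster configuration can be isolated into two small functions. This is a sketch for illustration only: resolveKubeconfig, chooseConfigSource and the configSource constants are hypothetical names, and the real code calls clientcmd.BuildConfigFromFlags or rest.InClusterConfig instead of returning a label.

```go
package main

import (
	"fmt"
	"os"
)

// configSource names where the client configuration would be loaded from.
type configSource string

const (
	fromFlags     configSource = "flags"      // clientcmd.BuildConfigFromFlags in the excerpt
	fromInCluster configSource = "in-cluster" // rest.InClusterConfig in the excerpt
)

// resolveKubeconfig applies the first rule of the excerpt: a non-empty
// KUBECONFIG environment value overrides the -kubeconfig flag.
func resolveKubeconfig(envValue, flagValue string) string {
	if envValue != "" {
		return envValue
	}
	return flagValue
}

// chooseConfigSource applies the second rule: if either master or kubeconfig
// is set, build the config from them; otherwise assume we run inside a pod.
func chooseConfigSource(master, kubeconfig string) configSource {
	if master != "" || kubeconfig != "" {
		return fromFlags
	}
	return fromInCluster
}

func main() {
	kubeconfig := resolveKubeconfig(os.Getenv("KUBECONFIG"), "")
	fmt.Println(chooseConfigSource("", kubeconfig))
}
```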

    2.1.3. Kubernetes version check

    // The controller needs to know what the server version is because out-of-tree
    // provisioners aren't officially supported until 1.5
    serverVersion, err := clientset.Discovery().ServerVersion()
    if err != nil {
        glog.Fatalf("Error getting server version: %v", err)
    }

    This retrieves the Kubernetes server version, because out-of-tree provisioners are only officially supported from Kubernetes 1.5 onward.

    2.1.4. Connecting to the CSI socket

    // Generate a unique ID for this provisioner
    timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
    identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + *provisioner
    // Provisioner will stay in Init until driver opens csi socket, once it's done
    // controller will exit this loop and proceed normally.
    socketDown := true
    grpcClient := &grpc.ClientConn{}
    for socketDown {
        grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
        if err == nil {
            socketDown = false
            continue
        }
        time.Sleep(10 * time.Second)
    }

    The provisioner stays in its initialization state until the connection to the CSI socket succeeds. If a connection attempt fails, it sleeps for 10 seconds and retries, with no upper bound on the number of attempts. Two flags are involved:

    • csiEndpoint: the gRPC address of the CSI driver, /run/csi/socket by default.
    • connectionTimeout: the timeout for connecting to the CSI driver socket, 10 seconds by default.
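
    The retry loop can be reproduced without gRPC by injecting the connect step as a function. In this sketch connectFunc stands in for ctrl.Connect, and waitForSocket and flakyConnector are hypothetical helpers; the retry interval is a parameter here so the example finishes quickly, whereas the excerpt above hard-codes 10 seconds.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// connectFunc stands in for ctrl.Connect(address, timeout) from the excerpt.
type connectFunc func(address string, timeout time.Duration) error

// waitForSocket calls connect until it succeeds, sleeping retryInterval
// between failed attempts, and returns the number of attempts made.
// Like the original loop, it never gives up on its own.
func waitForSocket(connect connectFunc, address string, timeout, retryInterval time.Duration) int {
	attempts := 0
	for {
		attempts++
		if err := connect(address, timeout); err == nil {
			return attempts
		}
		time.Sleep(retryInterval)
	}
}

// flakyConnector returns a connect function that fails the given number of
// times before succeeding, simulating a driver that is still starting up.
func flakyConnector(failures int) connectFunc {
	remaining := failures
	return func(address string, timeout time.Duration) error {
		if remaining > 0 {
			remaining--
			return errors.New("socket not ready")
		}
		return nil
	}
}

func main() {
	attempts := waitForSocket(flakyConnector(2), "/run/csi/socket", 10*time.Second, 10*time.Millisecond)
	fmt.Println("connected after attempts:", attempts)
}
```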

    2.1.5. Constructing the csi-Provisioner object

    // Create the provisioner: it implements the Provisioner interface expected by
    // the controller
    csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)
    provisionController = controller.NewProvisionController(
        clientset,
        *provisioner,
        csiProvisioner,
        serverVersion.GitVersion,
    )

    The csi-Provisioner object is constructed from the parameters clientset, csiAPIClient, csiEndpoint, connectionTimeout, identity, volumeNamePrefix, volumeNameUUIDLength, grpcClient and snapClient.

    The ProvisionController object is then constructed from csiProvisioner.

    2.1.6. Running the ProvisionController

    func main() {
        provisionController.Run(wait.NeverStop)
    }

    ProvisionController implements the PV and PVC handling logic. Because Run is given wait.NeverStop as its stop channel, it runs as a long-lived process.

    2.2. The Provision and Delete methods

    2.2.1. The Provision method

    For the source of csiProvisioner's Provision method, see: https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L336

    The Provision method creates the storage resource and returns a PV object. Its argument is a VolumeOptions value, which specifies the attributes of the PV object.

    (1) Building the PV attributes

    pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
    if err != nil {
        return nil, err
    }
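
    The body of makeVolumeName is not shown in this article. Based only on the volume-name-prefix and volume-name-uuid-length flag descriptions, a plausible sketch looks like the following. The function volumeName is hypothetical, and removing the dashes from the UID before truncating is an assumption of this example; the upstream implementation may differ in its details.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// volumeName is a hypothetical re-creation of what makeVolumeName is assumed
// to do: join the prefix and the PVC UID, optionally truncating the UID.
// uuidLength == -1 means "do not truncate", matching the flag default.
func volumeName(prefix, pvcUID string, uuidLength int) (string, error) {
	if prefix == "" {
		return "", errors.New("volume name prefix must not be empty")
	}
	if pvcUID == "" {
		return "", errors.New("PVC UID must not be empty")
	}
	if uuidLength == -1 {
		return prefix + "-" + pvcUID, nil
	}
	// Assumption: dashes are dropped before truncating so that the kept
	// characters all come from the UID itself.
	compact := strings.ReplaceAll(pvcUID, "-", "")
	if uuidLength < 0 || uuidLength > len(compact) {
		return "", fmt.Errorf("invalid UUID length %d", uuidLength)
	}
	return prefix + "-" + compact[:uuidLength], nil
}

func main() {
	uid := "1b2c3d4e-aaaa-bbbb-cccc-1234567890ab" // made-up UID for illustration
	full, err := volumeName("pvc", uid, -1)
	fmt.Println(full, err)
	short, err := volumeName("pvc", uid, 8)
	fmt.Println(short, err)
}
```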

    (2) Building the CSIPersistentVolumeSource attributes

    driverState, err := checkDriverState(p.grpcClient, p.timeout, needSnapshotSupport)
    if err != nil {
        return nil, err
    }
    ...
    // Resolve controller publish, node stage, node publish secret references
    controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretNameKey, controllerPublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
    if err != nil {
        return nil, err
    }
    nodeStageSecretRef, err := getSecretReference(nodeStageSecretNameKey, nodeStageSecretNamespaceKey, options.Parameters, pvName, options.PVC)
    if err != nil {
        return nil, err
    }
    nodePublishSecretRef, err := getSecretReference(nodePublishSecretNameKey, nodePublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
    if err != nil {
        return nil, err
    }
    ...
    volumeAttributes := map[string]string{provisionerIDKey: p.identity}
    for k, v := range rep.Volume.Attributes {
        volumeAttributes[k] = v
    }
    ...
    fsType := ""
    for k, v := range options.Parameters {
        switch strings.ToLower(k) {
        case "fstype":
            fsType = v
        }
    }
    if len(fsType) == 0 {
        fsType = defaultFSType
    }

    (3) Creating the CSI CreateVolumeRequest

    // Create a CSI CreateVolumeRequest and Response
    req := csi.CreateVolumeRequest{
        Name:               pvName,
        Parameters:         options.Parameters,
        VolumeCapabilities: volumeCaps,
        CapacityRange: &csi.CapacityRange{
            RequiredBytes: int64(volSizeBytes),
        },
    }
    ...
    glog.V(5).Infof("CreateVolumeRequest %+v", req)
    rep := &csi.CreateVolumeResponse{}
    ...
    opts := wait.Backoff{Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
    err = wait.ExponentialBackoff(opts, func() (bool, error) {
        ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
        defer cancel()
        rep, err = p.csiClient.CreateVolume(ctx, &req)
        if err == nil {
            // CreateVolume has finished successfully
            return true, nil
        }
        if status, ok := status.FromError(err); ok {
            if status.Code() == codes.DeadlineExceeded {
                // CreateVolume timed out, give it another chance to complete
                glog.Warningf("CreateVolume timeout: %s has expired, operation will be retried", p.timeout.String())
                return false, nil
            }
        }
        // CreateVolume failed , no reason to retry, bailing from ExponentialBackoff
        return false, err
    })
    if err != nil {
        return nil, err
    }
    if rep.Volume != nil {
        glog.V(3).Infof("create volume rep: %+v", *rep.Volume)
    }
    respCap := rep.GetVolume().GetCapacityBytes()
    if respCap < volSizeBytes {
        capErr := fmt.Errorf("created volume capacity %v less than requested capacity %v", respCap, volSizeBytes)
        delReq := &csi.DeleteVolumeRequest{
            VolumeId: rep.GetVolume().GetId(),
        }
        delReq.ControllerDeleteSecrets = provisionerCredentials
        ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
        defer cancel()
        _, err := p.csiClient.DeleteVolume(ctx, delReq)
        if err != nil {
            capErr = fmt.Errorf("%v. Cleanup of volume %s failed, volume is orphaned: %v", capErr, pvName, err)
        }
        return nil, capErr
    }

    The core of the Provision method is the call to p.csiClient.CreateVolume(ctx, &req). A DeadlineExceeded error is retried with exponential backoff, while any other error aborts immediately. If the driver returns a volume smaller than the requested capacity, the volume is deleted again and an error is returned.
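
    The retry policy around CreateVolume can be sketched with the standard library alone. In this illustration retryOnTimeout is a hypothetical helper that imitates the way wait.ExponentialBackoff is used above, context.DeadlineExceeded stands in for the gRPC codes.DeadlineExceeded status, and errRetriesExhausted, errDriver and timeoutsThen exist only for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errRetriesExhausted is returned when every attempt timed out.
var errRetriesExhausted = errors.New("timed out waiting for the operation")

// errDriver is a hypothetical non-retryable error from the driver.
var errDriver = errors.New("driver rejected the request")

// retryOnTimeout runs op up to steps times. A timeout is retried after a
// delay that grows by factor each round; any other error stops immediately.
// It returns the number of attempts made and the final error.
func retryOnTimeout(initial time.Duration, factor float64, steps int, op func() error) (int, error) {
	delay := initial
	for attempt := 1; attempt <= steps; attempt++ {
		err := op()
		if err == nil {
			return attempt, nil
		}
		if !errors.Is(err, context.DeadlineExceeded) {
			return attempt, err // not a timeout: no reason to retry
		}
		if attempt < steps {
			time.Sleep(delay)
			delay = time.Duration(float64(delay) * factor)
		}
	}
	return steps, errRetriesExhausted
}

// timeoutsThen returns an operation that times out n times and then
// returns final (nil for success).
func timeoutsThen(n int, final error) func() error {
	return func() error {
		if n > 0 {
			n--
			return context.DeadlineExceeded
		}
		return final
	}
}

func main() {
	attempts, err := retryOnTimeout(time.Millisecond, 2, 5, timeoutsThen(2, nil))
	fmt.Println(attempts, err)
}
```

    Retrying only on timeouts is reasonable for CreateVolume because the request carries a stable volume name, so a repeated call can pick up an operation that is still completing on the driver side.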

    (4) Building the PV object

    pv := &v1.PersistentVolume{
        ObjectMeta: metav1.ObjectMeta{
            Name: pvName,
        },
        Spec: v1.PersistentVolumeSpec{
            PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
            AccessModes:                   options.PVC.Spec.AccessModes,
            Capacity: v1.ResourceList{
                v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(respCap),
            },
            // TODO wait for CSI VolumeSource API
            PersistentVolumeSource: v1.PersistentVolumeSource{
                CSI: &v1.CSIPersistentVolumeSource{
                    Driver:                     driverState.driverName,
                    VolumeHandle:               p.volumeIdToHandle(rep.Volume.Id),
                    FSType:                     fsType,
                    VolumeAttributes:           volumeAttributes,
                    ControllerPublishSecretRef: controllerPublishSecretRef,
                    NodeStageSecretRef:         nodeStageSecretRef,
                    NodePublishSecretRef:       nodePublishSecretRef,
                },
            },
        },
    }
    if driverState.capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) {
        pv.Spec.NodeAffinity = GenerateVolumeNodeAffinity(rep.Volume.AccessibleTopology)
    }
    glog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)
    return pv, nil

    The Provision method only builds the PV object from the VolumeOptions argument and returns it; it does not itself create or delete the PV object in Kubernetes.

    Different types of provisioner generally differ in the type and parameters of the PersistentVolumeSource. For csi-provisioner the PersistentVolumeSource is CSI, and the following CSI-related parameters must be supplied:

    • Driver
    • VolumeHandle
    • FSType
    • VolumeAttributes
    • ControllerPublishSecretRef
    • NodeStageSecretRef
    • NodePublishSecretRef

    2.2.2. The Delete method

    For the source of csiProvisioner's Delete method, see: https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L606

    func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
        if volume == nil || volume.Spec.CSI == nil {
            return fmt.Errorf("invalid CSI PV")
        }
        volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)
        _, err := checkDriverState(p.grpcClient, p.timeout, false)
        if err != nil {
            return err
        }
        req := csi.DeleteVolumeRequest{
            VolumeId: volumeId,
        }
        // get secrets if StorageClass specifies it
        storageClassName := volume.Spec.StorageClassName
        if len(storageClassName) != 0 {
            if storageClass, err := p.client.StorageV1().StorageClasses().Get(storageClassName, metav1.GetOptions{}); err == nil {
                // Resolve provision secret credentials.
                // No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time.
                provisionerSecretRef, err := getSecretReference(provisionerSecretNameKey, provisionerSecretNamespaceKey, storageClass.Parameters, volume.Name, nil)
                if err != nil {
                    return err
                }
                credentials, err := getCredentials(p.client, provisionerSecretRef)
                if err != nil {
                    return err
                }
                req.ControllerDeleteSecrets = credentials
            }
        }
        ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
        defer cancel()
        _, err = p.csiClient.DeleteVolume(ctx, &req)
        return err
    }

    The Delete method mainly calls p.csiClient.DeleteVolume(ctx, &req), after looking up the delete credentials from the StorageClass when one is set.

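    2.3. Summary

    The csi provisioner implements the Provisioner interface, which consists of the Provision and Delete methods:

    • Provision: calls csiClient.CreateVolume, then builds and returns a PV object.
    • Delete: calls csiClient.DeleteVolume.

    Both core methods of the csi provisioner delegate to the csi-client.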

    3. csi-client

    For the csi client code, see: https://github.com/container-storage-interface/spec/blob/master/lib/go/csi/v0/csi.pb.go

    3.1. Constructing the csi-client

    3.1.1. Constructing the grpcClient

    // Provisioner will stay in Init until driver opens csi socket, once it's done
    // controller will exit this loop and proceed normally.
    socketDown := true
    grpcClient := &grpc.ClientConn{}
    for socketDown {
        grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
        if err == nil {
            socketDown = false
            continue
        }
        time.Sleep(10 * time.Second)
    }

    The provisioner connects to the CSI socket; a usable grpcClient is available only once the connection has succeeded.

    3.1.2. Constructing the csi-client

    The csi-client is constructed from the grpcClient.

    // Create the provisioner: it implements the Provisioner interface expected by
    // the controller
    csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)

    NewCSIProvisioner

    // NewCSIProvisioner creates new CSI provisioner
    func NewCSIProvisioner(client kubernetes.Interface,
        csiAPIClient csiclientset.Interface,
        csiEndpoint string,
        connectionTimeout time.Duration,
        identity string,
        volumeNamePrefix string,
        volumeNameUUIDLength int,
        grpcClient *grpc.ClientConn,
        snapshotClient snapclientset.Interface) controller.Provisioner {
        csiClient := csi.NewControllerClient(grpcClient)
        provisioner := &csiProvisioner{
            client:               client,
            grpcClient:           grpcClient,
            csiClient:            csiClient,
            csiAPIClient:         csiAPIClient,
            snapshotClient:       snapshotClient,
            timeout:              connectionTimeout,
            identity:             identity,
            volumeNamePrefix:     volumeNamePrefix,
            volumeNameUUIDLength: volumeNameUUIDLength,
        }
        return provisioner
    }

    NewControllerClient

    csiClient := csi.NewControllerClient(grpcClient)
    ...
    type controllerClient struct {
        cc *grpc.ClientConn
    }

    func NewControllerClient(cc *grpc.ClientConn) ControllerClient {
        return &controllerClient{cc}
    }

    3.2. csiClient.CreateVolume

    The csi provisioner calls csiClient.CreateVolume as follows:

    opts := wait.Backoff{Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
    err = wait.ExponentialBackoff(opts, func() (bool, error) {
        ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
        defer cancel()
        rep, err = p.csiClient.CreateVolume(ctx, &req)
        if err == nil {
            // CreateVolume has finished successfully
            return true, nil
        }
        if status, ok := status.FromError(err); ok {
            if status.Code() == codes.DeadlineExceeded {
                // CreateVolume timed out, give it another chance to complete
                glog.Warningf("CreateVolume timeout: %s has expired, operation will be retried", p.timeout.String())
                return false, nil
            }
        }
        // CreateVolume failed , no reason to retry, bailing from ExponentialBackoff
        return false, err
    })

    Construction of the CreateVolumeRequest:

    // Create a CSI CreateVolumeRequest and Response
    req := csi.CreateVolumeRequest{
        Name:               pvName,
        Parameters:         options.Parameters,
        VolumeCapabilities: volumeCaps,
        CapacityRange: &csi.CapacityRange{
            RequiredBytes: int64(volSizeBytes),
        },
    }
    ...
    req.VolumeContentSource = volumeContentSource
    ...
    req.AccessibilityRequirements = requirements
    ...
    req.ControllerCreateSecrets = provisionerCredentials

    The concrete Create implementation follows. csiClient is an interface type (ControllerClient); the concrete code is in controllerClient.CreateVolume, which forwards the request over the gRPC connection:

    func (c *controllerClient) CreateVolume(ctx context.Context, in *CreateVolumeRequest, opts ...grpc.CallOption) (*CreateVolumeResponse, error) {
        out := new(CreateVolumeResponse)
        err := grpc.Invoke(ctx, "/csi.v0.Controller/CreateVolume", in, out, c.cc, opts...)
        if err != nil {
            return nil, err
        }
        return out, nil
    }
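
    The shape of this generated client (an interface, a private struct holding the connection, and methods that invoke an RPC by its full method name) can be reproduced without gRPC. In the sketch below the invoker interface stands in for *grpc.ClientConn, the request and response structs are reduced to one field each, and fakeDriver is a hypothetical in-process driver; none of these are the real CSI or gRPC types.

```go
package main

import (
	"errors"
	"fmt"
)

// invoker is a stand-in for the gRPC connection: it delivers a request to a
// named RPC method and fills in the response.
type invoker interface {
	Invoke(method string, in, out interface{}) error
}

// Simplified request and response messages.
type CreateVolumeRequest struct{ Name string }
type CreateVolumeResponse struct{ VolumeID string }

// ControllerClient is the interface that callers such as the provisioner use.
type ControllerClient interface {
	CreateVolume(in *CreateVolumeRequest) (*CreateVolumeResponse, error)
}

// controllerClient is the concrete implementation wrapping the connection.
type controllerClient struct{ cc invoker }

// NewControllerClient returns the interface, hiding the concrete type.
func NewControllerClient(cc invoker) ControllerClient {
	return &controllerClient{cc}
}

// CreateVolume forwards the request under the RPC's full method name.
func (c *controllerClient) CreateVolume(in *CreateVolumeRequest) (*CreateVolumeResponse, error) {
	out := new(CreateVolumeResponse)
	if err := c.cc.Invoke("/csi.v0.Controller/CreateVolume", in, out); err != nil {
		return nil, err
	}
	return out, nil
}

// fakeDriver is a hypothetical driver that answers CreateVolume in-process.
type fakeDriver struct{}

func (fakeDriver) Invoke(method string, in, out interface{}) error {
	if method != "/csi.v0.Controller/CreateVolume" {
		return errors.New("unimplemented method: " + method)
	}
	req, okIn := in.(*CreateVolumeRequest)
	resp, okOut := out.(*CreateVolumeResponse)
	if !okIn || !okOut {
		return errors.New("unexpected message type")
	}
	resp.VolumeID = "vol-" + req.Name
	return nil
}

func main() {
	client := NewControllerClient(fakeDriver{})
	resp, err := client.CreateVolume(&CreateVolumeRequest{Name: "pvc-demo"})
	fmt.Println(resp, err)
}
```

    Because the provisioner only depends on the interface, a test can swap in a fake such as fakeDriver without a running CSI driver.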

    3.3. csiClient.DeleteVolume

    The csi provisioner calls csiClient.DeleteVolume as follows:

    func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
        ...
        req := csi.DeleteVolumeRequest{
            VolumeId: volumeId,
        }
        // get secrets if StorageClass specifies it
        ...
        ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
        defer cancel()
        _, err = p.csiClient.DeleteVolume(ctx, &req)
        return err
    }

    Construction of the DeleteVolumeRequest:

    req := csi.DeleteVolumeRequest{
        VolumeId: volumeId,
    }
    ...
    req.ControllerDeleteSecrets = credentials

    The constructed DeleteVolumeRequest is passed to the DeleteVolume method.

    The concrete Delete implementation follows; for the code, see controllerClient.DeleteVolume:

    func (c *controllerClient) DeleteVolume(ctx context.Context, in *DeleteVolumeRequest, opts ...grpc.CallOption) (*DeleteVolumeResponse, error) {
        out := new(DeleteVolumeResponse)
        err := grpc.Invoke(ctx, "/csi.v0.Controller/DeleteVolume", in, out, c.cc, opts...)
        if err != nil {
            return nil, err
        }
        return out, nil
    }

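    4. ProvisionController.Run

    A custom provisioner implements the Provision and Delete methods of the Provisioner interface. These two methods mainly create and delete volumes in the backend storage; they do not create or delete PV objects.

    Operations on PV objects are carried out by provisionClaimOperation and deleteVolumeOperation in ProvisionController, which in turn call the concrete provisioner's Provision and Delete methods to handle the stored data.

    func main() {
        provisionController.Run(wait.NeverStop)
    }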
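
    The split of responsibilities described in this section can be made concrete with a toy model in which the backend storage and the PV objects live in two separate maps. Everything here is a simplified assumption for illustration: the PV type, the backend and controller structs, and the provisionClaim and deleteVolume methods only imitate the roles of provisionClaimOperation and deleteVolumeOperation, and the apiObjects map stands in for the Kubernetes API server.

```go
package main

import "fmt"

// PV is a simplified stand-in for a PersistentVolume object.
type PV struct{ Name string }

// Provisioner only manages backend storage.
type Provisioner interface {
	Provision(name string) (*PV, error)
	Delete(pv *PV) error
}

// backend is a hypothetical storage system that tracks existing volumes.
type backend struct{ volumes map[string]bool }

func (b *backend) Provision(name string) (*PV, error) {
	b.volumes[name] = true // create the storage
	return &PV{Name: name}, nil
}

func (b *backend) Delete(pv *PV) error {
	delete(b.volumes, pv.Name) // release the storage
	return nil
}

// controller owns the PV objects.
type controller struct {
	provisioner Provisioner
	apiObjects  map[string]*PV
}

// provisionClaim asks the provisioner for storage and then stores the
// returned PV object itself.
func (c *controller) provisionClaim(name string) error {
	pv, err := c.provisioner.Provision(name)
	if err != nil {
		return err
	}
	c.apiObjects[pv.Name] = pv
	return nil
}

// deleteVolume releases the storage through the provisioner first and only
// then removes the PV object.
func (c *controller) deleteVolume(name string) error {
	pv, ok := c.apiObjects[name]
	if !ok {
		return fmt.Errorf("PV %q not found", name)
	}
	if err := c.provisioner.Delete(pv); err != nil {
		return err
	}
	delete(c.apiObjects, name)
	return nil
}

func main() {
	b := &backend{volumes: map[string]bool{}}
	c := &controller{provisioner: b, apiObjects: map[string]*PV{}}
	fmt.Println(c.provisionClaim("pvc-demo"), len(b.volumes), len(c.apiObjects))
	fmt.Println(c.deleteVolume("pvc-demo"), len(b.volumes), len(c.apiObjects))
}
```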

    For this part of the logic, see: source-code analysis of nfs-client-provisioner

    References:

    • https://github.com/kubernetes-csi/external-provisioner
    • https://github.com/container-storage-interface/spec
    • https://github.com/kubernetes/community/blob/master/contributors/design-proposals/storage/container-storage-interface.md
    • https://github.com/container-storage-interface/spec/blob/master/spec.md