tags: container,k8s,source code analysis

Kubelet VolumeManager Source Code Analysis: Unmounting Volumes

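This article is part of the Kubelet VolumeManager source code analysis. It is written as a separate article so that the main one does not become too long and disorganized.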

1. Entry Point

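The reconciler is an important design pattern in Kubernetes and is common in controllers: it periodically drives the actual state of the world toward the desired state.

In the VolumeManager, the reconciler periodically handles both unmounting and mounting of volumes. This article covers the unmounting part.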

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/volumemanager/reconciler/reconciler.go#L163-L178

func (rc *reconciler) reconcile() {
    rc.unmountVolumes()
    rc.mountAttachVolumes()
    rc.unmountDetachDevices()
}

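unmountVolumes iterates over every volume mounted in the actual state of the world. If the (pod, volume) pair is no longer present in the desired state of the world, the volume is unmounted from that pod.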

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/volumemanager/reconciler/reconciler.go#L180

func (rc *reconciler) unmountVolumes() {
    for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
        if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
            ...
            err := rc.operationExecutor.UnmountVolume(
                mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
            if err != nil && !isExpectedError(err) {
                ... // log the failure; the next reconcile pass retries
            }
        }
    }
}
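At its core, unmountVolumes computes a set difference: every (pod, volume) pair recorded as mounted in the actual state but missing from the desired state becomes an unmount candidate. The following is a minimal sketch of just that comparison, assuming simplified string keys. `podVolume` and `volumesToUnmount` are hypothetical names, not kubelet APIs, and the real code calls operationExecutor.UnmountVolume for each candidate instead of returning a list.

```go
package main

// podVolume is a hypothetical, simplified stand-in for the
// (PodName, VolumeName) pair the reconciler compares.
type podVolume struct {
	Pod    string
	Volume string
}

// volumesToUnmount returns every mounted (pod, volume) pair that is no longer
// in the desired state, i.e. the pairs unmountVolumes would hand to
// operationExecutor.UnmountVolume.
func volumesToUnmount(mounted []podVolume, desired map[podVolume]bool) []podVolume {
	var out []podVolume
	for _, mv := range mounted {
		if !desired[mv] {
			out = append(out, mv)
		}
	}
	return out
}
```

Because the key is the (pod, volume) pair, a volume shared by two pods is unmounted only for the pod that has left the desired state.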

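2. rc.operationExecutor.UnmountVolume

Filesystem and Block volumes are unmounted by different methods. UnmountVolume calls operationGenerator.GenerateUnmountVolumeFunc() or operationGenerator.GenerateUnmapVolumeFunc(), respectively, to generate the unmount function, and passes the result to nestedPendingOperations.Run() for execution.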

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_executor.go#L858

func (oe *operationExecutor) UnmountVolume(
    volumeToUnmount MountedVolume,
    actualStateOfWorld ActualStateOfWorldMounterUpdater,
    podsDir string) error {
    fsVolume, err := util.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec)
    if err != nil {
        return err
    }
    var generatedOperations volumetypes.GeneratedOperations
    if fsVolume {
        // Filesystem volume case
        // Unmount a volume if a volume is mounted
        generatedOperations, err = oe.operationGenerator.GenerateUnmountVolumeFunc(
            volumeToUnmount, actualStateOfWorld, podsDir)
    } else {
        // Block volume case
        // Unmap a volume if a volume is mapped
        generatedOperations, err = oe.operationGenerator.GenerateUnmapVolumeFunc(
            volumeToUnmount, actualStateOfWorld)
    }
    if err != nil {
        return err
    }
    // All volume plugins can execute unmount/unmap for multiple pods referencing the
    // same volume in parallel
    podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)

    return oe.pendingOperations.Run(
        volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations)
}
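The branch above depends only on the volume mode. In the PersistentVolume API, `volumeMode` is optional and defaults to Filesystem, so only an explicit `Block` takes the unmap path. A minimal sketch of that decision, assuming the mode is available as a `*string`; `chooseUnmountPath` and the two label constants are hypothetical and simplify util.CheckVolumeModeFilesystem, which works on a volume.Spec.

```go
package main

// Hypothetical labels for the two unmount paths chosen by
// operationExecutor.UnmountVolume.
const (
	pathUnmount = "GenerateUnmountVolumeFunc" // Filesystem volumes
	pathUnmap   = "GenerateUnmapVolumeFunc"   // Block volumes
)

// chooseUnmountPath mirrors the Filesystem/Block branch. volumeMode is a
// simplified stand-in for PersistentVolumeSpec.VolumeMode: nil is treated as
// Filesystem (the API default), and only an explicit "Block" selects unmap.
func chooseUnmountPath(volumeMode *string) string {
	if volumeMode != nil && *volumeMode == "Block" {
		return pathUnmap
	}
	return pathUnmount
}
```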

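The unmount logic is first generated as a function and then handed to nestedPendingOperations.Run(), instead of being executed directly, to prevent multiple operations from running on the same volume at the same time.

Run() keeps a list of operations (the grm.operations slice) keyed by volume name, pod name, and node name, and uses it to record whether an operation with the same key is already in progress. If one is still pending, Run() returns an AlreadyExists error. If the previous operation under that key had the same operation name and is still inside its exponential backoff window, the backoff error is returned. Otherwise the operation is marked as pending and executed in a new goroutine. Because UnmountVolume passes the pod UID as part of the key, unmounts of the same volume for different pods can run in parallel.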

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go#L142

func (grm *nestedPendingOperations) Run(
    volumeName v1.UniqueVolumeName,
    podName volumetypes.UniquePodName,
    nodeName types.NodeName,
    generatedOperations volumetypes.GeneratedOperations) error {
    grm.lock.Lock()
    defer grm.lock.Unlock()

    opKey := operationKey{volumeName, podName, nodeName}

    opExists, previousOpIndex := grm.isOperationExists(opKey)
    if opExists {
        previousOp := grm.operations[previousOpIndex]
        // Operation already exists
        if previousOp.operationPending {
            // Operation is pending
            return NewAlreadyExistsError(opKey)
        }

        backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
        if backOffErr != nil {
            if previousOp.operationName == generatedOperations.OperationName {
                return backOffErr
            }
            // previous operation and new operation are different. reset op. name and exp. backoff
            grm.operations[previousOpIndex].operationName = generatedOperations.OperationName
            grm.operations[previousOpIndex].expBackoff = exponentialbackoff.ExponentialBackoff{}
        }

        // Update existing operation to mark as pending.
        grm.operations[previousOpIndex].operationPending = true
        grm.operations[previousOpIndex].key = opKey
    } else {
        // Create a new operation
        grm.operations = append(grm.operations,
            operation{
                key:              opKey,
                operationPending: true,
                operationName:    generatedOperations.OperationName,
                expBackoff:       exponentialbackoff.ExponentialBackoff{},
            })
    }

    go func() (eventErr, detailedErr error) {
        // Handle unhandled panics (very unlikely)
        defer k8sRuntime.HandleCrash()
        // Handle completion of and error, if any, from operationFunc()
        defer grm.operationComplete(opKey, &detailedErr)
        return generatedOperations.Run()
    }()

    return nil
}
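The deduplication idea can be illustrated with a much smaller registry. This is a hypothetical sketch, not nestedPendingOperations: it uses a map instead of the grm.operations slice, has no exponential backoff, and omits the real isOperationExists behavior of letting an empty pod or node name match any. `pendingOps`, `opKey`, and `errAlreadyExists` are illustrative names.

```go
package main

import (
	"errors"
	"sync"
)

// opKey is a simplified version of the (volumeName, podName, nodeName) key.
type opKey struct {
	volume, pod, node string
}

// errAlreadyExists plays the role of NewAlreadyExistsError.
var errAlreadyExists = errors.New("operation already pending for this key")

// pendingOps only tracks whether an operation is pending per key.
type pendingOps struct {
	mu      sync.Mutex
	pending map[opKey]bool
	wg      sync.WaitGroup // lets callers wait for all started operations
}

func newPendingOps() *pendingOps {
	return &pendingOps{pending: make(map[opKey]bool)}
}

// Run starts op in a goroutine unless an operation with the same key is
// still pending, in which case it returns errAlreadyExists immediately.
func (p *pendingOps) Run(key opKey, op func() error) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.pending[key] {
		return errAlreadyExists
	}
	p.pending[key] = true
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()     // runs last
		defer p.complete(key) // runs first: clears the pending flag
		_ = op()              // the real code records the error for backoff
	}()
	return nil
}

func (p *pendingOps) complete(key opKey) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.pending, key)
}

// Wait blocks until every started operation has completed.
func (p *pendingOps) Wait() { p.wg.Wait() }
```

A second key that differs only in the pod name is accepted while the first operation is still running, which is the parallelism the comment in UnmountVolume describes.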

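The concrete operation is still defined in volumetypes.GeneratedOperations, namely in its OperationFunc field, which GeneratedOperations.Run() invokes. In the following sections we look in detail at the OperationFunc generated by operationGenerator.GenerateUnmountVolumeFunc() and by operationGenerator.GenerateUnmapVolumeFunc().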

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/types/types.go#L64

func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
    var context OperationContext
    if o.CompleteFunc != nil {
        c := CompleteFuncParam{
            Err:      &context.DetailedErr,
            Migrated: &context.Migrated,
        }
        defer o.CompleteFunc(c)
    }
    if o.EventRecorderFunc != nil {
        defer o.EventRecorderFunc(&eventErr)
    }
    // Handle panic, if any, from operationFunc()
    defer runtime.RecoverFromPanic(&detailedErr)

    context = o.OperationFunc()
    return context.EventErr, context.DetailedErr
}
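In GeneratedOperations.Run the hooks are registered with defer, so they execute in reverse order: the panic recovery first, then EventRecorderFunc, then CompleteFunc. The sketch below reproduces only that ordering and shows how a recovered panic can be turned into the named error result. `runWithHooks` and the trace strings are hypothetical, and the recovery here is a plain recover(), not the k8s runtime.RecoverFromPanic implementation.

```go
package main

import "fmt"

// runWithHooks mimics the shape of GeneratedOperations.Run: hooks are
// deferred, so they run in reverse registration order (LIFO) after op
// returns or panics.
func runWithHooks(op func() error, trace *[]string) (err error) {
	defer func() { *trace = append(*trace, "complete") }() // registered first, runs last
	defer func() { *trace = append(*trace, "event") }()
	defer func() { // registered last, runs first: turns a panic into an error
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered from panic: %v", r)
		}
		*trace = append(*trace, "recover")
	}()
	return op()
}
```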

3. Unmounting Filesystem-mode Volumes

The OperationFunc defined in operationGenerator.GenerateUnmountVolumeFunc mainly calls subpather.CleanSubPaths() and then volumeUnmounter.TearDown().

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L858

func (og *operationGenerator) GenerateUnmountVolumeFunc(
    volumeToUnmount MountedVolume,
    actualStateOfWorld ActualStateOfWorldMounterUpdater,
    podsDir string) (volumetypes.GeneratedOperations, error) {
    ...
    unmountVolumeFunc := func() volumetypes.OperationContext {
        subpather := og.volumePluginMgr.Host.GetSubpather()

        migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec)

        // Remove all bind-mounts for subPaths
        podDir := filepath.Join(podsDir, string(volumeToUnmount.PodUID))
        if err := subpather.CleanSubPaths(podDir, volumeToUnmount.InnerVolumeSpecName); err != nil {
            eventErr, detailedErr := volumeToUnmount.GenerateError("error cleaning subPath mounts", err)
            return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
        }

        // Execute unmount
        unmountErr := volumeUnmounter.TearDown()
        if unmountErr != nil {
            ...
            return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
        }
        ...
        return volumetypes.NewOperationContext(nil, nil, migrated)
    }

    return volumetypes.GeneratedOperations{
        OperationName:     "volume_unmount",
        OperationFunc:     unmountVolumeFunc,
        CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "volume_unmount"),
        EventRecorderFunc: nil, // nil because we do not want to generate event on error
    }, nil
}

3.1 subpath.CleanSubPaths()

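subpath.CleanSubPaths() iterates over every directory under /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/; each of these directories corresponds to one container.

It walks each container directory to obtain its first-level entries, each of which is a subPath mounted by that container, and calls doCleanSubPath on each entry (note that doCleanSubPath and doCleanSubPaths are two different functions).

After every entry has been cleaned, the container directories and the <volume> directory are removed. Finally it tries to remove /var/lib/kubelet/pods/<uid>/volume-subpaths/ itself. Because os.IsExist also reports true for a "directory not empty" error, this last removal silently leaves the directory in place while other volumes of the pod still have subPath entries in it.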

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/subpath/subpath_linux.go#L57

func (sp *subpath) CleanSubPaths(podDir string, volumeName string) error {
    return doCleanSubPaths(sp.mounter, podDir, volumeName)
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/subpath/subpath_linux.go#L241

func doCleanSubPaths(mounter mount.Interface, podDir string, volumeName string) error {
    // scan /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/*
    subPathDir := filepath.Join(podDir, containerSubPathDirectoryName, volumeName)
    ...
    containerDirs, err := ioutil.ReadDir(subPathDir)
    if err != nil {
        if os.IsNotExist(err) {
            return nil
        }
        return fmt.Errorf("error reading %s: %s", subPathDir, err)
    }

    for _, containerDir := range containerDirs {
        if !containerDir.IsDir() {
            ...
            continue
        }
        ...
        // scan /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/<container name>/*
        fullContainerDirPath := filepath.Join(subPathDir, containerDir.Name())
        err = filepath.Walk(fullContainerDirPath, func(path string, info os.FileInfo, _ error) error {
            if path == fullContainerDirPath {
                // Skip top level directory
                return nil
            }

            // pass through errors and let doCleanSubPath handle them
            if err = doCleanSubPath(mounter, fullContainerDirPath, filepath.Base(path)); err != nil {
                return err
            }

            // We need to check that info is not nil. This may happen when the incoming err is not nil due to stale mounts or permission errors.
            if info != nil && info.IsDir() {
                // skip subdirs of the volume: it only matters the first level to unmount, otherwise it would try to unmount subdir of the volume
                return filepath.SkipDir
            }

            return nil
        })
        ...
        if err := os.Remove(fullContainerDirPath); err != nil {
            return fmt.Errorf("error deleting %s: %s", fullContainerDirPath, err)
        }
        ...
    }
    if err := os.Remove(subPathDir); err != nil {
        return fmt.Errorf("error deleting %s: %s", subPathDir, err)
    }
    ...
    podSubPathDir := filepath.Join(podDir, containerSubPathDirectoryName)
    if err := os.Remove(podSubPathDir); err != nil && !os.IsExist(err) {
        return fmt.Errorf("error deleting %s: %s", podSubPathDir, err)
    }
    ...
}

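doCleanSubPath (through mount.CleanupMountPoint) works as follows:

  1. Check whether the path is a mount point. If it is not, remove it directly with os.Remove, which only succeeds for a file or an empty directory.
  2. Otherwise unmount it; on Linux this runs the operating system's umount command.
  3. Repeat step 1. If the path is still a mount point, return an error.

If the mount point is detected as corrupted, step 1 is skipped and the path is unmounted directly.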

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/subpath/subpath_linux.go#L308

func doCleanSubPath(mounter mount.Interface, fullContainerDirPath, subPathIndex string) error {
    // process /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/<container name>/<subPathName>
    ...
    fullSubPath := filepath.Join(fullContainerDirPath, subPathIndex)

    if err := mount.CleanupMountPoint(fullSubPath, mounter, true); err != nil {
        return fmt.Errorf("error cleaning subpath mount %s: %s", fullSubPath, err)
    }
    ...
    return nil
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_helper_common.go#L31

func CleanupMountPoint(mountPath string, mounter Interface, extensiveMountPointCheck bool) error {
    ...
    return doCleanupMountPoint(mountPath, mounter, extensiveMountPointCheck, corruptedMnt)
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_helper_common.go#L86

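func doCleanupMountPoint(mountPath string, mounter Interface, extensiveMountPointCheck bool, corruptedMnt bool) error {
    var notMnt bool
    var err error
    if !corruptedMnt {
        notMnt, err = removePathIfNotMountPoint(mountPath, mounter, extensiveMountPointCheck)
        // if mountPath is not a mount point, it has just been removed or there was an error
        if err != nil || notMnt {
            return err
        }
    }
    ...
    if err := mounter.Unmount(mountPath); err != nil {
        return err
    }
    notMnt, err = removePathIfNotMountPoint(mountPath, mounter, extensiveMountPointCheck)
    if err != nil {
        return err
    }
    if notMnt {
        return nil
    }
    return fmt.Errorf("failed to unmount path %v", mountPath)
}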

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L293

func (mounter *Mounter) Unmount(target string) error {
    command := exec.Command("umount", target)
    output, err := command.CombinedOutput()
    ...
}

3.2 volumeUnmounter.TearDown()

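This concludes subpather.CleanSubPaths(), which cleans up the subPaths. What remains is volumeUnmounter.TearDown(), which unmounts the volume itself. Since each volume type implements TearDown differently, we leave the details to the analysis of the individual volume types.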

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/volume.go#L178

// Unmounter interface provides methods to cleanup/unmount the volumes.
type Unmounter interface {
	Volume
	// TearDown unmounts the volume from a self-determined directory and
	// removes traces of the SetUp procedure.
	TearDown() error
	// TearDownAt unmounts the volume from the specified directory and
	// removes traces of the SetUp procedure.
	TearDownAt(dir string) error
}

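This concludes the analysis of operationGenerator.GenerateUnmountVolumeFunc(). Next we analyze the unmap function generated by operationGenerator.GenerateUnmapVolumeFunc().

4. Unmounting Block-mode Volumes

4.1 What Is a Block-mode Volume

As mentioned above, Filesystem and Block volumes are unmounted differently; this section deals with Block volumes. In Block mode the block device is exposed to the pod directly as a raw device instead of being mounted as a filesystem. Several volume types support Block mode. Taking the local volume type as an example, a PV (together with its StorageClass and a PVC) can be created with the following configuration: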

apiVersion: v1
kind: PersistentVolume
metadata:
  name: example-local
spec:
  capacity:
    storage: 1Gi
  volumeMode: Block
  accessModes:
  - ReadWriteOnce
  persistentVolumeReclaimPolicy: Delete
  storageClassName: local-storage
  local:
    path: /dev/vdb
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - wanglei-k8s-containerd

---
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: example-local-claim
spec:
  volumeMode: Block
  storageClassName: local-storage
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi

Kubernetes calls the unmount process for Block volumes "unmap", the counterpart of "map", i.e. mapping the block device into the pod.

4.2 operationGenerator.GenerateUnmapVolumeFunc

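When unmapping a Block-mode volume:

  1. the common unmap function util.UnmapBlockVolume is executed first;
  2. then, if the volume's unmapper implements CustomBlockVolumeUnmapper, its plugin-specific customBlockVolumeUnmapper.UnmapPodDevice() is called. The implementing volume types are csi, fc, iscsi and rbd, but apart from csi the other three do nothing in this method.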

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L1309

func (og *operationGenerator) GenerateUnmapVolumeFunc(
    ...
    unmapVolumeFunc := func() volumetypes.OperationContext {
        ...
        // Execute common unmap
        unmapErr := util.UnmapBlockVolume(og.blkUtil, globalUnmapPath, podDeviceUnmapPath, volName, volumeToUnmount.PodUID)
        if unmapErr != nil {
            // On failure, return error. Caller will log and retry.
            eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapBlockVolume failed", unmapErr)
            return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
        }

        // Call UnmapPodDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper
        if customBlockVolumeUnmapper, ok := blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper); ok {
            // Execute plugin specific unmap
            unmapErr = customBlockVolumeUnmapper.UnmapPodDevice()
            if unmapErr != nil {
                // On failure, return error. Caller will log and retry.
                eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapPodDevice failed", unmapErr)
                return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
            }
        }
        ...
        return volumetypes.NewOperationContext(nil, nil, migrated)
    }

    return volumetypes.GeneratedOperations{
        ...
        OperationFunc:     unmapVolumeFunc,
        ...
    }, nil
}
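The `blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper)` check is Go's optional-interface pattern: a plugin opts into the extra step simply by implementing the extra method. A minimal sketch with hypothetical stand-in types (`blockUnmapper`, `customBlockUnmapper`, `plainPlugin`, `csiLikePlugin`), not the real volume package interfaces.

```go
package main

// blockUnmapper and customBlockUnmapper are hypothetical stand-ins for
// volume.BlockVolumeUnmapper and volume.CustomBlockVolumeUnmapper.
type blockUnmapper interface {
	Name() string
}

type customBlockUnmapper interface {
	blockUnmapper
	UnmapPodDevice() error
}

type plainPlugin struct{}

func (plainPlugin) Name() string { return "plain" }

// csiLikePlugin implements the optional method and counts its invocations.
type csiLikePlugin struct{ calls *int }

func (csiLikePlugin) Name() string { return "csi-like" }

func (p csiLikePlugin) UnmapPodDevice() error { *p.calls++; return nil }

// unmapVolume runs the common step for every plugin, then the plugin-specific
// step only if the plugin implements customBlockUnmapper.
func unmapVolume(u blockUnmapper, commonUnmap func() error) error {
	if err := commonUnmap(); err != nil {
		return err
	}
	if c, ok := u.(customBlockUnmapper); ok {
		return c.UnmapPodDevice()
	}
	return nil
}
```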

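We focus on the common unmap function util.UnmapBlockVolume, which performs three actions in order:

  1. release the loop device attached to the block device
  2. unmap the volume from the pod path
  3. unmap the volume from the node (global) path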

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/util.go#L552

func UnmapBlockVolume(...) error {
    // Release file descriptor lock.
    err := blkUtil.DetachFileDevice(filepath.Join(globalUnmapPath, string(podUID)))
    if err != nil {
        return fmt.Errorf("blkUtil.DetachFileDevice failed. globalUnmapPath:%s, podUID: %s: %v",
            globalUnmapPath, string(podUID), err)
    }

    // unmap devicePath from pod volume path
    unmapDeviceErr := blkUtil.UnmapDevice(podDeviceUnmapPath, volumeMapName, false /* bindMount */)
    if unmapDeviceErr != nil {
        return fmt.Errorf("blkUtil.DetachFileDevice failed. podDeviceUnmapPath:%s, volumeMapName: %s, bindMount: %v: %v",
            podDeviceUnmapPath, volumeMapName, false, unmapDeviceErr)
    }

    // unmap devicePath from global node path
    unmapDeviceErr = blkUtil.UnmapDevice(globalUnmapPath, string(podUID), true /* bindMount */)
    if unmapDeviceErr != nil {
        return fmt.Errorf("blkUtil.DetachFileDevice failed. globalUnmapPath:%s, podUID: %s, bindMount: %v: %v",
            globalUnmapPath, string(podUID), true, unmapDeviceErr)
    }
    return nil
}
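The two paths UnmapBlockVolume operates on follow the layout described in 4.2.1 to 4.2.3. The sketch below assembles them with the article's own placeholders; `globalMapFile` and `podDeviceLink` are hypothetical helpers, and the real kubelet obtains these paths from the volume plugin rather than building them like this.

```go
package main

import "path/filepath"

const kubeletRoot = "/var/lib/kubelet"

// globalMapFile is the per-pod file on the node onto which the device is
// bind-mounted: plugins/{VOLUME_TYPE}/volumeDevices/{VOLUME_NAME}/{POD_UID}.
// It corresponds to filepath.Join(globalUnmapPath, podUID).
func globalMapFile(volumeType, volumeName, podUID string) string {
	return filepath.Join(kubeletRoot, "plugins", volumeType, "volumeDevices", volumeName, podUID)
}

// podDeviceLink is the symlink inside the pod directory:
// pods/{POD_UID}/volumeDevices/{VOLUME_TYPE}/{VOLUME_NAME}.
// It corresponds to filepath.Join(podDeviceUnmapPath, volumeMapName).
func podDeviceLink(podUID, volumeType, volumeName string) string {
	return filepath.Join(kubeletRoot, "pods", podUID, "volumeDevices", volumeType, volumeName)
}
```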

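4.2.1 Releasing the Loop Device

To release the loop device, the kubelet first finds the loop device whose backing file is the node-side block device file, then detaches it. This loop device is the "file descriptor lock" referred to by the comment `Release file descriptor lock.` in UnmapBlockVolume.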

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/volumepathhandler/volume_path_handler_linux.go#L58

func (v VolumePathHandler) DetachFileDevice(path string) error {
    loopPath, err := v.GetLoopDevice(path)
    ...
    err = removeLoopDevice(loopPath)
    ...
}

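The loop device is found by reading each /sys/block/loop*/loop/backing_file and comparing its content with the path of the block device file on the node (or with its symlink-resolved path). On a match, the name of the loop device is known, and its path is /dev/loop*. Here the node-side block device path is /var/lib/kubelet/plugins/{VOLUME_TYPE}/volumeDevices/{VOLUME_NAME}/{POD_UID}.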

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/volumepathhandler/volume_path_handler_linux.go#L78

func (v VolumePathHandler) GetLoopDevice(path string) (string, error) {
    ...
    return getLoopDeviceFromSysfs(path)
}

func getLoopDeviceFromSysfs(path string) (string, error) {
    // If the file is a symlink.
    realPath, err := filepath.EvalSymlinks(path)
    if err != nil {
        return "", fmt.Errorf("failed to evaluate path %s: %s", path, err)
    }

    devices, err := filepath.Glob("/sys/block/loop*")
    if err != nil {
        return "", fmt.Errorf("failed to list loop devices in sysfs: %s", err)
    }

    for _, device := range devices {
        backingFile := fmt.Sprintf("%s/loop/backing_file", device)

        // The contents of this file is the absolute path of "path".
        data, err := ioutil.ReadFile(backingFile)
        if err != nil {
            continue
        }

        // Return the first match.
        backingFilePath := strings.TrimSpace(string(data))
        if backingFilePath == path || backingFilePath == realPath {
            return fmt.Sprintf("/dev/%s", filepath.Base(device)), nil
        }
    }

    return "", errors.New(ErrDeviceNotFound)
}

The loop device found above is then detached with the losetup -d {/dev/loop?} command.

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/volumepathhandler/volume_path_handler_linux.go#L104

func removeLoopDevice(device string) error {
    args := []string{"-d", device}
    cmd := exec.Command(losetupPath, args...)
    out, err := cmd.CombinedOutput()
    ...
}
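The detach step is a single external command. The sketch below only builds the command, so its arguments can be inspected without root privileges; `detachLoopCmd` is a hypothetical helper and resolves "losetup" through PATH, whereas the source refers to the binary through its losetupPath variable.

```go
package main

import "os/exec"

// detachLoopCmd builds, without running, the command used to detach a loop
// device. Calling CombinedOutput() on the result would actually detach it.
func detachLoopCmd(device string) *exec.Cmd {
	return exec.Command("losetup", "-d", device)
}
```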

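4.2.2 Unmapping the Volume from the Pod Path

Under the pod path /var/lib/kubelet/pods/{POD_UID}/volumeDevices/{VOLUME_TYPE} there is a symlink, named after the volume, that points to the block device. Unmapping removes this symlink.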

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/volumepathhandler/volume_path_handler.go#L171

func UnmapBlockVolume(...) error {
    ...
    unmapDeviceErr := blkUtil.UnmapDevice(podDeviceUnmapPath, volumeMapName, false /* bindMount */)
    ...
}

func (v VolumePathHandler) UnmapDevice(mapPath string, linkName string, bindMount bool) error {
    ...
    return unmapSymlinkDevice(v, mapPath, linkName)
}

func unmapSymlinkDevice(v VolumePathHandler, mapPath string, linkName string) error {
	// Check symbolic link exists
	linkPath := filepath.Join(mapPath, string(linkName))
	...
	return os.Remove(linkPath)
}

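4.2.3 Unmapping the Volume from the Node Path

Under the node's volume path /var/lib/kubelet/plugins/{VOLUME_TYPE}/volumeDevices/{VOLUME_NAME}/ there is a file named {POD_UID} onto which the block device has been bind-mounted. Unmapping first unmounts it and then deletes the file.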

func UnmapBlockVolume(...) error {
    ...
    unmapDeviceErr = blkUtil.UnmapDevice(globalUnmapPath, string(podUID), true /* bindMount */)
    ...
}

func (v VolumePathHandler) UnmapDevice(mapPath string, linkName string, bindMount bool) error {
    ...
    if bindMount {
        return unmapBindMountDevice(v, mapPath, linkName)
    }
    ...
}

func unmapBindMountDevice(v VolumePathHandler, mapPath string, linkName string) error {
    ...
    mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()}
    if err := mounter.Unmount(linkPath); err != nil {
        return fmt.Errorf("failed to unmount linkPath %s: %v", linkPath, err)
    }

    // Remove file
    if err := os.Remove(linkPath); err != nil && !os.IsNotExist(err) {
        return fmt.Errorf("failed to remove file %s: %v", linkPath, err)
    }

    return nil
}


The unmount directly runs the umount /var/lib/kubelet/plugins/{VOLUME_TYPE}/volumeDevices/{VOLUME_NAME}/{POD_UID} command to unmount the block device.

func (mounter *Mounter) Unmount(target string) error {
    ...
    command := exec.Command("umount", target)
    output, err := command.CombinedOutput()
    ...
}

This concludes the analysis of the unmap function generated by operationGenerator.GenerateUnmapVolumeFunc().