tags: kubernetes,container,源码分析

k8s volume plugin 源码分析: Local卷 MountDevice 实现

1. 调用点

在VolumeManager中挂载卷时,如果卷类型支持挂载设备,执行DeviceMounter.MountDevice挂载block类型的卷。

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L659

// GenerateMountVolumeFunc (excerpt; elided parts are marked "...") builds the
// mount operation for a volume. The portion shown covers the device-mount step:
// obtain a DeviceMounter when the plugin is device-mountable, mount the device
// to its global mount path, record that in the actual state of world, and then
// attempt a node-side volume expansion.
func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
    ...
    mountVolumeFunc := func() volumetypes.OperationContext {
        ...
        // Look up a device-mountable plugin for this spec; the error result is
        // discarded in this excerpt, so a nil plugin simply skips the device mount.
        deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
        var volumeDeviceMounter volume.DeviceMounter
        if deviceMountableVolumePlugin != nil {
            volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
        }
        ...
        // Only mount the device when a mounter exists and the device is not
        // already recorded as globally mounted.
        if volumeDeviceMounter != nil && actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) != DeviceGloballyMounted {
            deviceMountPath, err :=
                volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
            ...
            // Mount device to global mount path
            err = volumeDeviceMounter.MountDevice(
                volumeToMount.VolumeSpec,
                devicePath,
                deviceMountPath,
                volume.DeviceMounterArgs{FsGroup: fsGroup},
            )
            ...
            // Update actual state of world to reflect volume is globally mounted
            markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted(
                volumeToMount.VolumeName, devicePath, deviceMountPath)
            ...

            // If volume expansion is performed after MountDevice but before SetUp then
            // deviceMountPath and deviceStagePath is going to be the same.
            // Deprecation: Calling NodeExpandVolume after NodeStage/MountDevice will be deprecated
            // in a future version of k8s.
            resizeOptions.DeviceMountPath = deviceMountPath
            resizeOptions.DeviceStagePath = deviceMountPath
            resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged

            // NodeExpandVolume will resize the file system if user has requested a resize of
            // underlying persistent volume and is allowed to do so.
            resizeDone, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
            ...
        }        
        ...
    }
}

不是所有类型的卷,都实现了MountDevice方法。不同类型的卷的实现也有所不同,本文只对local卷的实现进行分析。

2. 只挂载Block类型设备

如果文件的实际类型是Block设备,则执行dm.mountLocalBlockDevice;如果是目录,则无需挂载,直接返回;其他类型则返回错误。

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/local/local.go#L371

// MountDevice mounts a local volume to deviceMountPath, dispatching on the file
// type of the path given in the PV spec:
//   - block device: delegate to mountLocalBlockDevice;
//   - directory: nothing to do, return nil;
//   - anything else: return an error.
//
// The devicePath argument is not used in the portion shown; the PV's local path
// is passed on instead. The DeviceMounterArgs argument is ignored.
func (dm *deviceMounter) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string, _ volume.DeviceMounterArgs) error {
	...
	// Determine what kind of file the PV's local path refers to on the host.
	fileType, err := dm.hostUtil.GetFileType(spec.PersistentVolume.Spec.Local.Path)
	if err != nil {
		return err
	}

	switch fileType {
	case hostutil.FileTypeBlockDev:
		// local volume plugin does not implement AttachableVolumePlugin interface, so set devicePath to Path in PV spec directly
		return dm.mountLocalBlockDevice(spec, spec.PersistentVolume.Spec.Local.Path, deviceMountPath)
	case hostutil.FileTypeDirectory:
		// The given local volume path is already a filesystem directory, so there is nothing to mount.
		return nil
	default:
		return fmt.Errorf("only directory and block device are supported")
	}
}


https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/local/local.go#L348

// mountLocalBlockDevice (excerpt) formats, if needed, and mounts the block
// device at devicePath onto deviceMountPath. The steps that compute fstype and
// mountOptions, and the use of notMnt, are elided from this excerpt.
func (dm *deviceMounter) mountLocalBlockDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
    ...
    notMnt, err := dm.mounter.IsLikelyNotMountPoint(deviceMountPath)
    if err != nil {
        // A missing mount path is not fatal: create it (mode 0750) and carry on.
        if os.IsNotExist(err) {
            if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
                return err
            }
            ...
        } 
        ...
    }
    ...
    // Format the device when required and mount it at the global mount path.
    err = dm.mounter.FormatAndMount(devicePath, deviceMountPath, fstype, mountOptions)
    if err != nil {
        ...
        return ...
    }
    ...
    return nil
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount.go#L147-L158

// FormatAndMount forwards to FormatAndMountSensitive with the same arguments
// and no sensitive options (nil).
func (mounter *SafeFormatAndMount) FormatAndMount(source string, target string, fstype string, options []string) error {
    return mounter.FormatAndMountSensitive(source, target, fstype, options, nil /* sensitiveOptions */)
}

// FormatAndMountSensitive is the exported entry point that additionally accepts
// sensitiveOptions; it forwards all arguments unchanged to the unexported
// formatAndMountSensitive.
func (mounter *SafeFormatAndMount) FormatAndMountSensitive(source string, target string, fstype string, options []string, sensitiveOptions []string) error {
    return mounter.formatAndMountSensitive(source, target, fstype, options, sensitiveOptions)
}

formatAndMountSensitive调用系统命令格式化,并挂载块设备。

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L396

// formatAndMountSensitive is the unexported implementation that
// FormatAndMountSensitive forwards to. Its body is elided in this excerpt.
func (mounter *SafeFormatAndMount) formatAndMountSensitive(source string, target string, fstype string, options []string, sensitiveOptions []string) error {
    ...
}

3. 调用blkid命令判断磁盘是否格式化

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L418

// formatAndMountSensitive (excerpt: disk-format probe). Queries the existing
// format of source via GetDiskFormat; a probe failure is returned as a
// GetDiskFormatFailed mount error.
func (mounter *SafeFormatAndMount) formatAndMountSensitive(source string, target string, fstype string, options []string, sensitiveOptions []string) error {
    ...
    // Check if the disk is already formatted
    existingFormat, err := mounter.GetDiskFormat(source)
    if err != nil {
        return NewMountError(GetDiskFormatFailed, "failed to get disk format of disk %s: %v", source, err)
    }
    ...
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L538

// GetDiskFormat returns the result of the package-level getDiskFormat for
// disk, run with this mounter's Exec interface.
func (mounter *SafeFormatAndMount) GetDiskFormat(disk string) (string, error) {
    return getDiskFormat(mounter.Exec, disk)
}

调用blkid -p -s TYPE -s PTTYPE -o export ${DISK}命令。-o export选项以key=value键值对的形式输出结果(每行一个键值对),根据结果中的TYPE字段得到设备上的文件系统类型;PTTYPE字段表示分区表类型。

例如挂载/dev/vdb设备,将得到DEVNAME=/dev/vdb TYPE=ext4结果。

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L485

// getDiskFormat (excerpt) runs
//   blkid -p -s TYPE -s PTTYPE -o export <disk>
// and parses its output line by line as KEY=VALUE pairs. TYPE is stored as the
// filesystem type and PTTYPE as the partition table type. In the portion shown
// fstype is the returned value; error handling for the blkid call and any use
// of pttype are elided.
func getDiskFormat(exec utilexec.Interface, disk string) (string, error) {
    args := []string{"-p", "-s", "TYPE", "-s", "PTTYPE", "-o", "export", disk}
    klog.V(4).Infof("Attempting to determine if disk %q is formatted using blkid with args: (%v)", disk, args)
    // CombinedOutput captures stdout and stderr together.
    dataOut, err := exec.Command("blkid", args...).CombinedOutput()
    output := string(dataOut)
    ...
    var fstype, pttype string

    lines := strings.Split(output, "\n")
    for _, l := range lines {
        ...
        // Split "KEY=VALUE"; cs[0] is the key and cs[1] the value.
        cs := strings.Split(l, "=")
        ...
        // TYPE is filesystem type, and PTTYPE is partition table type, according
        // to https://www.kernel.org/pub/linux/utils/util-linux/v2.21/libblkid-docs/.
        if cs[0] == "TYPE" {
            fstype = cs[1]
        } else if cs[0] == "PTTYPE" {
            pttype = cs[1]
        }
    }
    ...
    return fstype, nil
}

4. 调用mkfs.{fstype}命令对文件系统格式化

如果根据上一步结果,发现设备未被格式化,则调用mkfs.{fstype}命令,对设备格式化。对于local卷,文件系统类型来自PV Spec(spec.local.fsType),如果未指定,默认为ext4。如果是只读挂载,则不会格式化,而是直接返回错误。

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L428-L460

// formatAndMountSensitive (excerpt: formatting an unformatted disk). When
// GetDiskFormat reports an empty format, the disk is formatted with
// "mkfs.<fstype>", where fstype defaults to "ext4" when empty. A read-only
// mount of an unformatted disk is rejected instead of formatted. readOnly is
// defined in a part of the function that is elided here.
func (mounter *SafeFormatAndMount) formatAndMountSensitive(source string, target string, fstype string, options []string, sensitiveOptions []string) error {
    ...
    existingFormat, err := mounter.GetDiskFormat(source)
    ...
    // Use 'ext4' as the default
    if len(fstype) == 0 {
        fstype = "ext4"
    }

    if existingFormat == "" {
        // Do not attempt to format the disk if mounting as readonly, return an error to reflect this.
        if readOnly {
            return NewMountError(UnformattedReadOnly, "cannot mount unformatted disk %s as we are manipulating it in read-only mode", source)
        }

        // Disk is unformatted so format it.
        // Default mkfs arguments are just the device; ext3/ext4 and xfs get
        // filesystem-specific flags below.
        args := []string{source}
        if fstype == "ext4" || fstype == "ext3" {
            args = []string{
                "-F",  // Force flag
                "-m0", // Zero blocks reserved for super-user
                source,
            }
        } else if fstype == "xfs" {
            args = []string{
                "-f", // force flag
                source,
            }
        }

        klog.Infof("Disk %q appears to be unformatted, attempting to format as type: %q with options: %v", source, fstype, args)
        // The mkfs binary name is derived from the filesystem type, e.g. mkfs.ext4.
        output, err := mounter.Exec.Command("mkfs."+fstype, args...).CombinedOutput()
        if err != nil {
            ...
            return ...
        }
        ...
    } else {
        ...
    }
    ...
}

5. 调用fsck, 检查文件系统是否正常

如果已被格式化,且不是以只读方式挂载,则调用fsck -a {dev}命令,检查并尝试自动修复文件系统中的异常。

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L460-L474

// formatAndMountSensitive (excerpt: checking an already formatted disk). When
// the disk already has a format and the mount is not read-only,
// checkAndRepairFilesystem is run and its error, if any, is returned. Handling
// of a mismatch between the requested fstype and the existing format is elided.
// NOTE: this excerpt stops before the end of the function.
func (mounter *SafeFormatAndMount) formatAndMountSensitive(source string, target string, fstype string, options []string, sensitiveOptions []string) error {
    ...
    existingFormat, err := mounter.GetDiskFormat(source)
    ...
    // Use 'ext4' as the default
    if len(fstype) == 0 {
        fstype = "ext4"
    }
    if existingFormat == "" {
        ...
    } else {        
        // The disk is already formatted.
        if fstype != existingFormat {
            ...
        }
        if !readOnly {
            // Run check tools on the disk to fix repairable issues, only do this for formatted volumes requested as rw.
            err := mounter.checkAndRepairFilesystem(source)
            if err != nil {
                return err
            }
        }
    }

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L378

// checkAndRepairFilesystem runs "fsck -a <source>" and interprets the result.
//
// It returns a HasFilesystemErrors mount error only when fsck exits with the
// fsckErrorsUncorrected status. Every other outcome returns nil: a missing
// fsck executable is logged as a warning, corrected errors and exit statuses
// above fsckErrorsUncorrected are logged at info level, and any other error
// from running the command falls through the switch without being reported.
func (mounter *SafeFormatAndMount) checkAndRepairFilesystem(source string) error {
    klog.V(4).Infof("Checking for issues with fsck on disk: %s", source)
    args := []string{"-a", source}
    out, err := mounter.Exec.Command("fsck", args...).CombinedOutput()
    if err != nil {
        // Distinguish a non-zero exit status from a failure to run fsck at all.
        ee, isExitError := err.(utilexec.ExitError)
        switch {
        case err == utilexec.ErrExecutableNotFound:
            klog.Warningf("'fsck' not found on system; continuing mount without running 'fsck'.")
        case isExitError && ee.ExitStatus() == fsckErrorsCorrected:
            klog.Infof("Device %s has errors which were corrected by fsck.", source)
        case isExitError && ee.ExitStatus() == fsckErrorsUncorrected:
            return NewMountError(HasFilesystemErrors, "'fsck' found errors on device %s but could not correct them: %s", source, string(out))
        case isExitError && ee.ExitStatus() > fsckErrorsUncorrected:
            // Higher exit statuses are logged but do not block the mount.
            klog.Infof("`fsck` error %s", string(out))
        }
    }
    return nil
}

6. 调用mount, 挂载设备

格式化(或对已格式化设备的fsck检查)完成后,执行挂载操作。

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L478

// formatAndMountSensitive (excerpt: final mount step). Mounts source at target
// through MountSensitive; a failure is wrapped in a mount error using
// mountErrorValue, which is defined in a part of the function elided here.
func (mounter *SafeFormatAndMount) formatAndMountSensitive(source string, target string, fstype string, options []string, sensitiveOptions []string) error {
    ...
    // Mount the disk
    klog.V(4).Infof("Attempting to mount disk %s in %s format at %s", source, fstype, target)
    if err := mounter.MountSensitive(source, target, fstype, options, sensitiveOptions); err != nil {
        return NewMountError(mountErrorValue, err.Error())
    }

    return nil
}

如果options中指定了bind选项,则先bind mount一次,然后再remount一次。再remount一次是为了应用挂载选项,因为bind mount可能会忽略一些选项;

如果不是bind mount,则只需要执行一次,直接应用原始的挂载选项即可。

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L87

// MountSensitive (excerpt) mounts source at target with the given options and
// sensitiveOptions. When MakeBindOptsSensitive reports a bind mount, doMount is
// called twice: first with bindOpts, then with bindRemountOpts; both calls
// receive bindRemountOptsSensitive as the sensitive options. Otherwise doMount
// is called once with the original options. Every call passes true for
// doMount's final systemdMountRequired parameter.
func (mounter *Mounter) MountSensitive(source string, target string, fstype string, options []string, sensitiveOptions []string) error {
    // Path to mounter binary if containerized mounter is needed. Otherwise, it is set to empty.
    // All Linux distros are expected to be shipped with a mount utility that supports bind mounts.
    mounterPath := ""
    bind, bindOpts, bindRemountOpts, bindRemountOptsSensitive := MakeBindOptsSensitive(options, sensitiveOptions)
    if bind {
        // First pass: perform the bind mount itself.
        err := mounter.doMount(mounterPath, defaultMountCommand, source, target, fstype, bindOpts, bindRemountOptsSensitive, nil /* mountFlags */, true)
        if err != nil {
            return err
        }
        // Second pass: remount with the remount options.
        return mounter.doMount(mounterPath, defaultMountCommand, source, target, fstype, bindRemountOpts, bindRemountOptsSensitive, nil /* mountFlags */, true)
    }
    ...
    return mounter.doMount(mounterPath, defaultMountCommand, source, target, fstype, options, sensitiveOptions, nil /* mountFlags */, true)
}

Mounter.doMount调用MakeMountArgsSensitiveWithMountFlags函数组装命令,然后执行。

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L144

// doMount (excerpt) assembles the mount argument list with
// MakeMountArgsSensitiveWithMountFlags and executes the command, returning the
// error from running it. Handling of systemdMountRequired and of the command
// output is elided from this excerpt.
func (mounter *Mounter) doMount(mounterPath string, mountCmd string, source string, target string, fstype string, options []string, sensitiveOptions []string, mountFlags []string, systemdMountRequired bool) error {
    mountArgs, mountArgsLogStr := MakeMountArgsSensitiveWithMountFlags(source, target, fstype, options, sensitiveOptions, mountFlags)
    // When a mounter binary path is given, run that binary instead and pass the
    // original mount command as its first argument.
    if len(mounterPath) > 0 {
        mountArgs = append([]string{mountCmd}, mountArgs...)
        mountArgsLogStr = mountCmd + " " + mountArgsLogStr
        mountCmd = mounterPath
    }
    ...
    command := exec.Command(mountCmd, mountArgs...)
    output, err := command.CombinedOutput()
    ...
    return err
}

以挂载/dev/vdb为例,将执行mount -t ext4 -o defaults /dev/vdb /var/lib/kubelet/plugins/kubernetes.io/local-volume/mounts/example-local-filesystem-pv

https://github.com/kubernetes/kubernetes/blob/v1.23.1/staging/src/k8s.io/mount-utils/mount_linux.go#L243

// MakeMountArgsSensitiveWithMountFlags (excerpt) builds the argument list for
// the mount command in this order: mountFlags, "-t fstype" when fstype is
// non-empty, "-o" followed by options and sensitiveOptions joined with commas
// when either is non-empty, source when non-empty, and finally target.
// It returns the arguments together with mountArgsLogStr; the construction of
// mountArgsLogStr is elided from this excerpt.
func MakeMountArgsSensitiveWithMountFlags(source, target, fstype string, options []string, sensitiveOptions []string, mountFlags []string) (mountArgs []string, mountArgsLogStr string) {
	// Build mount command as follows:
	//   mount [$mountFlags] [-t $fstype] [-o $options] [$source] $target
    mountArgs = []string{}
    ...
    mountArgs = append(mountArgs, mountFlags...)
    ...
    if len(fstype) > 0 {
        mountArgs = append(mountArgs, "-t", fstype)
        ...
    }
    if len(options) > 0 || len(sensitiveOptions) > 0 {
        // Regular and sensitive options are merged into a single -o value.
        combinedOptions := []string{}
        combinedOptions = append(combinedOptions, options...)
        combinedOptions = append(combinedOptions, sensitiveOptions...)
        mountArgs = append(mountArgs, "-o", strings.Join(combinedOptions, ","))
        ...
    }
    if len(source) > 0 {
        mountArgs = append(mountArgs, source)
        ...
    }
    // The mount target is always the last argument.
    mountArgs = append(mountArgs, target)
    ...
    return mountArgs, mountArgsLogStr
}