tags: k8s, source code analysis, container

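Kubelet VolumeManager Source Code Analysis: Mounting Volumes

This article is one part of a source code analysis of the Kubelet VolumeManager. It is written as a standalone piece so that the series does not become too long and hard to follow.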

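1. Entry point

The reconciler is an important design pattern in Kubernetes: it periodically drives the actual state toward the desired state, and it is common in controllers.

In VolumeManager, the reconciler periodically handles the unmounting and mounting of volumes. This article analyzes the mounting part.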

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/volumemanager/reconciler/reconciler.go#L163-L178

func (rc *reconciler) reconcile() {
    rc.unmountVolumes()
    rc.mountAttachVolumes()
    rc.unmountDetachDevices()
}
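
To make the pattern concrete, here is a minimal sketch of a reconcile step, not the kubelet's implementation. The `world` type and the `reconcile` helper are hypothetical; the only property taken from the excerpt above is that unmounts are issued before mounts.

```go
package main

import (
	"fmt"
	"sort"
)

// world is a hypothetical, simplified state: volume name -> present.
type world map[string]bool

// sortedKeys returns the volume names in a stable order.
func sortedKeys(w world) []string {
	keys := make([]string, 0, len(w))
	for k := range w {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// reconcile lists the operations needed to move actual toward desired.
// Unmounts come first, mirroring the order of calls in reconcile() above.
func reconcile(desired, actual world) []string {
	var ops []string
	for _, name := range sortedKeys(actual) {
		if !desired[name] {
			ops = append(ops, "unmount "+name)
		}
	}
	for _, name := range sortedKeys(desired) {
		if !actual[name] {
			ops = append(ops, "mount "+name)
		}
	}
	return ops
}

func main() {
	desired := world{"data": true, "logs": true}
	actual := world{"logs": true, "tmp": true}
	fmt.Println(reconcile(desired, actual))
}
```

Running the same step again after the operations succeed yields no further operations, which is what makes a periodic loop safe to repeat.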

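mountAttachVolumes iterates over the volumes that the desired state says should be mounted and, depending on how far each volume has progressed, runs the matching branch.

The branches handle the following cases:

  • The volume has not been attached to the node
  • The volume has not been mounted to the pod, or is marked as needing a remount
  • The volume needs to be resized, i.e. the "ExpandInUsePersistentVolumes" feature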

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/volumemanager/reconciler/reconciler.go#L200

func (rc *reconciler) mountAttachVolumes() {
    // Ensure volumes that should be attached/mounted are attached/mounted.
    for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        ...
        if cache.IsVolumeNotAttachedError(err) {
            ...
        } else if !volMounted || cache.IsRemountRequiredError(err) {
            ...
        } else if cache.IsFSResizeRequiredError(err) &&
            utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
            ...
        }
    }
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/volumemanager/cache/actual_state_of_world.go#L696

func (asw *actualStateOfWorld) PodExistsInVolume(
	podName volumetypes.UniquePodName,
	volumeName v1.UniqueVolumeName) (bool, string, error) {
	asw.RLock()
	defer asw.RUnlock()

	volumeObj, volumeExists := asw.attachedVolumes[volumeName]
	if !volumeExists {
		return false, "", newVolumeNotAttachedError(volumeName)
	}

	podObj, podExists := volumeObj.mountedPods[podName]
	if podExists {
		// if volume mount was uncertain we should keep trying to mount the volume
		if podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain {
			return false, volumeObj.devicePath, nil
		}
		if podObj.remountRequired {
			return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
		}
		if podObj.fsResizeRequired &&
			!volumeObj.volumeInUseErrorForExpansion &&
			utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
			return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName)
		}
	}

	return podExists, volumeObj.devicePath, nil
}
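
The way the three return values of PodExistsInVolume select a branch can be sketched as follows. This is a simplified model under stated assumptions: the sentinel errors and the `nextAction` helper are hypothetical stand-ins for the typed errors in the cache package and for the if / else-if chain in mountAttachVolumes.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors standing in for the cache package's typed errors.
var (
	errNotAttached = errors.New("volume not attached")
	errRemount     = errors.New("remount required")
	errFSResize    = errors.New("fs resize required")
)

// nextAction mirrors the order of the if / else-if chain in mountAttachVolumes.
// expandInUse stands for the ExpandInUsePersistentVolumes feature gate.
func nextAction(volMounted bool, err error, expandInUse bool) string {
	switch {
	case errors.Is(err, errNotAttached):
		return "attach-or-verify"
	case !volMounted || errors.Is(err, errRemount):
		return "mount"
	case errors.Is(err, errFSResize) && expandInUse:
		return "resize"
	default:
		return "nothing"
	}
}

func main() {
	fmt.Println(nextAction(false, errNotAttached, false)) // volume unknown to the actual state
	fmt.Println(nextAction(false, nil, false))            // not mounted yet, or mount uncertain
	fmt.Println(nextAction(true, errRemount, false))      // mounted, but remount requested
	fmt.Println(nextAction(true, errFSResize, true))      // mounted, resize pending
}
```

Note that an uncertain mount is reported as "not mounted" with a nil error, so it lands in the mount branch and is retried.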

2. attach

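If the volume is not present in asw.attachedVolumes, actualStateOfWorld.PodExistsInVolume() returns a VolumeNotAttachedError. In fact, every volume is absent from asw.attachedVolumes when it first enters VolumeManager (which does not necessarily mean that the volume is not attached to the node). It is added to asw.attachedVolumes later, depending on the actual situation.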

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/volumemanager/cache/actual_state_of_world.go#L702-L705

func (asw *actualStateOfWorld) PodExistsInVolume(...) (bool, string, error) {
    ...
    volumeObj, volumeExists := asw.attachedVolumes[volumeName]
    if !volumeExists {
        return false, "", newVolumeNotAttachedError(volumeName)
    }
    ...
}

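Attach handling is split into two paths, depending on the kubelet configuration:

  1. If enableControllerAttachDetach is enabled, or the volume plugin does not support attach, attach/detach is left to the controller and VolumeManager only syncs the volume's state
  2. If enableControllerAttachDetach is disabled and the volume plugin supports attach, the kubelet attaches the volume itself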

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/volumemanager/reconciler/reconciler.go#L200

func (rc *reconciler) mountAttachVolumes() {
    ...
    if cache.IsVolumeNotAttachedError(err) {
        if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
            ...
        } else {
            ...
        }
    }
    ...
}
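
The condition can be read as a small truth table. The sketch below only restates the boolean expression from the excerpt; the `attachPath` helper and its return strings are hypothetical.

```go
package main

import "fmt"

// attachPath restates the condition used in mountAttachVolumes:
// the kubelet attaches a volume itself only when controller attach/detach
// is disabled AND the plugin is attachable.
func attachPath(controllerAttachDetachEnabled, pluginIsAttachable bool) string {
	if controllerAttachDetachEnabled || !pluginIsAttachable {
		return "verify-only"
	}
	return "kubelet-attach"
}

func main() {
	for _, enabled := range []bool{true, false} {
		for _, attachable := range []bool{true, false} {
			fmt.Printf("controllerAttachDetach=%v attachable=%v -> %s\n",
				enabled, attachable, attachPath(enabled, attachable))
		}
	}
}
```

Only one of the four combinations leads to the kubelet attaching the volume.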

enableControllerAttachDetach defaults to true, so by default VolumeManager does not attach volumes itself.

The default is set through the following call chain:

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/apis/config/v1beta1/register.go#L38

func init() {
    localSchemeBuilder.Register(addDefaultingFuncs)
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/apis/config/v1beta1/defaults.go#L51-L53

func addDefaultingFuncs(scheme *kruntime.Scheme) error {
    return RegisterDefaults(scheme)
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/apis/config/v1beta1/zz_generated.defaults.go#L39

func RegisterDefaults(scheme *runtime.Scheme) error {
	scheme.AddTypeDefaultingFunc(&v1beta1.KubeletConfiguration{}, func(obj interface{}) { SetObjectDefaults_KubeletConfiguration(obj.(*v1beta1.KubeletConfiguration)) })
	return nil
}

func SetObjectDefaults_KubeletConfiguration(in *v1beta1.KubeletConfiguration) {
    SetDefaults_KubeletConfiguration(in)
    ...
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/apis/config/v1beta1/defaults.go#L218

func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfiguration) {
    ...
    if obj.EnableControllerAttachDetach == nil {
        obj.EnableControllerAttachDetach = utilpointer.BoolPtr(true)
    }
    ...
}
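
The field is a *bool so that "not set" can be told apart from an explicit false. A minimal sketch of that defaulting pattern follows; the `config` struct and `setDefaults` function are hypothetical stand-ins, not the kubelet's types.

```go
package main

import "fmt"

// config is a hypothetical stand-in for KubeletConfiguration.
type config struct {
	EnableControllerAttachDetach *bool
}

// setDefaults fills in the default only when the field was left unset (nil).
// An explicit false from the user is preserved.
func setDefaults(c *config) {
	if c.EnableControllerAttachDetach == nil {
		v := true
		c.EnableControllerAttachDetach = &v
	}
}

func main() {
	unset := config{}
	setDefaults(&unset)
	fmt.Println(*unset.EnableControllerAttachDetach)

	off := false
	explicit := config{EnableControllerAttachDetach: &off}
	setDefaults(&explicit)
	fmt.Println(*explicit.EnableControllerAttachDetach)
}
```

With a plain bool field the zero value false would be indistinguishable from "unset", and a default of true could not be applied safely.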

2.1 The controller handles attach; VolumeManager syncs the state

The operationExecutor.VerifyControllerAttachedVolume method is called to sync the volume's attach state.

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/volumemanager/reconciler/reconciler.go#L204-L218

func (rc *reconciler) mountAttachVolumes() {
    ...
    if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
        ...
        err := rc.operationExecutor.VerifyControllerAttachedVolume(
            volumeToMount.VolumeToMount,
            rc.nodeName,
            rc.actualStateOfWorld)
        if err != nil && !isExpectedError(err) {
            klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
        }
        if err == nil {
            klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
        }
    } else {
        ...
    }
    ...
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_executor.go#L933

func (oe *operationExecutor) VerifyControllerAttachedVolume(...) error {
    generatedOperations, err := oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
    ...
}

If the volume's plugin does not support attach, the volume is treated as already attached and the state is updated accordingly.

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L1519-L1533

func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(...) (...) {
    ...
    verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
        ...
        if !volumeToMount.PluginIsAttachable {
            ...
            addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
                volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */)
            ...
        }
        ...
    }
}

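Otherwise, the node object is fetched through the API, and any volume that the node status records as attached is synced into the actual state.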

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L1547-L1574

func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(...) (...) {
    ...
    verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
        ...
        // Fetch current node object
        node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(nodeName), metav1.GetOptions{})
        ...
        for _, attachedVolume := range node.Status.VolumesAttached {
            if attachedVolume.Name == volumeToMount.VolumeName {
                addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
                    v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
                ...
            }
        }
        ...
    }
    ...
}
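
The lookup over node.Status.VolumesAttached can be sketched with simplified local types. The `attachedVolume` struct and `findDevicePath` helper below are hypothetical; only the "match by volume name, take the device path" logic comes from the excerpt.

```go
package main

import "fmt"

// attachedVolume is a hypothetical, simplified version of an entry in
// node.Status.VolumesAttached.
type attachedVolume struct {
	Name       string
	DevicePath string
}

// findDevicePath reports whether the node status lists the volume as
// attached and, if so, returns the recorded device path.
func findDevicePath(status []attachedVolume, volumeName string) (string, bool) {
	for _, v := range status {
		if v.Name == volumeName {
			return v.DevicePath, true
		}
	}
	return "", false
}

func main() {
	status := []attachedVolume{
		{Name: "vol-a", DevicePath: "/dev/sdb"},
		{Name: "vol-b", DevicePath: "/dev/sdc"},
	}
	if path, ok := findDevicePath(status, "vol-b"); ok {
		fmt.Println("mark as attached with device path", path)
	}
	if _, ok := findDevicePath(status, "vol-x"); !ok {
		fmt.Println("not reported as attached yet; retry on a later reconcile")
	}
}
```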

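2.2 VolumeManager handles attach

If enableControllerAttachDetach is disabled and the volume plugin supports attach, the kubelet attaches the volume itself (instead of leaving the attach to the controller). This is done by calling operationExecutor.AttachVolume: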

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/volumemanager/reconciler/reconciler.go#L218-L234

func (rc *reconciler) mountAttachVolumes() {
    for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
        ...
        if cache.IsVolumeNotAttachedError(err) {
            if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
                ...
            } else {
                // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
                // so attach it
                volumeToAttach := operationexecutor.VolumeToAttach{
                    VolumeName: volumeToMount.VolumeName,
                    VolumeSpec: volumeToMount.VolumeSpec,
                    NodeName:   rc.nodeName,
                }
                ...
                err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
                ...
            }
        }
        ...
    }
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_executor.go#L682

func (oe *operationExecutor) AttachVolume(...) error {
    generatedOperations :=
        oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
    ...
}

The Attach method implemented by the plugin is called. If the attach succeeds, the volume is marked as attached; otherwise it is marked as uncertain.

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L368

func (og *operationGenerator) GenerateAttachVolumeFunc(...) volumetypes.GeneratedOperations {
    attachVolumeFunc := func() volumetypes.OperationContext {
        ...
        volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
        ...
        devicePath, attachErr := volumeAttacher.Attach(
            volumeToAttach.VolumeSpec, volumeToAttach.NodeName)

        if attachErr != nil {
            ...
            addErr := actualStateOfWorld.MarkVolumeAsUncertain(
                volumeToAttach.VolumeName,
                volumeToAttach.VolumeSpec,
                uncertainNode)
            ...
        }
        ...
        // Update actual state of world
        addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
            v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
        ...
    }
    ...
}

Each plugin implements Attach differently. Those implementations are left for later articles.

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/volume.go#L260

type Attacher interface {
    ...
    Attach(spec *Spec, nodeName types.NodeName) (string, error)
    ...
}

3. mount

If the volume has not been mounted to the pod, or is marked as needing a remount, operationExecutor.MountVolume is called.

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/volumemanager/cache/actual_state_of_world.go#L696

func (asw *actualStateOfWorld) PodExistsInVolume(...) (bool, string, error) {
    ...
    podObj, podExists := volumeObj.mountedPods[podName]
    if podExists {
        // if volume mount was uncertain we should keep trying to mount the volume
        if podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain {
            return false, volumeObj.devicePath, nil
        }
        if podObj.remountRequired {
            return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
        }
        ...
    }
    
    return podExists, volumeObj.devicePath, nil
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/kubelet/volumemanager/reconciler/reconciler.go#L243

func (rc *reconciler) mountAttachVolumes() {
    for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        ...
        } else if !volMounted || cache.IsRemountRequiredError(err) {
            ...
            err := rc.operationExecutor.MountVolume(
                rc.waitForAttachTimeout,
                volumeToMount.VolumeToMount,
                rc.actualStateOfWorld,
                isRemount)
            ...
        }
        ...
    }
}

As with unmounting, different mount methods are used for different VolumeModes. Filesystem-mode volumes go through operationGenerator.GenerateMountVolumeFunc, and Block-mode volumes go through operationGenerator.GenerateMapVolumeFunc.

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_executor.go#L830

func (oe *operationExecutor) MountVolume(...) error {
    fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
    ...
    if fsVolume {
        // Filesystem volume case
        // Mount/remount a volume when a volume is attached
        generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
            waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)

    } else {
        // Block volume case
        // Creates a map to device if a volume is attached
        generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc(
            waitForAttachTimeout, volumeToMount, actualStateOfWorld)
    }
    ...
    // TODO mount_device
    return oe.pendingOperations.Run(
        volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
}
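
The dispatch on volume mode can be sketched as below. The `volumeMode` type and `generatorFor` helper are hypothetical, and the sketch assumes that an unset mode is treated as Filesystem.

```go
package main

import "fmt"

// volumeMode is a hypothetical stand-in for the PV volume mode.
type volumeMode string

const (
	modeFilesystem volumeMode = "Filesystem"
	modeBlock      volumeMode = "Block"
)

// generatorFor names the generator function MountVolume would pick.
// Assumption: a nil (unset) mode is treated as Filesystem.
func generatorFor(mode *volumeMode) string {
	if mode != nil && *mode == modeBlock {
		return "GenerateMapVolumeFunc"
	}
	return "GenerateMountVolumeFunc"
}

func main() {
	block := modeBlock
	fs := modeFilesystem
	fmt.Println(generatorFor(nil))
	fmt.Println(generatorFor(&fs))
	fmt.Println(generatorFor(&block))
}
```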

3.1 Mounting Filesystem-mode volumes

The following subsections walk through the mountVolumeFunc function generated by operationGenerator.GenerateMountVolumeFunc, step by step.

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L557

func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
    ...
    mountVolumeFunc := func() volumetypes.OperationContext {
        ...
    }
    ...
    return volumetypes.GeneratedOperations{
        ...
        OperationFunc:     mountVolumeFunc,
        ...
    }
}

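3.1.1 Checking the volume's node affinity

A PV can specify node affinity to define constraints that limit which nodes the volume can be accessed from. Pods that use such a PV are only scheduled to nodes selected by the node affinity. To specify node affinity, set nodeAffinity in the .spec of the PV. https://kubernetes.io/zh/docs/concepts/storage/persistent-volumes/#node-affinity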

func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
    ...
    mountVolumeFunc := func() volumetypes.OperationContext {
        ...
        affinityErr := checkNodeAffinity(og, volumeToMount)
        if affinityErr != nil {
            eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr)
            return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
        }
        ...
    }
    ...
}

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L2286

func checkNodeAffinity(og *operationGenerator, volumeToMount VolumeToMount) error {
    pv := volumeToMount.VolumeSpec.PersistentVolume
    if pv != nil {
        nodeLabels, err := og.volumePluginMgr.Host.GetNodeLabels()
        if err != nil {
            return err
        }
        err = util.CheckNodeAffinity(pv, nodeLabels)
        if err != nil {
            return err
        }
    }
    return nil
}
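
As a rough illustration of what such a check does, the sketch below matches node labels against a deliberately simplified selector. The `term` type and `affinityAllows` function are hypothetical and support only "label value is one of" requirements; the real selector supports more operators.

```go
package main

import (
	"errors"
	"fmt"
)

// term is a hypothetical, simplified selector term: every key must be present
// on the node with one of the allowed values.
type term map[string][]string

// matches reports whether the node labels satisfy all requirements of a term.
func (t term) matches(labels map[string]string) bool {
	for key, allowed := range t {
		value, ok := labels[key]
		if !ok {
			return false
		}
		found := false
		for _, a := range allowed {
			if a == value {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

// affinityAllows returns nil when there are no terms or when any term matches.
func affinityAllows(required []term, labels map[string]string) error {
	if len(required) == 0 {
		return nil
	}
	for _, t := range required {
		if t.matches(labels) {
			return nil
		}
	}
	return errors.New("node labels do not satisfy the volume's node affinity")
}

func main() {
	required := []term{{"kubernetes.io/hostname": {"node-1", "node-2"}}}
	fmt.Println(affinityAllows(required, map[string]string{"kubernetes.io/hostname": "node-1"}))
	fmt.Println(affinityAllows(required, map[string]string{"kubernetes.io/hostname": "node-9"}))
}
```

A failed check makes the mount operation return early, before any attach wait or mount is attempted.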

3.1.2 ReadWriteOncePod

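This step handles the ReadWriteOncePod access mode. If the ReadWriteOncePod feature gate is enabled on the kubelet and ReadWriteOncePod is the volume's only access mode, the mount fails when the volume is already mounted by another pod.

The volume can be mounted as read-write by a single Pod. Use the ReadWriteOncePod access mode if you want to ensure that only one Pod across the whole cluster can read or write the PVC. This is only supported for CSI volumes and requires Kubernetes 1.22 or later. https://v1-22.docs.kubernetes.io/zh/docs/concepts/storage/persistent-volumes/#access-modes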

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L590-L599

func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
    ...
    mountVolumeFunc := func() volumetypes.OperationContext {
        ...
        if utilfeature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod) &&
            actualStateOfWorld.IsVolumeMountedElsewhere(volumeToMount.VolumeName, volumeToMount.PodName) &&
            // Because we do not know what access mode the pod intends to use if there are multiple.
            len(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes) == 1 &&
            v1helper.ContainsAccessMode(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes, v1.ReadWriteOncePod) {

            err = goerrors.New("volume uses the ReadWriteOncePod access mode and is already in use by another pod")
            eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", err)
            return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
        }
        ...
    }
    ...
}
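
The four-part condition can be isolated into a predicate for easier reading. The `rwopConflict` function and its string-based access modes are hypothetical simplifications of the check in the excerpt.

```go
package main

import "fmt"

// rwopConflict restates the condition from mountVolumeFunc with plain types:
// the mount is rejected only if the feature gate is on, another pod already
// has the volume mounted, and ReadWriteOncePod is the sole access mode.
func rwopConflict(gateEnabled, mountedElsewhere bool, accessModes []string) bool {
	return gateEnabled &&
		mountedElsewhere &&
		len(accessModes) == 1 &&
		accessModes[0] == "ReadWriteOncePod"
}

func main() {
	fmt.Println(rwopConflict(true, true, []string{"ReadWriteOncePod"}))
	fmt.Println(rwopConflict(true, true, []string{"ReadWriteOncePod", "ReadWriteOnce"}))
	fmt.Println(rwopConflict(false, true, []string{"ReadWriteOncePod"}))
	fmt.Println(rwopConflict(true, false, []string{"ReadWriteOncePod"}))
}
```

When several access modes are listed, the check is skipped because, as the source comment says, the kubelet cannot tell which mode the pod intends to use.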

In the version analyzed here (v1.23.1), this feature is disabled by default.

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/features/kube_features.go#L968

func init() {
    runtime.Must(utilfeature.DefaultMutableFeatureGate.Add(defaultKubernetesFeatureGates))
}

var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
    ...
    ReadWriteOncePod: {Default: false, PreRelease: featuregate.Alpha},
    ...
}

3.1.3 WaitForAttach

If the volume type is attachable, the mount waits for the attach to complete and updates devicePath with the result.

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L632

func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
    ...
    mountVolumeFunc := func() volumetypes.OperationContext {
        ...
        attachableVolumePlugin, _ :=
            og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
        var volumeAttacher volume.Attacher
        if attachableVolumePlugin != nil {
            volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
        }
        ...
        devicePath := volumeToMount.DevicePath
        if volumeAttacher != nil {
            ...
            devicePath, err = volumeAttacher.WaitForAttach(
                volumeToMount.VolumeSpec, devicePath, volumeToMount.Pod, waitForAttachTimeout)
            ...
        }
    }
}
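
What WaitForAttach does is plugin-specific. As a generic illustration only, the sketch below polls a probe until a device path appears or a timeout expires. The `waitForDevice` function, the `probe` callback, and `pollInterval` are all hypothetical and do not correspond to any particular plugin.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollInterval is a hypothetical polling period for this sketch.
const pollInterval = time.Millisecond

// waitForDevice polls probe until it reports a device path or the timeout
// expires. probe returns the path and whether the device is visible yet.
func waitForDevice(probe func() (string, bool), interval, timeout time.Duration) (string, error) {
	deadline := time.Now().Add(timeout)
	for {
		if path, ok := probe(); ok {
			return path, nil
		}
		if time.Now().After(deadline) {
			return "", errors.New("timed out waiting for attach")
		}
		time.Sleep(interval)
	}
}

func main() {
	polls := 0
	// Simulated device that becomes visible on the third poll.
	probe := func() (string, bool) {
		polls++
		return "/dev/sdb", polls >= 3
	}
	path, err := waitForDevice(probe, pollInterval, 100*pollInterval)
	fmt.Println(path, err, polls)
}
```

The returned path replaces the devicePath taken from volumeToMount, which is why the later MountDevice call sees the path confirmed by the attacher.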

3.1.4 MountDevice

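If the plugin supports device mounting and the actual state of world does not yet record the device as globally mounted, DeviceMounter.MountDevice is called first to mount the underlying block device at a global mount path. (The earlier check already ensured that only Filesystem-mode volumes reach this point, but that check looked at the volume mode declared in the volume spec, whereas this one depends on whether the plugin implements device mounting and on the device mount state recorded in the actual state of world.) After the device is mounted, the volume may also be resized, which is not analyzed here.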

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L659

func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
    ...
    mountVolumeFunc := func() volumetypes.OperationContext {
        ...
        deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
        var volumeDeviceMounter volume.DeviceMounter
        if deviceMountableVolumePlugin != nil {
            volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
        }
        ...
        if volumeDeviceMounter != nil && actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) != DeviceGloballyMounted {
            deviceMountPath, err :=
                volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
            ...
            // Mount device to global mount path
            err = volumeDeviceMounter.MountDevice(
                volumeToMount.VolumeSpec,
                devicePath,
                deviceMountPath,
                volume.DeviceMounterArgs{FsGroup: fsGroup},
            )
            ...
            // Update actual state of world to reflect volume is globally mounted
            markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted(
                volumeToMount.VolumeName, devicePath, deviceMountPath)
            ...

            // If volume expansion is performed after MountDevice but before SetUp then
            // deviceMountPath and deviceStagePath is going to be the same.
            // Deprecation: Calling NodeExpandVolume after NodeStage/MountDevice will be deprecated
            // in a future version of k8s.
            resizeOptions.DeviceMountPath = deviceMountPath
            resizeOptions.DeviceStagePath = deviceMountPath
            resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged

            // NodeExpandVolume will resize the file system if user has requested a resize of
            // underlying persistent volume and is allowed to do so.
            resizeDone, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
            ...
        }        
        ...
    }
}

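Not every volume type implements MountDevice, and the implementations differ between volume types. This article takes the local volume implementation as its example. Implementations for other volume types will be covered in the articles that analyze each plugin.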


3.1.5 Mounter.SetUp

Finally, the SetUp method is called, which actually mounts the volume for the pod.

https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/operationexecutor/operation_generator.go#L725

func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
    ...
    mountVolumeFunc := func() volumetypes.OperationContext {
        ...
        // Execute mount
        mountErr := volumeMounter.SetUp(volume.MounterArgs{
            FsUser:              util.FsUserFrom(volumeToMount.Pod),
            FsGroup:             fsGroup,
            DesiredSize:         volumeToMount.DesiredSizeLimit,
            FSGroupChangePolicy: fsGroupChangePolicy,
        })
        ...
        markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts)
        ...
    }
    ...
}
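
Taken together, MountDevice and SetUp form a two-phase mount: one global device mount per volume, then one SetUp per pod. The sketch below illustrates that relationship with hypothetical types (`mountLog`, `actualState`, `mountVolumeForPod`); it performs no real mounts.

```go
package main

import "fmt"

// mountLog counts the simulated operations.
type mountLog struct {
	deviceMounts int
	setUps       int
}

// actualState is a hypothetical, simplified actual state of world.
type actualState struct {
	globallyMounted map[string]bool // volume -> device mounted at the global path
	podMounts       map[string]bool // "pod/volume" -> mounted for that pod
}

func newActualState() *actualState {
	return &actualState{
		globallyMounted: map[string]bool{},
		podMounts:       map[string]bool{},
	}
}

// mountVolumeForPod mounts the device globally only if that has not happened
// yet, then always performs the per-pod SetUp step.
func mountVolumeForPod(log *mountLog, asw *actualState, volume, pod string) {
	if !asw.globallyMounted[volume] {
		log.deviceMounts++ // stands in for MountDevice
		asw.globallyMounted[volume] = true
	}
	log.setUps++ // stands in for Mounter.SetUp
	asw.podMounts[pod+"/"+volume] = true
}

func main() {
	log := &mountLog{}
	asw := newActualState()
	mountVolumeForPod(log, asw, "vol-a", "pod-1")
	mountVolumeForPod(log, asw, "vol-a", "pod-2")
	fmt.Printf("device mounts: %d, setups: %d\n", log.deviceMounts, log.setUps)
}
```

Two pods sharing one volume therefore trigger a single device mount but two SetUp calls.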

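Likewise, each volume type implements SetUp differently (SetUp is a method of the volume.Mounter interface, so every plugin's Mounter has to provide it). This article takes the local volume implementation as its example. Implementations for other volume types will be covered in the articles that analyze each plugin.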

3.2 Mounting Block-mode volumes

4. FSResize