Kubelet VolumeManager Source Code Analysis: Mounting Volumes
tags: k8s, source code analysis, container
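This article is part of the Kubelet VolumeManager source code analysis. It is published separately so that the main article does not become too long and lose its structure.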
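1. Entry point
The reconciler is an important design pattern in Kubernetes: it periodically drives the actual state toward the desired state, and it is commonly found in controllers.
In VolumeManager, the reconciler periodically handles both unmounting and mounting of volumes. This article covers the mounting part.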
func (rc *reconciler) reconcile() {
rc.unmountVolumes()
rc.mountAttachVolumes()
rc.unmountDetachDevices()
}
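To make the pattern concrete, here is a minimal sketch of one reconcile pass. It assumes the desired and actual states are plain sets of volume names (a hypothetical simplification; the real VolumeManager uses the desiredStateOfWorld and actualStateOfWorld caches and also tracks attach and device-mount state). The function returns the actions needed to converge, with unmounts listed before mounts to echo the order in reconcile() above.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcileOnce compares a desired set of volume names with an actual set and
// returns the actions needed to converge. Unmounts come before mounts, echoing
// the unmountVolumes -> mountAttachVolumes order in reconcile().
func reconcileOnce(desired, actual map[string]bool) []string {
	var unmounts, mounts []string
	for v := range actual {
		if !desired[v] {
			unmounts = append(unmounts, "unmount "+v)
		}
	}
	for v := range desired {
		if !actual[v] {
			mounts = append(mounts, "mount "+v)
		}
	}
	// Map iteration order is random, so sort for deterministic output.
	sort.Strings(unmounts)
	sort.Strings(mounts)
	return append(unmounts, mounts...)
}

func main() {
	desired := map[string]bool{"vol-a": true, "vol-b": true}
	actual := map[string]bool{"vol-b": true, "vol-c": true}
	// A real reconciler would run this repeatedly, e.g. from a ticker loop.
	fmt.Println(reconcileOnce(desired, actual)) // [unmount vol-c mount vol-a]
}
```

Because each pass only compares states, a pass that fails simply leaves the difference in place, and the next periodic pass retries it.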
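mountAttachVolumes iterates over the volumes in the desired state of world that should be mounted. Depending on the stage each volume has reached, it takes the corresponding branch.
The branches handle the following cases:
- the volume has not been "attached" to the node
- the volume has not been mounted to the pod, or has been marked as requiring a remount
- the volume needs to be resized, i.e. the "ExpandInUsePersistentVolumes" feature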
func (rc *reconciler) mountAttachVolumes() {
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
...
if cache.IsVolumeNotAttachedError(err) {
...
} else if !volMounted || cache.IsRemountRequiredError(err) {
...
} else if cache.IsFSResizeRequiredError(err) &&
utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
...
}
}
}
func (asw *actualStateOfWorld) PodExistsInVolume(
podName volumetypes.UniquePodName,
volumeName v1.UniqueVolumeName) (bool, string, error) {
asw.RLock()
defer asw.RUnlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return false, "", newVolumeNotAttachedError(volumeName)
}
podObj, podExists := volumeObj.mountedPods[podName]
if podExists {
// if volume mount was uncertain we should keep trying to mount the volume
if podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain {
return false, volumeObj.devicePath, nil
}
if podObj.remountRequired {
return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
}
if podObj.fsResizeRequired &&
!volumeObj.volumeInUseErrorForExpansion &&
utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName)
}
}
return podExists, volumeObj.devicePath, nil
}
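The mapping from PodExistsInVolume results to the three branches can be sketched as a single decision function. The types below are hypothetical, heavily simplified stand-ins for the actualStateOfWorld cache. The sketch assumes the ExpandInUsePersistentVolumes gate is enabled and that volumeInUseErrorForExpansion is false. Note that an uncertain mount is reported as "not mounted", so it is retried, and a required remount takes precedence over a resize.

```go
package main

import "fmt"

// podState is a hypothetical, simplified per-pod entry of actualStateOfWorld.
type podState struct {
	mountUncertain   bool
	remountRequired  bool
	fsResizeRequired bool
}

// attachedVolume keeps only the mountedPods map of the real cache entry.
type attachedVolume struct {
	mountedPods map[string]podState
}

// chooseBranch combines the decision order of PodExistsInVolume with the
// if/else-if chain in mountAttachVolumes.
func chooseBranch(asw map[string]attachedVolume, volume, pod string) string {
	vol, ok := asw[volume]
	if !ok {
		return "attach" // VolumeNotAttachedError
	}
	p, podExists := vol.mountedPods[pod]
	switch {
	case !podExists || p.mountUncertain:
		return "mount" // volMounted == false
	case p.remountRequired:
		return "mount" // RemountRequiredError (checked before resize)
	case p.fsResizeRequired:
		return "resize" // FsResizeRequiredError
	default:
		return "none" // already mounted, nothing to do
	}
}

func main() {
	asw := map[string]attachedVolume{
		"vol-1": {mountedPods: map[string]podState{"pod-a": {}, "pod-b": {fsResizeRequired: true}}},
	}
	fmt.Println(chooseBranch(asw, "vol-2", "pod-a")) // attach
	fmt.Println(chooseBranch(asw, "vol-1", "pod-c")) // mount
	fmt.Println(chooseBranch(asw, "vol-1", "pod-b")) // resize
	fmt.Println(chooseBranch(asw, "vol-1", "pod-a")) // none
}
```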
2. attach
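If a volume is not present in asw.attachedVolumes, actualStateOfWorld.PodExistsInVolume() returns a VolumeNotAttachedError. In fact, every volume is absent from asw.attachedVolumes when it first enters VolumeManager. This does not necessarily mean the volume is not attached to the node. The volume is added to asw.attachedVolumes later, once its actual state has been determined.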
func (asw *actualStateOfWorld) PodExistsInVolume(
...
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return false, "", newVolumeNotAttachedError(volumeName)
}
...
}
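Attach handling then follows one of two paths, depending on the kubelet configuration:
- If enableControllerAttachDetach is enabled, or the volume plugin does not support attach, the kubelet does not attach the volume itself. The attach/detach controller is responsible for attach/detach (for non-attachable plugins there is nothing to attach), and VolumeManager only syncs the volume's state.
- If enableControllerAttachDetach is disabled and the volume plugin supports attach, the kubelet attaches the volume itself.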
func (rc *reconciler) mountAttachVolumes() {
...
if cache.IsVolumeNotAttachedError(err) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
...
} else {
...
}
}
...
}
enableControllerAttachDetach defaults to true, so by default VolumeManager does not attach volumes itself.
The call chain that applies this default:
func init() {
localSchemeBuilder.Register(addDefaultingFuncs)
}
func addDefaultingFuncs(scheme *kruntime.Scheme) error {
return RegisterDefaults(scheme)
}
func RegisterDefaults(scheme *runtime.Scheme) error {
scheme.AddTypeDefaultingFunc(&v1beta1.KubeletConfiguration{}, func(obj interface{}) { SetObjectDefaults_KubeletConfiguration(obj.(*v1beta1.KubeletConfiguration)) })
return nil
}
func SetObjectDefaults_KubeletConfiguration(in *v1beta1.KubeletConfiguration) {
SetDefaults_KubeletConfiguration(in)
...
}
func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfiguration) {
...
if obj.EnableControllerAttachDetach == nil {
obj.EnableControllerAttachDetach = utilpointer.BoolPtr(true)
}
...
}
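The defaulting pattern and the branch condition above can be combined into a small sketch. kubeletConfig is a hypothetical stand-in reduced to the one field discussed here. It uses a *bool so that "unset" (nil) can be told apart from an explicit false, which is the reason the defaulting code checks for nil. attachOperation returns the name of the operationExecutor method the reconciler would call, following the condition in mountAttachVolumes.

```go
package main

import "fmt"

// kubeletConfig is a hypothetical stand-in for v1beta1.KubeletConfiguration,
// reduced to the one field discussed here.
type kubeletConfig struct {
	EnableControllerAttachDetach *bool
}

func boolPtr(b bool) *bool { return &b }

// setDefaults mimics the relevant part of SetDefaults_KubeletConfiguration:
// only an unset field gets the default; an explicit false is preserved.
func setDefaults(c *kubeletConfig) {
	if c.EnableControllerAttachDetach == nil {
		c.EnableControllerAttachDetach = boolPtr(true)
	}
}

// attachOperation mirrors the condition in mountAttachVolumes.
func attachOperation(controllerAttachDetachEnabled, pluginIsAttachable bool) string {
	if controllerAttachDetachEnabled || !pluginIsAttachable {
		return "VerifyControllerAttachedVolume"
	}
	return "AttachVolume"
}

func main() {
	var cfg kubeletConfig // field left unset, as in an empty config file
	setDefaults(&cfg)
	fmt.Println(attachOperation(*cfg.EnableControllerAttachDetach, true)) // VerifyControllerAttachedVolume
}
```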
2.1 Attach handled by the controller, synced by VolumeManager
VolumeManager calls operationExecutor.VerifyControllerAttachedVolume to sync the volume's attach state.
func (rc *reconciler) mountAttachVolumes() {
...
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
...
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.nodeName,
rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
} else {
...
}
...
}
func (oe *operationExecutor) VerifyControllerAttachedVolume(...) error {
generatedOperations, err := oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
...
}
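If the volume plugin does not support attach, the volume is assumed to be attached, and the actual state of world is updated accordingly.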
func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(...) (...) {
...
verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
...
if !volumeToMount.PluginIsAttachable {
...
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */)
...
}
...
}
}
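Otherwise, the function fetches the Node object through the API and checks node.Status.VolumesAttached. If the volume is listed there, it is synced into the actual state of world as attached, together with its device path.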
func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(...) (...) {
...
verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
...
// Fetch current node object
node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(nodeName), metav1.GetOptions{})
...
for _, attachedVolume := range node.Status.VolumesAttached {
if attachedVolume.Name == volumeToMount.VolumeName {
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
...
}
}
...
}
...
}
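The two cases of GenerateVerifyControllerAttachedVolumeFunc can be condensed into one pure function. nodeAttachedVolume is a hypothetical simplification of the entries in node.Status.VolumesAttached. The sketch only decides what to record: the device path, and whether to mark the volume as attached. It leaves out the API call, the error handling and the retry.

```go
package main

import "fmt"

// nodeAttachedVolume is a hypothetical simplification of v1.AttachedVolume,
// the element type of node.Status.VolumesAttached.
type nodeAttachedVolume struct {
	Name       string
	DevicePath string
}

// verifyAttached returns the device path to record and whether the volume
// should be marked as attached in the actual state of world.
func verifyAttached(volumeName string, pluginIsAttachable bool, volumesAttached []nodeAttachedVolume) (string, bool) {
	if !pluginIsAttachable {
		// Nothing to attach: treat as attached with an empty device path.
		return "", true
	}
	for _, av := range volumesAttached {
		if av.Name == volumeName {
			return av.DevicePath, true
		}
	}
	// Not listed in the Node status (yet): leave it unattached for now.
	return "", false
}

func main() {
	status := []nodeAttachedVolume{{Name: "vol-1", DevicePath: "/dev/sdb"}}
	fmt.Println(verifyAttached("vol-1", true, status)) // /dev/sdb true
	fmt.Println(verifyAttached("vol-2", true, status))
}
```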
2.2 Attach handled by VolumeManager
If enableControllerAttachDetach is disabled and the volume plugin supports attach, the kubelet attaches the volume itself (instead of the controller). It does this by calling operationExecutor.AttachVolume:
func (rc *reconciler) mountAttachVolumes() {
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
...
if cache.IsVolumeNotAttachedError(err) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
...
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.nodeName,
}
...
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
...
}
}
...
}
}
func (oe *operationExecutor) AttachVolume(...) error {
generatedOperations :=
oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
...
}
GenerateAttachVolumeFunc calls the plugin's Attach method. If the attach succeeds, the volume is marked as attached. Otherwise it is marked as uncertain.
func (og *operationGenerator) GenerateAttachVolumeFunc(...) volumetypes.GeneratedOperations {
attachVolumeFunc := func() volumetypes.OperationContext {
...
volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
...
devicePath, attachErr := volumeAttacher.Attach(
volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
if attachErr != nil {
...
addErr := actualStateOfWorld.MarkVolumeAsUncertain(
volumeToAttach.VolumeName,
volumeToAttach.VolumeSpec,
uncertainNode)
...
}
...
// Update actual state of world
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
...
}
...
}
Each plugin implements Attach differently. These implementations are left for later articles.
https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/volume.go#L260
type Attacher interface {
...
Attach(spec *Spec, nodeName types.NodeName) (string, error)
...
}
3. mount
If the volume has not been mounted to the pod, or has been marked as requiring a remount, operationExecutor.MountVolume is executed.
func (asw *actualStateOfWorld) PodExistsInVolume(...) (bool, string, error) {
...
podObj, podExists := volumeObj.mountedPods[podName]
if podExists {
// if volume mount was uncertain we should keep trying to mount the volume
if podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain {
return false, volumeObj.devicePath, nil
}
if podObj.remountRequired {
return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
}
...
}
return podExists, volumeObj.devicePath, nil
}
func (rc *reconciler) mountAttachVolumes() {
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
...
} else if !volMounted || cache.IsRemountRequiredError(err) {
...
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
...
}
...
}
}
As with unmounting, each VolumeMode uses a different mount method. Filesystem-mode volumes go through operationGenerator.GenerateMountVolumeFunc, and Block-mode volumes go through operationGenerator.GenerateMapVolumeFunc.
func (oe *operationExecutor) MountVolume(...) error {
fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
...
if fsVolume {
// Filesystem volume case
// Mount/remount a volume when a volume is attached
generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
} else {
// Block volume case
// Creates a map to device if a volume is attached
generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld)
}
...
// TODO mount_device
return oe.pendingOperations.Run(
volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
}
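The mode dispatch in MountVolume can be sketched as follows. The sketch assumes that util.CheckVolumeModeFilesystem treats an unset volume mode as Filesystem and only switches to the map path for Block. volumeMode and generatorFor are hypothetical names used for illustration.

```go
package main

import "fmt"

// volumeMode mirrors the two values of v1.PersistentVolumeMode.
type volumeMode string

const (
	modeFilesystem volumeMode = "Filesystem"
	modeBlock      volumeMode = "Block"
)

// isFilesystem: an unset mode (nil) defaults to Filesystem; only Block differs.
func isFilesystem(mode *volumeMode) bool {
	return mode == nil || *mode != modeBlock
}

// generatorFor names the generator MountVolume would use for this mode.
func generatorFor(mode *volumeMode) string {
	if isFilesystem(mode) {
		return "GenerateMountVolumeFunc"
	}
	return "GenerateMapVolumeFunc"
}

func main() {
	fs, blk := modeFilesystem, modeBlock
	fmt.Println(generatorFor(nil))  // GenerateMountVolumeFunc
	fmt.Println(generatorFor(&fs))  // GenerateMountVolumeFunc
	fmt.Println(generatorFor(&blk)) // GenerateMapVolumeFunc
}
```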
3.1 Mounting Filesystem-mode volumes
The following subsections walk through the mountVolumeFunc function generated by operationGenerator.GenerateMountVolumeFunc, step by step.
func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
...
mountVolumeFunc := func() volumetypes.OperationContext {
...
}
...
return volumetypes.GeneratedOperations{
...
OperationFunc: mountVolumeFunc,
...
}
}
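3.1.1 Checking the volume's node affinity
A PV can specify node affinity to define constraints that limit which nodes the volume can be accessed from. Pods that use such a PV will only be scheduled to nodes selected by the node affinity. To specify node affinity, set nodeAffinity in the .spec of the PV. https://kubernetes.io/zh/docs/concepts/storage/persistent-volumes/#node-affinity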
func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
...
mountVolumeFunc := func() volumetypes.OperationContext {
...
affinityErr := checkNodeAffinity(og, volumeToMount)
if affinityErr != nil {
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
...
}
...
}
func checkNodeAffinity(og *operationGenerator, volumeToMount VolumeToMount) error {
pv := volumeToMount.VolumeSpec.PersistentVolume
if pv != nil {
nodeLabels, err := og.volumePluginMgr.Host.GetNodeLabels()
if err != nil {
return err
}
err = util.CheckNodeAffinity(pv, nodeLabels)
if err != nil {
return err
}
}
return nil
}
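The matching rule behind a node-affinity check can be sketched like this. The types are hypothetical simplifications that model only the "In" operator; the real NodeSelectorRequirement also supports NotIn, Exists and other operators. As in Kubernetes node selectors, terms are ORed, the expressions inside one term are ANDed, and an empty term matches nothing. In this sketch a nil slice stands for a PV without node affinity, which imposes no constraint.

```go
package main

import (
	"errors"
	"fmt"
)

// requirement is a hypothetical "In"-only version of v1.NodeSelectorRequirement.
type requirement struct {
	Key    string
	Values []string
}

// selectorTerm mirrors one entry of nodeAffinity.required.nodeSelectorTerms.
type selectorTerm struct {
	MatchExpressions []requirement
}

// matches reports whether the node's value for r.Key is one of r.Values.
func matches(r requirement, labels map[string]string) bool {
	v, ok := labels[r.Key]
	if !ok {
		return false
	}
	for _, want := range r.Values {
		if v == want {
			return true
		}
	}
	return false
}

// checkAffinity returns nil if the node labels satisfy at least one term.
func checkAffinity(terms []selectorTerm, labels map[string]string) error {
	if terms == nil {
		return nil // no node affinity declared
	}
	for _, t := range terms {
		if len(t.MatchExpressions) == 0 {
			continue // an empty term matches nothing
		}
		all := true
		for _, r := range t.MatchExpressions {
			if !matches(r, labels) {
				all = false
				break
			}
		}
		if all {
			return nil
		}
	}
	return errors.New("no matching NodeSelectorTerms")
}

func main() {
	terms := []selectorTerm{{MatchExpressions: []requirement{
		{Key: "kubernetes.io/hostname", Values: []string{"node-1"}},
	}}}
	fmt.Println(checkAffinity(terms, map[string]string{"kubernetes.io/hostname": "node-1"})) // <nil>
	fmt.Println(checkAffinity(terms, map[string]string{"kubernetes.io/hostname": "node-2"})) // no matching NodeSelectorTerms
}
```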
3.1.2 ReadWriteOncePod
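This step handles the ReadWriteOncePod access mode. Suppose the kubelet has the ReadWriteOncePod feature gate enabled and the volume's PV specifies ReadWriteOncePod as its only access mode. In that case, the volume must not already be mounted by another pod.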
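The volume can be mounted as read-write by a single Pod. Use the ReadWriteOncePod access mode if you want to ensure that only one Pod across the whole cluster can read or write that PVC. This is only supported for CSI volumes and Kubernetes version 1.22+. https://v1-22.docs.kubernetes.io/zh/docs/concepts/storage/persistent-volumes/#access-modes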
func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
...
mountVolumeFunc := func() volumetypes.OperationContext {
...
if utilfeature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod) &&
actualStateOfWorld.IsVolumeMountedElsewhere(volumeToMount.VolumeName, volumeToMount.PodName) &&
// Because we do not know what access mode the pod intends to use if there are multiple.
len(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes) == 1 &&
v1helper.ContainsAccessMode(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes, v1.ReadWriteOncePod) {
err = goerrors.New("volume uses the ReadWriteOncePod access mode and is already in use by another pod")
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
...
}
...
}
In the current version (v1.23.1), this feature gate is in Alpha and disabled by default.
https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/features/kube_features.go#L968
func init() {
runtime.Must(utilfeature.DefaultMutableFeatureGate.Add(defaultKubernetesFeatureGates))
}
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
...
ReadWriteOncePod: {Default: false, PreRelease: featuregate.Alpha},
...
}
3.1.3 WaitForAttach
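If the volume type is attachable, mountVolumeFunc waits (up to waitForAttachTimeout) for the attach to complete via WaitForAttach, and updates devicePath with the result.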
func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
...
mountVolumeFunc := func() volumetypes.OperationContext {
...
attachableVolumePlugin, _ :=
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
var volumeAttacher volume.Attacher
if attachableVolumePlugin != nil {
volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
}
...
devicePath := volumeToMount.DevicePath
if volumeAttacher != nil {
...
devicePath, err = volumeAttacher.WaitForAttach(
volumeToMount.VolumeSpec, devicePath, volumeToMount.Pod, waitForAttachTimeout)
...
}
}
}
3.1.4 MountDevice
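If the volume plugin supports device mounting and the device is not yet globally mounted, DeviceMounter.MountDevice is called first. It mounts the underlying (block) device to a global mount path shared by all pods on the node. The volume mode was already checked earlier, so only Filesystem-mode volumes reach this point, but the two checks use different sources. The earlier check was based on the volume spec, whereas this one uses the device mount state actually recorded in the actual state of world. After the device is mounted, nodeExpandVolume may also resize the file system if a resize was requested (not analyzed further here).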
func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
...
mountVolumeFunc := func() volumetypes.OperationContext {
...
deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
var volumeDeviceMounter volume.DeviceMounter
if deviceMountableVolumePlugin != nil {
volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
}
...
if volumeDeviceMounter != nil && actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) != DeviceGloballyMounted {
deviceMountPath, err :=
volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
...
// Mount device to global mount path
err = volumeDeviceMounter.MountDevice(
volumeToMount.VolumeSpec,
devicePath,
deviceMountPath,
volume.DeviceMounterArgs{FsGroup: fsGroup},
)
...
// Update actual state of world to reflect volume is globally mounted
markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted(
volumeToMount.VolumeName, devicePath, deviceMountPath)
...
// If volume expansion is performed after MountDevice but before SetUp then
// deviceMountPath and deviceStagePath is going to be the same.
// Deprecation: Calling NodeExpandVolume after NodeStage/MountDevice will be deprecated
// in a future version of k8s.
resizeOptions.DeviceMountPath = deviceMountPath
resizeOptions.DeviceStagePath = deviceMountPath
resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged
// NodeExpandVolume will resize the file system if user has requested a resize of
// underlying persistent volume and is allowed to do so.
resizeDone, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
...
}
...
}
}
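Not every volume type implements MountDevice, and the implementations differ between volume types. Here the local volume implementation is taken as the example. Other volume types will be covered in the articles that analyze each plugin.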
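The split between a once-per-node global mount and a per-pod mount can be modeled in a few lines. volumeNode is a hypothetical in-memory model of the relevant actualStateOfWorld fields. The counter and maps stand in for MountDevice, MarkDeviceAsMounted, SetUp and MarkVolumeAsMounted. No real mounting happens.

```go
package main

import "fmt"

// volumeNode models whether each volume's device is globally mounted and
// which pods have the volume set up.
type volumeNode struct {
	globallyMounted  map[string]bool
	podMounts        map[string]map[string]bool
	mountDeviceCalls int
}

func newVolumeNode() *volumeNode {
	return &volumeNode{
		globallyMounted: map[string]bool{},
		podMounts:       map[string]map[string]bool{},
	}
}

// mountForPod runs the global device mount at most once per volume (guarded by
// the recorded state, like the GetDeviceMountState check) and then the
// per-pod step for every pod.
func (n *volumeNode) mountForPod(volume, pod string) {
	if !n.globallyMounted[volume] {
		n.mountDeviceCalls++             // stands in for DeviceMounter.MountDevice
		n.globallyMounted[volume] = true // stands in for MarkDeviceAsMounted
	}
	if n.podMounts[volume] == nil {
		n.podMounts[volume] = map[string]bool{}
	}
	n.podMounts[volume][pod] = true // stands in for Mounter.SetUp + MarkVolumeAsMounted
}

func main() {
	n := newVolumeNode()
	n.mountForPod("vol-1", "pod-a")
	n.mountForPod("vol-1", "pod-b")
	fmt.Println(n.mountDeviceCalls, len(n.podMounts["vol-1"])) // 1 2
}
```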
3.1.5 Mounter.SetUp
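Finally, SetUp is executed. It actually mounts the volume for the pod, after which the volume is marked as mounted in the actual state of world via MarkVolumeAsMounted.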
func (og *operationGenerator) GenerateMountVolumeFunc(...) volumetypes.GeneratedOperations {
...
mountVolumeFunc := func() volumetypes.OperationContext {
...
// Execute mount
mountErr := volumeMounter.SetUp(volume.MounterArgs{
FsUser: util.FsUserFrom(volumeToMount.Pod),
FsGroup: fsGroup,
DesiredSize: volumeToMount.DesiredSizeLimit,
FSGroupChangePolicy: fsGroupChangePolicy,
})
...
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts)
...
}
...
}
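Likewise, each volume type implements SetUp differently. Here the local volume implementation is taken as the example. Other volume types will be covered in the articles that analyze each plugin.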