Kubelet VolumeManager源码分析: 卷的卸载
tags: container,k8s,源码分析
Kubelet VolumeManager源码分析: 卷的卸载
本文是Kubelet VolumeManager源码分析中的一部分,为避免因篇幅过长导致结构混乱,将本文独立编写。
1. 入口
reconciler是kubernetes的一个重要设计模式,定期将现实状态同步为期望状态,常见于controller中。
在VolumeManager中,reconciler定期处理卷的卸载和挂载,本文分析卸载部分。
// reconcile performs one reconciliation pass by calling the three steps in a
// fixed order: unmountVolumes first, then mountAttachVolumes, then
// unmountDetachDevices.
func (rc *reconciler) reconcile() {
rc.unmountVolumes()
rc.mountAttachVolumes()
rc.unmountDetachDevices()
}
遍历实际状态中所有已挂载的卷,如果某个卷对应的pod与卷的组合已不在期望状态中,则卸载该pod上的这个卷。
// unmountVolumes iterates over every mounted volume reported by the actual
// state of world and asks the operation executor to unmount each one whose
// (pod, volume) pair is no longer present in the desired state of world.
// Lines shown as "..." are elided in this excerpt.
func (rc *reconciler) unmountVolumes() {
for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
// Only volumes that are mounted but no longer desired are unmounted.
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
...
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
...
}
}
}
}
2. rc.operationExecutor.UnmountVolume
对于Filesystem和Block两种不同的卷类型,有对应的卸载卷的方法。分别调用operationGenerator.GenerateUnmountVolumeFunc()
和operationGenerator.GenerateUnmapVolumeFunc()
方法生成卸载函数,传入nestedPendingOperations.Run()
执行。
// UnmountVolume builds the unmount (Filesystem mode) or unmap (Block mode)
// operation for volumeToUnmount and hands it to the pending-operations
// tracker to run.
//
// It returns an error if the volume mode cannot be determined, if generating
// the operation fails, or if pendingOperations.Run rejects the operation.
func (oe *operationExecutor) UnmountVolume(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
podsDir string) error {
// fsVolume selects between the Filesystem and Block code paths below.
fsVolume, err := util.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec)
if err != nil {
return err
}
var generatedOperations volumetypes.GeneratedOperations
if fsVolume {
// Filesystem volume case
// Unmount a volume if a volume is mounted
generatedOperations, err = oe.operationGenerator.GenerateUnmountVolumeFunc(
volumeToUnmount, actualStateOfWorld, podsDir)
} else {
// Block volume case
// Unmap a volume if a volume is mapped
generatedOperations, err = oe.operationGenerator.GenerateUnmapVolumeFunc(
volumeToUnmount, actualStateOfWorld)
}
if err != nil {
return err
}
// All volume plugins can execute unmount/unmap for multiple pods referencing the
// same volume in parallel
// The pod name is therefore part of the operation key, and the node name is
// left empty.
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
return oe.pendingOperations.Run(
volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations)
}
之所以要生成卸载函数,再传入nestedPendingOperations.Run()
方法执行,是为了避免同时对一个卷做多个操作。
nestedPendingOperations中维护了一个operations切片(slice,而非map),用于记录是否有对相同的卷的操作正在执行。
点击查看代码
// Run registers generatedOperations under the key (volumeName, podName,
// nodeName) and starts it in a new goroutine.
//
// It returns an already-exists error when an operation with the same key is
// still pending, and the backoff error when the previous operation with the
// same key and the same operation name is not yet safe to retry. Otherwise it
// returns nil immediately, without waiting for the operation to finish.
func (grm *nestedPendingOperations) Run(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName,
generatedOperations volumetypes.GeneratedOperations) error {
// The lock is held for the whole bookkeeping phase and released on return.
grm.lock.Lock()
defer grm.lock.Unlock()
opKey := operationKey{volumeName, podName, nodeName}
opExists, previousOpIndex := grm.isOperationExists(opKey)
if opExists {
previousOp := grm.operations[previousOpIndex]
// Operation already exists
if previousOp.operationPending {
// Operation is pending
return NewAlreadyExistsError(opKey)
}
backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
if backOffErr != nil {
// Backoff only blocks a retry of the same operation name.
if previousOp.operationName == generatedOperations.OperationName {
return backOffErr
}
// previous operation and new operation are different. reset op. name and exp. backoff
grm.operations[previousOpIndex].operationName = generatedOperations.OperationName
grm.operations[previousOpIndex].expBackoff = exponentialbackoff.ExponentialBackoff{}
}
// Update existing operation to mark as pending.
grm.operations[previousOpIndex].operationPending = true
grm.operations[previousOpIndex].key = opKey
} else {
// Create a new operation
grm.operations = append(grm.operations,
operation{
key: opKey,
operationPending: true,
operationName: generatedOperations.OperationName,
expBackoff: exponentialbackoff.ExponentialBackoff{},
})
}
// The operation itself runs asynchronously; its result is reported to
// operationComplete through the named return value detailedErr.
go func() (eventErr, detailedErr error) {
// Handle unhandled panics (very unlikely)
defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(opKey, &detailedErr)
return generatedOperations.Run()
}()
return nil
}
对于具体操作的实现,仍然是定义在volumetypes.GeneratedOperations
中的, 即GeneratedOperations.OperationFunc()
方法。下文我们将详细看operationGenerator.GenerateUnmountVolumeFunc()
和operationGenerator.GenerateUnmapVolumeFunc()
中生成的这个方法。
https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/types/types.go#L64
// Run executes o.OperationFunc and returns the event error and detailed error
// from the resulting OperationContext.
//
// When set, CompleteFunc and EventRecorderFunc are deferred, so they run after
// OperationFunc returns. CompleteFunc receives pointers into context, and
// EventRecorderFunc receives a pointer to the named result eventErr.
func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
var context OperationContext
if o.CompleteFunc != nil {
// Pointers are passed so the deferred call reads the values that are
// stored in context by the time it runs, not the zero values present now.
c := CompleteFuncParam{
Err: &context.DetailedErr,
Migrated: &context.Migrated,
}
defer o.CompleteFunc(c)
}
if o.EventRecorderFunc != nil {
defer o.EventRecorderFunc(&eventErr)
}
// Handle panic, if any, from operationFunc()
defer runtime.RecoverFromPanic(&detailedErr)
context = o.OperationFunc()
return context.EventErr, context.DetailedErr
}
3. FileSystem模式卷的卸载
operationGenerator.GenerateUnmountVolumeFunc
中定义的OperationFunc
主要执行了subpather.CleanSubPaths()
和volumeUnmounter.TearDown()
。
// GenerateUnmountVolumeFunc returns the GeneratedOperations used to unmount a
// Filesystem-mode volume from a pod. The generated OperationFunc first cleans
// the pod's subPath bind-mounts for the volume and then calls TearDown on the
// volume's unmounter.
//
// Lines shown as "..." are elided in this excerpt; volumePlugin and
// volumeUnmounter are presumably created in the elided part at the top.
func (og *operationGenerator) GenerateUnmountVolumeFunc(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
podsDir string) (volumetypes.GeneratedOperations, error) {
...
unmountVolumeFunc := func() volumetypes.OperationContext {
subpather := og.volumePluginMgr.Host.GetSubpather()
migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec)
// Remove all bind-mounts for subPaths
podDir := filepath.Join(podsDir, string(volumeToUnmount.PodUID))
if err := subpather.CleanSubPaths(podDir, volumeToUnmount.InnerVolumeSpecName); err != nil {
eventErr, detailedErr := volumeToUnmount.GenerateError("error cleaning subPath mounts", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Execute unmount
unmountErr := volumeUnmounter.TearDown()
if unmountErr != nil {
...
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
...
return volumetypes.NewOperationContext(nil, nil, migrated)
}
return volumetypes.GeneratedOperations{
OperationName: "volume_unmount",
OperationFunc: unmountVolumeFunc,
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "volume_unmount"),
EventRecorderFunc: nil, // nil because we do not want to generate event on error
}, nil
}
3.1 subpath.CleanSubPaths()
subpath.CleanSubPaths()
遍历/var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/
下每一个目录,这些目录分别表示不同的容器目录。
对这些目录执行walk操作得到该目录下的第一层子路径,表示某个容器挂载的subpath目录,对这些子路径执行doCleanSubPath
函数(和doCleanSubPaths是两个函数)。
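在完成对每个子路径的清理后,依次删除各容器目录和/var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/目录,最后尝试删除/var/lib/kubelet/pods/<uid>/volume-subpaths/
目录(os.Remove只能删除空目录,如果其中还有其他卷的目录,则该目录会被保留)。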
https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/subpath/subpath_linux.go#L57
// CleanSubPaths cleans the subPath mounts of volumeName under podDir by
// delegating to doCleanSubPaths with this subpath's mounter.
func (sp *subpath) CleanSubPaths(podDir string, volumeName string) error {
return doCleanSubPaths(sp.mounter, podDir, volumeName)
}
https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/subpath/subpath_linux.go#L241
// doCleanSubPaths cleans every subPath entry found under
// <podDir>/<containerSubPathDirectoryName>/<volumeName>/<container>/ and then
// removes the now-empty directories from the inside out.
//
// A missing subPathDir is not an error. Lines shown as "..." are elided in
// this excerpt.
func doCleanSubPaths(mounter mount.Interface, podDir string, volumeName string) error {
// scan /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/*
subPathDir := filepath.Join(podDir, containerSubPathDirectoryName, volumeName)
...
containerDirs, err := ioutil.ReadDir(subPathDir)
if err != nil {
// Nothing to clean if the directory does not exist.
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("error reading %s: %s", subPathDir, err)
}
for _, containerDir := range containerDirs {
// Entries that are not directories are skipped.
if !containerDir.IsDir() {
...
continue
}
...
// scan /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/<container name>/*
fullContainerDirPath := filepath.Join(subPathDir, containerDir.Name())
err = filepath.Walk(fullContainerDirPath, func(path string, info os.FileInfo, _ error) error {
if path == fullContainerDirPath {
// Skip top level directory
return nil
}
// pass through errors and let doCleanSubPath handle them
if err = doCleanSubPath(mounter, fullContainerDirPath, filepath.Base(path)); err != nil {
return err
}
// We need to check that info is not nil. This may happen when the incoming err is not nil due to stale mounts or permission errors.
if info != nil && info.IsDir() {
// skip subdirs of the volume: it only matters the first level to unmount, otherwise it would try to unmount subdir of the volume
return filepath.SkipDir
}
return nil
})
...
// os.Remove is used rather than os.RemoveAll, so this fails if anything is
// still left inside the container directory.
if err := os.Remove(fullContainerDirPath); err != nil {
return fmt.Errorf("error deleting %s: %s", fullContainerDirPath, err)
}
...
}
if err := os.Remove(subPathDir); err != nil {
return fmt.Errorf("error deleting %s: %s", subPathDir, err)
}
...
podSubPathDir := filepath.Join(podDir, containerSubPathDirectoryName)
// NOTE(review): the os.IsExist check presumably tolerates the case where the
// pod-level directory still holds other volumes' directories (not empty);
// confirm against the os.IsExist documentation before changing it.
if err := os.Remove(podSubPathDir); err != nil && !os.IsExist(err) {
return fmt.Errorf("error deleting %s: %s", podSubPathDir, err)
}
...
}
doCleanSubPath
函数通过调用mount.CleanupMountPoint完成清理,其流程为:
1. 先判断路径是否是挂载点,如果不是,则直接删除(这里默认路径是一个空文件夹)
2. 执行unmount,Linux上实际为调用操作系统的umount命令
3. 重复一次步骤1
https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/subpath/subpath_linux.go#L308
// doCleanSubPath cleans the single subPath entry
// <fullContainerDirPath>/<subPathIndex> by calling mount.CleanupMountPoint
// with the extensive mount-point check enabled, wrapping any error with the
// path. Lines shown as "..." are elided in this excerpt.
func doCleanSubPath(mounter mount.Interface, fullContainerDirPath, subPathIndex string) error {
// process /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/<container name>/<subPathName>
...
fullSubPath := filepath.Join(fullContainerDirPath, subPathIndex)
if err := mount.CleanupMountPoint(fullSubPath, mounter, true); err != nil {
return fmt.Errorf("error cleaning subpath mount %s: %s", fullSubPath, err)
}
...
return nil
}
// CleanupMountPoint cleans up mountPath by delegating to doCleanupMountPoint.
// corruptedMnt is computed in the part elided ("...") from this excerpt.
func CleanupMountPoint(mountPath string, mounter Interface, extensiveMountPointCheck bool) error {
...
return doCleanupMountPoint(mountPath, mounter, extensiveMountPointCheck, corruptedMnt)
}
// doCleanupMountPoint unmounts mountPath and removes the path once it is no
// longer a mount point. Unless the mount is flagged as corrupted, it first
// tries removePathIfNotMountPoint; it then calls mounter.Unmount and tries
// removePathIfNotMountPoint again. Handling of notMnt/err is elided ("...")
// in this excerpt.
func doCleanupMountPoint(mountPath string, mounter Interface, extensiveMountPointCheck bool, corruptedMnt bool) error {
// The pre-unmount check is skipped for a corrupted mount.
if !corruptedMnt {
notMnt, err = removePathIfNotMountPoint(mountPath, mounter, extensiveMountPointCheck)
...
}
...
if err := mounter.Unmount(mountPath); err != nil {
return err
}
// Second attempt, now that the path has been unmounted.
notMnt, err = removePathIfNotMountPoint(mountPath, mounter, extensiveMountPointCheck)
...
}
// Unmount runs the external "umount" command on target and captures its
// combined stdout/stderr. Error handling is elided ("...") in this excerpt.
func (mounter *Mounter) Unmount(target string) error {
command := exec.Command("umount", target)
output, err := command.CombinedOutput()
...
}
3.2 volumeUnmounter.TearDown()
subpather.CleanSubPaths()
方法分析结束,完成了对subpath的清理。下面还需要分析volumeUnmounter.TearDown()
方法,执行对卷本身的卸载操作。但各类型卷对TearDown方法的实现不一,我们留待分析各类型卷实现时再详细分析。
https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/volume.go#L178
// Unmounter interface provides methods to cleanup/unmount the volumes.
type Unmounter interface {
Volume
// TearDown unmounts the volume from a self-determined directory and
// removes traces of the SetUp procedure.
TearDown() error
// TearDownAt unmounts the volume from the specified directory and
// removes traces of the SetUp procedure.
TearDownAt(dir string) error
}
至此,operationGenerator.GenerateUnmountVolumeFunc()
分析完毕。下面分析operationGenerator.GenerateUnmapVolumeFunc()
中生成的卸载函数。
4. Block模式卷的卸载
4.1 什么是Block模式卷
上文我们说到针对FileSystem和Block模式的卷,卸载方法不同,这里处理的就是Block模式的卷。所谓Block模式,实际就是直接将块设备暴露出来,有多种类型的卷支持Block模式。以local类型卷为例,可以使用如下配置生成一个pv:
apiVersion: v1
kind: PersistentVolume
metadata:
name: example-local
spec:
capacity:
storage: 1Gi
volumeMode: Block
accessModes:
- ReadWriteOnce
persistentVolumeReclaimPolicy: Delete
storageClassName: local-storage
local:
path: /dev/vdb
nodeAffinity:
required:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/hostname
operator: In
values:
- wanglei-k8s-containerd
---
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: example-local-claim
spec:
volumeMode: Block
storageClassName: local-storage
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
k8s把Block的卸载过程叫做Unmap,取自devicemap的反义。
4.2 operationGenerator.GenerateUnmapVolumeFunc
在卸载Block模式的卷时:
- 先执行了通用的卸载方法
util.UnmapBlockVolume
- 然后对实现了CustomBlockVolumeUnmapper接口的特定卷,调用
customBlockVolumeUnmapper.UnmapPodDevice()
,这类卷包括csi, fc, iscsi, rbd,但其实除了csi,其他三种类型的卷都未做任何处理。
// GenerateUnmapVolumeFunc returns the GeneratedOperations used to unmap a
// Block-mode volume from a pod. The generated OperationFunc runs the common
// util.UnmapBlockVolume step and then, if the plugin's unmapper implements
// volume.CustomBlockVolumeUnmapper, the plugin-specific UnmapPodDevice step.
//
// The parameter list and other lines shown as "..." are elided in this
// excerpt.
func (og *operationGenerator) GenerateUnmapVolumeFunc(
...
unmapVolumeFunc := func() volumetypes.OperationContext {
...
// Execute common unmap
unmapErr := util.UnmapBlockVolume(og.blkUtil, globalUnmapPath, podDeviceUnmapPath, volName, volumeToUnmount.PodUID)
if unmapErr != nil {
// On failure, return error. Caller will log and retry.
eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapBlockVolume failed", unmapErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Call UnmapPodDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper
if customBlockVolumeUnmapper, ok := blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper); ok {
// Execute plugin specific unmap
unmapErr = customBlockVolumeUnmapper.UnmapPodDevice()
if unmapErr != nil {
// On failure, return error. Caller will log and retry.
eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapPodDevice failed", unmapErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
}
...
return volumetypes.NewOperationContext(nil, nil, migrated)
}
return volumetypes.GeneratedOperations{
...
OperationFunc: unmapVolumeFunc,
...
}, nil
}
我们主要关注通用卸载方法util.UnmapBlockVolume
。该函数中执行了三个动作:
- 释放块设备loop文件
- 在pod路径下卸载卷
- 在node路径下卸载卷
https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/volume/util/util.go#L552
// UnmapBlockVolume performs the common unmap steps for a Block-mode volume,
// stopping at the first failure:
//  1. DetachFileDevice on <globalUnmapPath>/<podUID>,
//  2. UnmapDevice of volumeMapName under podDeviceUnmapPath (bindMount=false),
//  3. UnmapDevice of podUID under globalUnmapPath (bindMount=true).
//
// The parameter list is elided ("...") in this excerpt.
//
// NOTE(review): the error messages for steps 2 and 3 start with
// "blkUtil.DetachFileDevice failed" although the failing call is
// blkUtil.UnmapDevice; keep this in mind when reading kubelet logs.
func UnmapBlockVolume(...) error {
// Release file descriptor lock.
err := blkUtil.DetachFileDevice(filepath.Join(globalUnmapPath, string(podUID)))
if err != nil {
return fmt.Errorf("blkUtil.DetachFileDevice failed. globalUnmapPath:%s, podUID: %s: %v",
globalUnmapPath, string(podUID), err)
}
// unmap devicePath from pod volume path
unmapDeviceErr := blkUtil.UnmapDevice(podDeviceUnmapPath, volumeMapName, false /* bindMount */)
if unmapDeviceErr != nil {
return fmt.Errorf("blkUtil.DetachFileDevice failed. podDeviceUnmapPath:%s, volumeMapName: %s, bindMount: %v: %v",
podDeviceUnmapPath, volumeMapName, false, unmapDeviceErr)
}
// unmap devicePath from global node path
unmapDeviceErr = blkUtil.UnmapDevice(globalUnmapPath, string(podUID), true /* bindMount */)
if unmapDeviceErr != nil {
return fmt.Errorf("blkUtil.DetachFileDevice failed. globalUnmapPath:%s, podUID: %s, bindMount: %v: %v",
globalUnmapPath, string(podUID), true, unmapDeviceErr)
}
return nil
}
4.2.1 释放块设备loop文件
释放块设备对应的loop文件时,先找到与块设备对应的loop文件路径,然后删除loop文件。
// DetachFileDevice looks up the loop device associated with path via
// GetLoopDevice and passes it to removeLoopDevice. Error handling is elided
// ("...") in this excerpt.
func (v VolumePathHandler) DetachFileDevice(path string) error {
loopPath, err := v.GetLoopDevice(path)
...
err = removeLoopDevice(loopPath)
...
}
通过遍历每个/sys/block/loop*/loop/backing_file
文件内容,与node上块设备的路径对比,相同即找到了loop文件的名称,loop文件的路径即为/dev/loop*
。其中,node上块设备路径为/var/lib/kubelet/plugins/{VOLUME_TYPE}/volumeDevices/{VOLUME_NAME}/{POD_UID}
。
// GetLoopDevice returns the loop device path for path by delegating to
// getLoopDeviceFromSysfs. Preceding checks are elided ("...") in this excerpt.
func (v VolumePathHandler) GetLoopDevice(path string) (string, error) {
...
return getLoopDeviceFromSysfs(path)
}
// getLoopDeviceFromSysfs finds the loop device whose backing file is path.
//
// It scans /sys/block/loop*/loop/backing_file and returns "/dev/<loopN>" for
// the first entry whose trimmed content equals either path or its
// symlink-resolved form. It returns an error if path cannot be resolved, if
// the sysfs glob fails, or (ErrDeviceNotFound) if no entry matches.
func getLoopDeviceFromSysfs(path string) (string, error) {
// If the file is a symlink.
realPath, err := filepath.EvalSymlinks(path)
if err != nil {
return "", fmt.Errorf("failed to evaluate path %s: %s", path, err)
}
devices, err := filepath.Glob("/sys/block/loop*")
if err != nil {
return "", fmt.Errorf("failed to list loop devices in sysfs: %s", err)
}
for _, device := range devices {
backingFile := fmt.Sprintf("%s/loop/backing_file", device)
// The contents of this file is the absolute path of "path".
data, err := ioutil.ReadFile(backingFile)
if err != nil {
// An unreadable backing_file is skipped rather than treated as a failure.
continue
}
// Return the first match.
backingFilePath := strings.TrimSpace(string(data))
if backingFilePath == path || backingFilePath == realPath {
return fmt.Sprintf("/dev/%s", filepath.Base(device)), nil
}
}
return "", errors.New(ErrDeviceNotFound)
}
调用losetup -d {/dev/loop?}
命令解除(detach)上文获取到的loop设备与其后端文件的关联。
// removeLoopDevice runs the command at losetupPath (presumably the losetup
// binary) with "-d <device>" and captures its combined output. Error handling
// is elided ("...") in this excerpt.
func removeLoopDevice(device string) error {
args := []string{"-d", device}
cmd := exec.Command(losetupPath, args...)
out, err := cmd.CombinedOutput()
...
}
4.2.2 在pod路径下卸载卷
pod路径/var/lib/kubelet/pods/{POD_UID}/volumeDevices/{VOLUME_TYPE}
下存在一个以卷名称命名、指向块设备的软链接,卸载时会将其删除。
func UnmapBlockVolume(...) error {
...
unmapDeviceErr := blkUtil.UnmapDevice(podDeviceUnmapPath, volumeMapName, false /* bindMount */)
...
}
// UnmapDevice removes the mapping named linkName under mapPath. This excerpt
// shows only the symlink path (unmapSymlinkDevice); the bindMount branch is
// elided ("...") here.
func (v VolumePathHandler) UnmapDevice(mapPath string, linkName string, bindMount bool) error {
...
return unmapSymlinkDevice(v, mapPath, linkName)
}
// unmapSymlinkDevice removes <mapPath>/<linkName> with os.Remove. The
// existence check announced by the comment below is elided ("...") in this
// excerpt.
func unmapSymlinkDevice(v VolumePathHandler, mapPath string, linkName string) error {
// Check symbolic link exists
linkPath := filepath.Join(mapPath, string(linkName))
...
return os.Remove(linkPath)
}
4.2.3 在node路径下卸载卷
在node的卷路径下/var/lib/kubelet/plugins/{VOLUME_TYPE}/volumeDevices/{VOLUME_NAME}/
,有一个由块设备bind mount而来的名为{POD_UID}的块文件,卸载时将其卸载(unmount), 然后删除。
func UnmapBlockVolume(...) error {
...
unmapDeviceErr = blkUtil.UnmapDevice(globalUnmapPath, string(podUID), true /* bindMount */)
...
}
// UnmapDevice removes the mapping named linkName under mapPath. When
// bindMount is true it delegates to unmapBindMountDevice; the remaining
// handling is elided ("...") in this excerpt.
func (v VolumePathHandler) UnmapDevice(mapPath string, linkName string, bindMount bool) error {
...
if bindMount {
return unmapBindMountDevice(v, mapPath, linkName)
}
...
}
// unmapBindMountDevice unmounts linkPath and then removes the file at that
// path. A file that no longer exists at removal time is not an error.
// linkPath is computed in the part elided ("...") from this excerpt.
func unmapBindMountDevice(v VolumePathHandler, mapPath string, linkName string) error {
...
mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()}
if err := mounter.Unmount(linkPath); err != nil {
return fmt.Errorf("failed to unmount linkPath %s: %v", linkPath, err)
}
// Remove file
if err := os.Remove(linkPath); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove file %s: %v", linkPath, err)
}
return nil
}
unmount直接调用umount /var/lib/kubelet/plugins/{VOLUME_TYPE}/volumeDevices/{VOLUME_NAME}/{POD_UID}
命令卸载块设备。
func (mounter *Mounter) Unmount(target string) error {
...
command := exec.Command("umount", target)
output, err := command.CombinedOutput()
...
}
至此,operationGenerator.GenerateUnmapVolumeFunc()中生成的卸载函数分析完毕。