tags: container,k8s,source-code-analysis

kubelet Process Startup Flow: A Source Code Analysis

Initialization

The entry point is, unsurprisingly, cmd/kubelet/kubelet.go.

Kubernetes implements its CLI with the cobra library; when a command is executed, the execution logic is defined in each cobra.Command object's Run function.

https://github.com/kubernetes/kubernetes/blob/v1.23.4/cmd/kubelet/kubelet.go#L40

package main

import (
    ...
    "github.com/spf13/cobra"
    "k8s.io/kubernetes/cmd/kubelet/app"
    ...
)

func main() {
    command := app.NewKubeletCommand()
    ...
    code := run(command)
    ...
}

func run(command *cobra.Command) int {
    ...
    if err := command.Execute(); err != nil {
    ...
}
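
The pattern is standard cobra. Here is a self-contained sketch of it (the command name and the --name flag below are made up for illustration):

package main

import (
    "fmt"
    "os"

    "github.com/spf13/cobra"
)

func main() {
    // A hypothetical flag, just for illustration.
    var name string

    // All execution logic hangs off the Command's Run field,
    // the same shape as kubelet's app.NewKubeletCommand().
    cmd := &cobra.Command{
        Use: "demo",
        Run: func(cmd *cobra.Command, args []string) {
            fmt.Printf("hello %s, args: %v\n", name, args)
        },
    }
    cmd.Flags().StringVar(&name, "name", "world", "who to greet")

    // Execute parses flags, resolves the command, and invokes Run.
    if err := cmd.Execute(); err != nil {
        os.Exit(1)
    }
}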

Most of the cobra.Command construction is flag parsing, which is not the focus of this article; to analyze the startup flow, we need to step into the Run() function.

https://github.com/kubernetes/kubernetes/blob/v1.23.4/cmd/kubelet/app/server.go#L301

func NewKubeletCommand() *cobra.Command {
    ...
    cmd := &cobra.Command{
        Use: componentKubelet,
        ...
        Run: func(cmd *cobra.Command, args []string) {
            ...
            // run the kubelet
            if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
            ...
        },
    }
    ...
    return cmd
}

Following the call chain down several levels, we reach the createAndInitKubelet function, which creates and initializes the kubelet object.

https://github.com/kubernetes/kubernetes/blob/v1.23.4/cmd/kubelet/app/server.go#L301

func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
    ...
    if err := run(ctx, s, kubeDeps, featureGate); err != nil {
    ...
}

func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
    ...
    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
    ...
}

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    ...
    k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
        kubeDeps,
        &kubeServer.ContainerRuntimeOptions,
        kubeServer.ContainerRuntime,
        ...
    )
    ...
    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %w", err)
        }
        klog.InfoS("Started kubelet as runonce")
    } else {
        startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
        klog.InfoS("Started kubelet")
    }
    return nil
}

During the creation and initialization of the kubelet, each of its members is initialized, including containerRuntime, imageManager, volumeManager, and others.

https://github.com/kubernetes/kubernetes/blob/v1.23.4/cmd/kubelet/app/server.go#L1286

func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *kubelet.Dependencies,
    ...
) (k kubelet.Bootstrap, err error) {
    k, err = kubelet.NewMainKubelet(kubeCfg,
        kubeDeps,
        crOptions,
        containerRuntime,
        ...
    )
    ...
    return k, nil
}

https://github.com/kubernetes/kubernetes/blob/v1.23.4/pkg/kubelet/kubelet.go#L347

func NewMainKubelet(...) (*Kubelet, error) {
    ...
    klet.podWorkers = newPodWorkers(...)
    runtime, err := kuberuntime.NewKubeGenericRuntimeManager(...)
    if err != nil {
        return nil, err
    }
    klet.containerRuntime = runtime
    klet.streamingRuntime = runtime
    klet.runner = runtime
    ...
    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
    ...
    klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

    // setup imageManager
    imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize image manager: %v", err)
    }
    klet.imageManager = imageManager
    ...
    klet.probeManager = prober.NewManager(...)
    tokenManager := token.NewManager(kubeDeps.KubeClient)
    ...
    klet.volumePluginMgr, err =
        NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
    if err != nil {
        return nil, err
    }
    klet.pluginManager = pluginmanager.NewPluginManager(
        klet.getPluginsRegistrationDir(), /* sockDir */
        kubeDeps.Recorder,
    )
    ...
    klet.volumeManager = volumemanager.NewVolumeManager(...)
    ...
    return klet, nil
}
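
One detail worth noting: createAndInitKubelet returns the concrete *Kubelet through the kubelet.Bootstrap interface, so callers like RunKubelet depend only on the behavior they actually use. A minimal sketch of this Go idiom, with hypothetical names:

package main

import "fmt"

// Bootstrap mirrors the role of kubelet.Bootstrap: callers program
// against a small set of behaviors, not the large concrete struct.
type Bootstrap interface {
    Run(updates <-chan string)
}

// server stands in for the concrete *Kubelet with all its members.
type server struct{}

func (s *server) Run(updates <-chan string) {
    for u := range updates {
        fmt.Println("handling:", u)
    }
}

// newServer returns the concrete type through the interface,
// the same shape as createAndInitKubelet returning kubelet.Bootstrap.
func newServer() (Bootstrap, error) {
    return &server{}, nil
}

func main() {
    k, _ := newServer()
    updates := make(chan string, 1)
    updates <- "pod added"
    close(updates)
    k.Run(updates)
}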

Once the kubelet object has been initialized, startKubelet is called to start the kubelet service.

https://github.com/kubernetes/kubernetes/blob/v1.23.4/cmd/kubelet/app/server.go#L1230

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

	// start the kubelet server
	if enableServer {
		go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}

While starting the kubelet service, Run starts concrete services such as the volumeManager and runtimeClassManager, and then kicks off the syncLoop.

https://github.com/kubernetes/kubernetes/blob/v1.23.4/pkg/kubelet/kubelet.go#L1456

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    ...
    // Start the cloud provider sync manager
    if kl.cloudResourceSyncManager != nil {
        go kl.cloudResourceSyncManager.Run(wait.NeverStop)
    }
    ...
    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
    ...
    // Start component sync loops.
    kl.statusManager.Start()

    // Start syncing RuntimeClasses if enabled.
    if kl.runtimeClassManager != nil {
        kl.runtimeClassManager.Start(wait.NeverStop)
    }

    // Start the pod lifecycle event generator.
    kl.pleg.Start()
    kl.syncLoop(updates, kl)
}
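
All these managers follow the same contract: a blocking Run method driven by a stop channel, launched on its own goroutine; wait.NeverStop is just a channel that is never closed. A minimal sketch of the pattern, with a made-up toyManager:

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

// toyManager stands in for volumeManager, statusManager, and friends.
type toyManager struct{}

// Run blocks until stopCh is closed, doing one unit of work per tick.
func (m *toyManager) Run(stopCh <-chan struct{}) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-stopCh:
            return
        case <-ticker.C:
            fmt.Println("sync once")
        }
    }
}

func main() {
    m := &toyManager{}
    // wait.NeverStop is a <-chan struct{} that is never closed,
    // so the manager runs for the life of the process.
    go m.Run(wait.NeverStop)
    time.Sleep(3 * time.Second)
}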

syncLoop

The reconciler is one of the most important design patterns in Kubernetes: every operation in a pod's lifecycle is driven by state. A reconciler fetches the desired state and the actual state, compares the two, and when new state arrives it performs operations such as modifying or deleting pods; syncLoop is one embodiment of this pattern.
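
Stripped to its skeleton, a reconciler is a loop that diffs desired state against actual state and emits the actions needed to converge. A minimal sketch with made-up types:

package main

import "fmt"

// state maps an object name to its spec (grossly simplified).
type state map[string]string

// reconcile diffs desired against actual and returns the actions
// needed to converge, the core move of every Kubernetes controller.
func reconcile(desired, actual state) []string {
    var actions []string
    for name, spec := range desired {
        if got, ok := actual[name]; !ok {
            actions = append(actions, "create "+name)
        } else if got != spec {
            actions = append(actions, "update "+name)
        }
    }
    for name := range actual {
        if _, ok := desired[name]; !ok {
            actions = append(actions, "delete "+name)
        }
    }
    return actions
}

func main() {
    desired := state{"pod-a": "v2", "pod-b": "v1"}
    actual := state{"pod-a": "v1", "pod-c": "v1"}
    // Prints create/update/delete actions (map iteration order varies).
    fmt.Println(reconcile(desired, actual))
}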

syncLoop is the main loop that processes changes. It watches for changes from three channels (file, apiserver, and http), and for any new change it sees, it runs a sync between the desired state and the running state. If no configuration change is seen, it synchronizes the last known desired state every sync-frequency seconds.

https://github.com/kubernetes/kubernetes/blob/v1.23.4/pkg/kubelet/kubelet.go#L2011

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    ...
    for {
        ...
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        ...
    }
}
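
The elided part of syncLoop mainly sets up the ticker channels that drive each iteration (in v1.23 the sync ticker fires every second and the housekeeping ticker every two seconds). A stripped-down sketch of the drive loop, with the iteration body stubbed out:

package main

import (
    "fmt"
    "time"
)

// iteration stands in for syncLoopIteration: one select over all
// event sources, returning false when the loop should exit.
func iteration(syncCh, housekeepingCh <-chan time.Time, events <-chan string) bool {
    select {
    case <-syncCh:
        fmt.Println("periodic sync")
    case <-housekeepingCh:
        fmt.Println("housekeeping")
    case e, open := <-events:
        if !open {
            return false // config channel closed: exit the loop
        }
        fmt.Println("event:", e)
    }
    return true
}

func main() {
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(2 * time.Second)
    defer housekeepingTicker.Stop()

    events := make(chan string)
    go func() { events <- "pod update"; close(events) }()

    for {
        if !iteration(syncTicker.C, housekeepingTicker.C, events) {
            break
        }
    }
}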

Whenever a change is observed, the corresponding method on the handler is invoked; some of these methods synchronize state changes, while others actually process them. The handler here is the kubelet object initialized above.

https://github.com/kubernetes/kubernetes/blob/v1.23.4/pkg/kubelet/kubelet.go#L2050

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.SET:
            // TODO: Do we want to support this?
            klog.ErrorS(nil, "Kubelet does not support snapshot update")
        default:
            klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
        }
        ...

    case e := <-plegCh:
        ...
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
                klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
            }
        }
        ...
    case <-syncCh:
        ...
        handler.HandlePodSyncs(podsToSync)
    case update := <-kl.livenessManager.Updates():
        ...
        handleProbeSync(kl, update, handler, "liveness", "unhealthy")
        ...
    case update := <-kl.readinessManager.Updates():
        ...
        handleProbeSync(kl, update, handler, "readiness", status)
    case update := <-kl.startupManager.Updates():
        ...
        handleProbeSync(kl, update, handler, "startup", status)
    case <-housekeepingCh:
        ...
        if err := handler.HandlePodCleanups(); err != nil {
        ...
    }
    return true
}