kubelet Process Startup Flow: Source Code Analysis
tags: container,k8s,source code analysis
Initialization
The entry point is, unsurprisingly, cmd/kubelet/kubelet.go.
Kubernetes implements its command-line CLI with the cobra library; when a command is executed, the execution logic lives in the Run function of each cobra.Command object.
https://github.com/kubernetes/kubernetes/blob/v1.23.4/cmd/kubelet/kubelet.go#L40
package main

import (
    ...
    "github.com/spf13/cobra"
    "k8s.io/kubernetes/cmd/kubelet/app"
    ...
)

func main() {
    command := app.NewKubeletCommand()
    ...
    code := run(command)
    ...
}

func run(command *cobra.Command) int {
    ...
    if err := command.Execute(); err != nil {
        ...
    }
    ...
}
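To see the pattern in isolation, here is a minimal, self-contained cobra sketch (the command name and body are invented for illustration; this is not kubelet code):

package main

import (
    "fmt"
    "os"

    "github.com/spf13/cobra"
)

func main() {
    cmd := &cobra.Command{
        Use:   "demo",
        Short: "illustrates the cobra pattern kubelet uses",
        // Run holds the actual execution logic, just like kubelet's Run callback.
        Run: func(cmd *cobra.Command, args []string) {
            fmt.Println("command body runs here")
        },
    }
    // Execute parses flags/args and dispatches to the Run callback.
    if err := cmd.Execute(); err != nil {
        os.Exit(1)
    }
}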
Most of the cobra.Command instantiation is flag parsing, which is not our focus here; to trace the startup flow we need to step into the Run() function.
https://github.com/kubernetes/kubernetes/blob/v1.23.4/cmd/kubelet/app/server.go#L301
func NewKubeletCommand() *cobra.Command {
    ...
    cmd := &cobra.Command{
        Use: componentKubelet,
        ...
        Run: func(cmd *cobra.Command, args []string) {
            ...
            // run the kubelet
            if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
                ...
            }
        },
    }
    ...
    return cmd
}
After following the call chain for a while, we arrive at the createAndInitKubelet function, which creates and initializes the kubelet object.
https://github.com/kubernetes/kubernetes/blob/v1.23.4/cmd/kubelet/app/server.go#L301
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
    ...
    if err := run(ctx, s, kubeDeps, featureGate); err != nil {
        ...
    }
    ...
}

func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
    ...
    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
        ...
    }
    ...
}

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    ...
    k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
        kubeDeps,
        &kubeServer.ContainerRuntimeOptions,
        kubeServer.ContainerRuntime,
        ...
    )
    ...
    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %w", err)
        }
        klog.InfoS("Started kubelet as runonce")
    } else {
        startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
        klog.InfoS("Started kubelet")
    }
    return nil
}
While creating and initializing the kubelet, each member is initialized, including containerRuntime, imageManager, volumeManager, and so on.
https://github.com/kubernetes/kubernetes/blob/v1.23.4/cmd/kubelet/app/server.go#L1286
func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *kubelet.Dependencies,
    ...
) (k kubelet.Bootstrap, err error) {
    k, err = kubelet.NewMainKubelet(kubeCfg,
        kubeDeps,
        crOptions,
        containerRuntime,
        ...
    )
    ...
    return k, nil
}
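One detail worth pausing on: createAndInitKubelet returns k typed as the kubelet.Bootstrap interface rather than the concrete *Kubelet. Inferring only from the methods that RunKubelet and startKubelet call on k in the snippets in this post, the interface looks roughly like this (an abridged sketch, not the verbatim v1.23.4 definition):

// Abridged; inferred from the calls RunKubelet/startKubelet make on k.
type Bootstrap interface {
    Run(<-chan kubetypes.PodUpdate)
    RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
    ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface)
    ListenAndServeReadOnly(address net.IP, port uint)
    ListenAndServePodResources()
}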
https://github.com/kubernetes/kubernetes/blob/v1.23.4/pkg/kubelet/kubelet.go#L347
func NewMainKubelet(...) (*Kubelet, error) {
    ...
    klet.podWorkers = newPodWorkers(...)

    runtime, err := kuberuntime.NewKubeGenericRuntimeManager(...)
    if err != nil {
        return nil, err
    }
    klet.containerRuntime = runtime
    klet.streamingRuntime = runtime
    klet.runner = runtime
    ...
    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
    ...
    klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

    // setup imageManager
    imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize image manager: %v", err)
    }
    klet.imageManager = imageManager
    ...
    klet.probeManager = prober.NewManager(...)
    tokenManager := token.NewManager(kubeDeps.KubeClient)
    ...
    klet.volumePluginMgr, err =
        NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
    if err != nil {
        return nil, err
    }
    klet.pluginManager = pluginmanager.NewPluginManager(
        klet.getPluginsRegistrationDir(), /* sockDir */
        kubeDeps.Recorder,
    )
    ...
    klet.volumeManager = volumemanager.NewVolumeManager(...)
    ...
    return klet, nil
}
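Note how the single runtime value returned by NewKubeGenericRuntimeManager is assigned to three fields (containerRuntime, streamingRuntime, runner), so each consumer sees it through a narrower interface. A minimal sketch of this Go idiom (the interfaces and types here are invented for illustration):

package main

import "fmt"

// Two narrow, role-specific interfaces (hypothetical, for illustration).
type ContainerRuntime interface{ SyncPod(name string) error }
type CommandRunner interface{ RunInContainer(name, cmd string) error }

// One concrete manager implements both roles, like kubeGenericRuntimeManager.
type runtimeManager struct{}

func (m *runtimeManager) SyncPod(name string) error {
    fmt.Println("syncing pod", name)
    return nil
}

func (m *runtimeManager) RunInContainer(name, cmd string) error {
    fmt.Println("running", cmd, "in", name)
    return nil
}

func main() {
    m := &runtimeManager{}
    // The same object is stored behind different interface-typed fields,
    // so each consumer only sees the methods it needs.
    var rt ContainerRuntime = m
    var runner CommandRunner = m
    _ = rt.SyncPod("nginx")
    _ = runner.RunInContainer("nginx", "ls")
}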
Once the kubelet object is initialized, startKubelet is called to bring up the kubelet services.
https://github.com/kubernetes/kubernetes/blob/v1.23.4/cmd/kubelet/app/server.go#L1230
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
    // start the kubelet
    go k.Run(podCfg.Updates())

    // start the kubelet server
    if enableServer {
        go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth)
    }
    if kubeCfg.ReadOnlyPort > 0 {
        go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
        go k.ListenAndServePodResources()
    }
}
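Everything here is fire-and-forget: the sync loop and up to three HTTP endpoints each run in their own goroutine, while the caller ultimately blocks back in run(). A condensed, self-contained sketch of that fan-out (the port and handler are arbitrary placeholders, not the real kubelet server):

package main

import (
    "log"
    "net/http"
)

func mainLoop() { select {} } // stands in for the never-returning k.Run

func main() {
    go mainLoop() // like: go k.Run(podCfg.Updates())

    enableServer := true
    if enableServer {
        go func() {
            // stands in for k.ListenAndServe; port chosen arbitrarily here
            log.Println(http.ListenAndServe("127.0.0.1:10250", nil))
        }()
    }

    select {} // the real caller blocks in run() on its context instead
}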
During startup, Kubelet.Run starts services such as the volumeManager and runtimeClassManager, and then enters the syncLoop.
https://github.com/kubernetes/kubernetes/blob/v1.23.4/pkg/kubelet/kubelet.go#L1456
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    ...
    // Start the cloud provider sync manager
    if kl.cloudResourceSyncManager != nil {
        go kl.cloudResourceSyncManager.Run(wait.NeverStop)
    }
    ...
    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
    ...
    // Start component sync loops.
    kl.statusManager.Start()

    // Start syncing RuntimeClasses if enabled.
    if kl.runtimeClassManager != nil {
        kl.runtimeClassManager.Start(wait.NeverStop)
    }

    // Start the pod lifecycle event generator.
    kl.pleg.Start()
    kl.syncLoop(updates, kl)
}
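Most of these components share the same shape: a Run method that loops until a stop channel closes, typically driven by k8s.io/apimachinery/pkg/util/wait. A minimal sketch of that pattern (the loop body is hypothetical):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})

    // wait.Until re-invokes the function every period until stopCh is closed;
    // passing wait.NeverStop (as Kubelet.Run does) means "run forever".
    go wait.Until(func() {
        fmt.Println("manager iteration")
    }, time.Second, stopCh)

    time.Sleep(3 * time.Second)
    close(stopCh) // stop the loop
    time.Sleep(time.Second)
}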
syncLoop
The reconciler is one of the most important design patterns in Kubernetes: everything in a pod's lifecycle is state-driven. A reconciler fetches the desired state and the actual state, compares them, and when a new state appears it modifies or deletes pods to close the gap. syncLoop is one embodiment of this pattern.
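As a toy illustration of the reconciler idea (types and data entirely hypothetical, not kubelet code): fetch desired and actual state, diff them, and act on the difference:

package main

import "fmt"

// Hypothetical reconciler skeleton; State maps an object name to its spec.
type State map[string]string

func reconcile(desired, actual State) {
    // Apply anything that is desired but missing or different.
    for name, spec := range desired {
        if actual[name] != spec {
            fmt.Println("apply", name, "->", spec)
            actual[name] = spec
        }
    }
    // Remove anything that exists but is no longer desired.
    for name := range actual {
        if _, ok := desired[name]; !ok {
            fmt.Println("remove", name)
            delete(actual, name)
        }
    }
}

func main() {
    desired := State{"pod-a": "v2", "pod-b": "v1"}
    actual := State{"pod-a": "v1", "pod-c": "v1"}
    reconcile(desired, actual) // apply pod-a, apply pod-b, remove pod-c
}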
syncLoop is the main loop that processes changes. It watches for changes from three channels (file, apiserver, and http); for any new change it sees, it runs a sync between the desired state and the running state. If no configuration change is seen, it synchronizes the last known desired state every sync-frequency seconds.
https://github.com/kubernetes/kubernetes/blob/v1.23.4/pkg/kubelet/kubelet.go#L2011
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    ...
    for {
        ...
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        ...
    }
}
Whenever a change is observed, the matching handler method is called; some of these merely sync state changes, others actually act on them. The handler here is the kubelet object we initialized earlier.
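Concretely, handler is the SyncHandler interface, which Kubelet itself implements. Its method set can be read off the handler.* calls in syncLoopIteration below; an abridged sketch (not necessarily the verbatim v1.23.4 definition):

// Abridged; inferred from the handler.* calls in syncLoopIteration.
type SyncHandler interface {
    HandlePodAdditions(pods []*v1.Pod)
    HandlePodUpdates(pods []*v1.Pod)
    HandlePodRemoves(pods []*v1.Pod)
    HandlePodReconcile(pods []*v1.Pod)
    HandlePodSyncs(pods []*v1.Pod)
    HandlePodCleanups() error
}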
https://github.com/kubernetes/kubernetes/blob/v1.23.4/pkg/kubelet/kubelet.go#L2050
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
            return false
        }
        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.SET:
            // TODO: Do we want to support this?
            klog.ErrorS(nil, "Kubelet does not support snapshot update")
        default:
            klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
        }
        ...
    case e := <-plegCh:
        ...
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
                klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
            }
        }
        ...
    case <-syncCh:
        ...
        handler.HandlePodSyncs(podsToSync)
    case update := <-kl.livenessManager.Updates():
        ...
        handleProbeSync(kl, update, handler, "liveness", "unhealthy")
    case update := <-kl.readinessManager.Updates():
        ...
        handleProbeSync(kl, update, handler, "readiness", status)
    case update := <-kl.startupManager.Updates():
        ...
        handleProbeSync(kl, update, handler, "startup", status)
    case <-housekeepingCh:
        ...
        if err := handler.HandlePodCleanups(); err != nil {
            ...
        }
    }
    return true
}
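The syncLoop/syncLoopIteration split is a handy pattern on its own: a thin for loop repeatedly calls a one-select iteration function until it returns false. A minimal, runnable sketch of the same shape (channels and messages are made up):

package main

import (
    "fmt"
    "time"
)

// One select per call, mirroring syncLoopIteration: returns false to stop the loop.
func iteration(updates <-chan string, tick <-chan time.Time) bool {
    select {
    case u, open := <-updates:
        if !open {
            fmt.Println("update channel closed, exiting loop")
            return false
        }
        fmt.Println("handling update:", u)
    case <-tick:
        fmt.Println("periodic resync")
    }
    return true
}

func main() {
    updates := make(chan string, 1)
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()

    updates <- "pod ADD"
    go func() {
        time.Sleep(2 * time.Second)
        close(updates) // closing the source ends the loop, as with configCh
    }()

    for iteration(updates, ticker.C) { // mirrors syncLoop's for { ... break }
    }
}

Also note isSyncPodWorthy in the plegCh case above: it filters out PLEG events that don't warrant a sync; in v1.23 it essentially just drops ContainerRemoved events, since removing an already-dead container doesn't change pod state.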