Junedayday Blog

六月天天的个人博客


聚焦目标

理解 kube-controller-manager 的运行机制

目录

  1. 运行的主函数
  2. 控制器的启动函数
  3. 引入概念ReplicaSet
  4. 查看ReplicaSetController
  5. ReplicaSet的核心实现函数
  6. 总结

Run

我们找到了对应的主函数,看看其中的内容

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
// configz 模块,在kube-scheduler分析中已经了解
if cfgz, err := configz.New(ConfigzName); err == nil {
cfgz.Set(c.ComponentConfig)
} else {
klog.Errorf("unable to register configz: %v", err)
}

// 健康监测与http服务,跳过
var checks []healthz.HealthChecker
var unsecuredMux *mux.PathRecorderMux

run := func(ctx context.Context) {
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}

// client认证相关
var clientBuilder controller.ControllerClientBuilder

// 创建controller的上下文context
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
klog.Fatalf("error building controller context: %v", err)
}
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}

// 这里的 InformerFactory 和我们在kube-scheduler中看的 SharedInformerFactory 基本一致
controllerContext.InformerFactory.Start(controllerContext.Stop)
controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
close(controllerContext.InformersStarted)

select {}
}

// 是否进行选举
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO())
panic("unreachable")
}

// 拼接出一个全局唯一的id
id, err := os.Hostname()
if err != nil {
return err
}
id = id + "_" + string(uuid.NewUUID())

rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
c.ComponentConfig.Generic.LeaderElection.ResourceName,
c.LeaderElectionClient.CoreV1(),
c.LeaderElectionClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: c.EventRecorder,
})
if err != nil {
klog.Fatalf("error creating lock: %v", err)
}

// 正常情况下都是阻塞在RunOrDie这个函数中,不停地进行选举相关的工作
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
// 开始成为Leader的时候,调用run函数
OnStartedLeading: run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
},
WatchDog: electionChecker,
Name: "kube-controller-manager",
})
panic("unreachable")
}

StartControllers

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
// 关键性的循环,启动每个controllers,key为控制器名字,value为初始化函数
for controllerName, initFn := range controllers {
// 是否允许启动
if !ctx.IsControllerEnabled(controllerName) {
klog.Warningf("%q is disabled", controllerName)
continue
}
time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
klog.V(1).Infof("Starting %q", controllerName)
// 调用init函数进行启动
debugHandler, started, err := initFn(ctx)
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
}
if !started {
klog.Warningf("Skipping %q", controllerName)
continue
}
// 注册对应controller到debug的url中
if debugHandler != nil && unsecuredMux != nil {
basePath := "/debug/controllers/" + controllerName
unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
}
klog.Infof("Started %q", controllerName)
}

return nil
}

// 我们再去传入controller的函数去看看,对应的controller有哪些,这里有我们很多常见的概念,今天不一一细讲
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["endpointslice"] = startEndpointSliceController
controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startPodGCController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
controllers["serviceaccount"] = startServiceAccountController
controllers["garbagecollector"] = startGarbageCollectorController
controllers["daemonset"] = startDaemonSetController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
controllers["csrsigning"] = startCSRSigningController
controllers["csrapproving"] = startCSRApprovingController
controllers["csrcleaner"] = startCSRCleanerController
controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController
controllers["nodeipam"] = startNodeIpamController
controllers["nodelifecycle"] = startNodeLifecycleController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["route"] = startRouteController
controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
}
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
controllers["persistentvolume-expander"] = startVolumeExpandController
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
controllers["pvc-protection"] = startPVCProtectionController
controllers["pv-protection"] = startPVProtectionController
controllers["ttl-after-finished"] = startTTLAfterFinishedController
controllers["root-ca-cert-publisher"] = startRootCACertPublisher
controllers["ephemeral-volume"] = startEphemeralVolumeController

return controllers
}

ReplicaSet

由于我们的示例是创建一个nginx的pod,涉及到kube-controller-manager的内容很少。

但是,为了加深大家对 kube-controller-manager 的认识,我们引入一个新的概念 - ReplicaSet,下面是官方说明:

A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.

ReplicaSet 的目的是维护一组在任何时候都处于运行状态的 Pod 副本的稳定集合。 因此,它通常用来保证给定数量的、完全相同的 Pod 的可用性。

简单来说,ReplicaSet 就是用来维持指定个数的Pod副本。
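
为了更直观,下面给出一段用 client-go 创建 ReplicaSet 的示意代码(nginx-rs 名称、default 命名空间、镜像版本等均为示例参数,clientset 的初始化从略,并非原文内容):

import (
    "context"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// 创建一个 3 副本的 nginx ReplicaSet
func createNginxReplicaSet(clientset kubernetes.Interface) error {
    replicas := int32(3)
    rs := &appsv1.ReplicaSet{
        ObjectMeta: metav1.ObjectMeta{Name: "nginx-rs", Namespace: "default"},
        Spec: appsv1.ReplicaSetSpec{
            Replicas: &replicas,
            // selector 必须与 template 中的 label 匹配,控制器正是靠它圈定自己管理的 Pod
            Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "nginx"}},
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "nginx"}},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{Name: "nginx", Image: "nginx:1.19"}},
                },
            },
        },
    }
    _, err := clientset.AppsV1().ReplicaSets("default").Create(context.TODO(), rs, metav1.CreateOptions{})
    return err
}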

ReplicaSetController

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
return nil, false, nil
}

// 用goroutine异步运行,包含了 ReplicaSet和Pod 的两个Informer
// 这一点很好理解:我们是要控制ReplicaSet声明的数量和运行的Pod数量一致,需要同时观察这两种资源
go replicaset.NewReplicaSetController(
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
return nil, true, nil
}

// 运行函数
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rsc.queue.ShutDown()

controllerName := strings.ToLower(rsc.Kind)
klog.Infof("Starting %v controller", controllerName)
defer klog.Infof("Shutting down %v controller", controllerName)

if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
return
}

for i := 0; i < workers; i++ {
// 工作的函数
go wait.Until(rsc.worker, time.Second, stopCh)
}

<-stopCh
}

func (rsc *ReplicaSetController) worker() {
// 继续查找实现
for rsc.processNextWorkItem() {
}
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
// 这里也有个queue的概念,可以类比kube-scheduler中的实现
// 不同的是,这里的queue是 workqueue.RateLimitingInterface ,也就是限制速率的,具体实现今天不细看

// 获取元素
key, quit := rsc.queue.Get()
if quit {
return false
}
defer rsc.queue.Done(key)

// 处理对应的元素
err := rsc.syncHandler(key.(string))
if err == nil {
rsc.queue.Forget(key)
return true
}

utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
rsc.queue.AddRateLimited(key)

return true
}

// 再回过头,去查看syncHandler是在哪里被赋值的:NewBaseController中将它指向了syncReplicaSet
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {

rsc.syncHandler = rsc.syncReplicaSet

return rsc
}
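
上面 processNextWorkItem 里用到的限速队列来自 client-go 的 workqueue 包。下面是一个脱离 controller 的最小使用示意(key 的取值、process 处理函数都是演示用的假设),用来体会 Get / Done / Forget / AddRateLimited 的配合方式:

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func process(key string) error {
    fmt.Println("syncing", key)
    return nil
}

func main() {
    // 默认限速器:单个 key 的指数退避 + 整体的令牌桶
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    queue.Add("default/nginx-rs") // 生产者:放入一个 namespace/name 形式的 key

    key, quit := queue.Get() // 消费者:取出一个 key
    if quit {
        return
    }
    defer queue.Done(key) // 处理完成后必须调用 Done

    if err := process(key.(string)); err != nil {
        queue.AddRateLimited(key) // 失败:按限速策略重新入队
        return
    }
    queue.Forget(key) // 成功:清除该 key 的重试计数
}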

syncReplicaSet

func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
}()

// 从key中拆分出 namespace 和 name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}

// 根据name,从 Lister 获取对应的 ReplicaSets 信息
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
rsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return err
}

rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
// 获取 selector (k8s 是根据selector中的label来匹配 ReplicaSets 和 Pod 的)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
return nil
}

// 获取该namespace下的所有pod(labels.Everything()表示不做label过滤,后面再用selector筛选)
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
return err
}

// 过滤无效的pod
filteredPods := controller.FilterActivePods(allPods)

// 根据selector再过滤pod
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
if err != nil {
return err
}

var manageReplicasErr error
if rsNeedsSync && rs.DeletionTimestamp == nil {
// 管理 ReplicaSet,下面详细分析
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
rs = rs.DeepCopy()
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

// 更新状态
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
return err
}
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
return manageReplicasErr
}

// 我们再一起看看,当Pod数量和ReplicaSet中声明的不同时,是怎么工作的
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
// diff = 当前pod数 - 期望pod数
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
return nil
}

// diff小于0,表示需要扩容,即新增Pod
if diff < 0 {

// 具体的实现暂时不细看

// diff 大于0,即需要缩容
} else if diff > 0 {

}

return nil
}

Summary

kube-controller-manager 的核心思想是: 根据期望状态和当前状态,管理Kubernetes中的资源。

以ReplicaSet为例,它对比了定义声明的Pod数和当前集群中满足条件的Pod数,进行相对应的扩缩容。
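
把这种“期望状态 vs 当前状态”的思路抽象出来,大致就是下面这段示意代码(纯演示用,reconcile/scaleUp/scaleDown 都是假设的函数名,并非 kube-controller-manager 的真实实现):

package main

import "fmt"

func scaleUp(n int)   { fmt.Printf("create %d pods\n", n) }
func scaleDown(n int) { fmt.Printf("delete %d pods\n", n) }

// 一次调谐:对比期望副本数与实际副本数,决定扩容还是缩容
func reconcile(desired, actual int) {
    diff := actual - desired
    switch {
    case diff < 0:
        scaleUp(-diff) // 实际少于期望:新增 Pod
    case diff > 0:
        scaleDown(diff) // 实际多于期望:删除多余的 Pod
    default:
        // 两者一致,无需操作
    }
}

func main() {
    reconcile(3, 1) // 期望 3 个、实际 1 个 => create 2 pods
}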

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

聚焦目标

理解一个pod的被调度的大致流程

目录

  1. 分析Scheduler的结构体
  2. 往SchedulingQueue里添加待调度的Pod
  3. 调度一个pod对象
    1. 调度计算结果 - ScheduleResult
    2. 初步推算 - Assume
    3. 实际绑定 - Bind
  4. 将绑定成功后的数据更新到etcd
  5. pod绑定Node的总结

Scheduler

在前面,我们了解了Pod调度算法的注册和通过Informer机制监听kube-apiserver上的资源变化,今天这一讲,我们就将两者串联起来,看看在kube-scheduler中,Informer监听到资源变化后,如何用调度算法将pod进行调度。

// 在运行 kube-scheduler 的初期,我们创建了一个Scheduler的数据结构,回头再看看有什么和pod调度算法相关的
type Scheduler struct {
SchedulerCache internalcache.Cache
Algorithm core.ScheduleAlgorithm

// 获取下一个需要调度的Pod
NextPod func() *framework.QueuedPodInfo

Error func(*framework.QueuedPodInfo, error)
StopEverything <-chan struct{}

// 等待调度的Pod队列,我们重点看看这个队列是什么
SchedulingQueue internalqueue.SchedulingQueue

Profiles profile.Map
scheduledPodsHasSynced func() bool
client clientset.Interface
}

// Scheduler的实例化函数
func New(){
var sched *Scheduler
switch {
// 从 Provider 创建
case source.Provider != nil:
sc, err := configurator.createFromProvider(*source.Provider)
sched = sc
// 从文件或者ConfigMap中创建
case source.Policy != nil:
sc, err := configurator.createFromConfig(*policy)
sched = sc
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
}

// 两个创建方式,底层都是调用的 create 函数
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
return c.create()
}
func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, error){
return c.create()
}

func (c *Configurator) create() (*Scheduler, error) {
// 实例化 podQueue
podQueue := internalqueue.NewSchedulingQueue(
lessFn,
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
)

return &Scheduler{
SchedulerCache: c.schedulerCache,
Algorithm: algo,
Profiles: profiles,
// NextPod 函数依赖于 podQueue
NextPod: internalqueue.MakeNextPodFunc(podQueue),
Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
StopEverything: c.StopEverything,
// 调度队列被赋值为podQueue
SchedulingQueue: podQueue,
}, nil
}

// 再看看这个调度队列的初始化函数,从命名可以看到是一个优先队列,它的实现细节暂不细看
// 结合实际情况思考下,pod会有重要程度的区分,所以调度的顺序需要考虑优先级的
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
return NewPriorityQueue(lessFn, opts...)
}

SchedulingQueue

// 在上面实例化Scheduler后,有个注册事件 Handler 的函数:addAllEventHandlers(sched, informerFactory, podInformer)
func addAllEventHandlers(
sched *Scheduler,
informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
) {
/*
函数前后有很多注册的Handler,但是和未调度pod添加到队列相关的,只有这个
*/
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
// 定义过滤函数:必须为未调度的pod
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
return false
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
return false
}
},
// 增改删三个操作对应的Handler,操作到对应的Queue
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToSchedulingQueue,
UpdateFunc: sched.updatePodInSchedulingQueue,
DeleteFunc: sched.deletePodFromSchedulingQueue,
},
},
)
}

// 牢记我们第一阶段要分析的对象:create nginx pod,所以进入这个add的操作,对应加入到队列
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
pod := obj.(*v1.Pod)
klog.V(3).Infof("add event for unscheduled pod %s/%s", pod.Namespace, pod.Name)
// 加入到队列
if err := sched.SchedulingQueue.Add(pod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
}
}

// 入队操作我们清楚了,那出队呢?我们回过头去看看上面定义的NextPod的方法实现
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
return func() *framework.QueuedPodInfo {
// 从队列中弹出
podInfo, err := queue.Pop()
if err == nil {
klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
return podInfo
}
klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
return nil
}
}

scheduleOne

// 了解入队和出队操作后,我们看一下Scheduler运行的过程
func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
sched.SchedulingQueue.Run()
// 调度一个pod对象
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()
}

// 接下来scheduleOne方法代码很长,我们一步一步来看
func (sched *Scheduler) scheduleOne(ctx context.Context) {
// podInfo 就是从队列中获取到的pod对象
podInfo := sched.NextPod()
// 检查pod的有效性
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
// 根据定义的 pod.Spec.SchedulerName 查到对应的profile
prof, err := sched.profileForPod(pod)
if err != nil {
klog.Error(err)
return
}
// 可以跳过调度的情况,一般pod进不来
if sched.skipPodSchedule(prof, pod) {
return
}

// 调用调度算法,获取结果
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
if err != nil {
/*
出现调度失败的情况:
这个时候可能会触发抢占preempt,抢占是一套复杂的逻辑,后面我们专门会讲
目前假设各类资源充足,能正常调度
*/
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))

// assume 是先假设这个Pod已按照前面的调度结果分配到节点,写入本地缓存,后续再异步完成真正的绑定
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// SuggestedHost 为建议的分配的Host
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
// 失败就重新分配,不考虑这种情况
}

// 运行相关插件的代码先跳过

// 异步绑定pod
go func() {

// 有一系列的检查工作

// 真正做绑定的动作
err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
if err != nil {
// 错误处理,清除状态并重试
} else {
// 打印结果,调试时将log level调整到2以上
if klog.V(2).Enabled() {
klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
}
// metrics中记录相关的监控指标
metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start))
metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

// 运行绑定后的插件
prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}
}()
}

ScheduleResult

// 回顾 scheduleOne 中对调度算法的调用
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)

func (c *Configurator) create() (*Scheduler, error) {
algo := core.NewGenericScheduler(
c.schedulerCache,
c.nodeInfoSnapshot,
extenders,
c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
c.disablePreemption,
c.percentageOfNodesToScore,
)
return &Scheduler{
Algorithm: algo,
}, nil
}

// genericScheduler 的 Schedule 的实现
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// 对 pod 进行 pvc 的信息检查
if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return result, err
}
// 对当前的信息做一个快照
if err := g.snapshot(); err != nil {
return result, err
}
// Node 节点数量为0,表示无可用节点
if g.nodeInfoSnapshot.NumNodes() == 0 {
return result, ErrNoNodesAvailable
}
// Predict阶段:找到所有满足调度条件的节点feasibleNodes,不满足的就直接过滤
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
// 没有可用节点直接报错
if len(feasibleNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
FilteredNodesStatuses: filteredNodesStatuses,
}
}
// 只有一个节点就直接选用
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Name,
EvaluatedNodes: 1 + len(filteredNodesStatuses),
FeasibleNodes: 1,
}, nil
}
// Priority阶段:通过打分,找到一个分数最高、也就是最优的节点
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
host, err := g.selectHost(priorityList)

return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
FeasibleNodes: len(feasibleNodes),
}, err
}

/*
Predict 和 Priority 是选择调度节点的两个关键性步骤, 它的底层调用了各种algorithm算法。我们暂时不细看。
以我们前面讲到过的 NodeName 算法为例,节点必须与 NodeName 匹配,它是属于Predict阶段的。
*/

Assume

func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// 将 host 填入到 pod spec字段的nodename,假定分配到对应的节点上
assumed.Spec.NodeName = host
// 调用 SchedulerCache 下的 AssumePod
if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
klog.Errorf("scheduler cache AssumePod failed: %v", err)
return err
}
if sched.SchedulingQueue != nil {
sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
}
return nil
}

// 回头去找 SchedulerCache 初始化的地方
func (c *Configurator) create() (*Scheduler, error) {
return &Scheduler{
SchedulerCache: c.schedulerCache,
}, nil
}

func New() (*Scheduler, error) {
// 这里就是初始化的实例 schedulerCache
schedulerCache := internalcache.New(30*time.Second, stopEverything)
configurator := &Configurator{
schedulerCache: schedulerCache,
}
}

// 看看AssumePod做了什么
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
// 获取 pod 的 uid
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
// 加锁操作,保证并发情况下的一致性
cache.mu.Lock()
defer cache.mu.Unlock()
// 如果根据 uid 能找到 pod 当前的状态,说明该 pod 已经在缓存中,不能重复 Assume
if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
}

// 把 Assume Pod 的信息放到对应 Node 节点中
cache.addPod(pod)
// 把 pod 状态设置为 Assume 成功
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
cache.assumedPods[key] = true
return nil
}

Bind

func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
start := time.Now()
// 把 assumed 的 pod 信息保存下来
defer func() {
sched.finishBinding(prof, assumed, targetNode, start, err)
}()
// 阶段1: 如果配置了 extender 的绑定,由 extender 完成绑定后直接返回
bound, err := sched.extendersBinding(assumed, targetNode)
if bound {
return err
}
// 阶段2:运行绑定插件完成实际的绑定,并检查返回的状态
bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode)
if bindStatus.IsSuccess() {
return nil
}
if bindStatus.Code() == framework.Error {
return bindStatus.AsError()
}
return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message())
}

Update To Etcd

// 这块的代码我不做细致的逐层分析了,大家根据兴趣自行探索
func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
klog.V(3).Infof("Attempting to bind %v/%v to %v", p.Namespace, p.Name, nodeName)
binding := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
Target: v1.ObjectReference{Kind: "Node", Name: nodeName},
}
// ClientSet就是访问kube-apiserver的客户端,将数据更新上去
err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
return nil
}

Summary

今天这一次分享比较长,我们一起来总结一下:

  1. Pod的调度是通过一个队列SchedulingQueue异步工作的
    1. 监听到对应pod事件后,放入队列
    2. 有个消费者从队列中获取pod,进行调度
  2. 单个pod的调度主要分为3个步骤:
    1. 根据Predict和Priority两个阶段,调用各自的算法插件,选择最优的Node
    2. Assume这个Pod被调度到对应的Node,保存到cache
    3. 用extender和plugins进行验证,如果通过则绑定
  3. 绑定成功后,将数据通过client向kube-apiserver发送,更新etcd(整体流程可参考下面的示意代码)
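
把上面的主线用一段高度简化的 Go 代码串起来(其中的类型和函数都是为演示而造的替身,并非 kube-scheduler 的真实定义;真实实现中绑定是在单独的 goroutine 中异步完成的):

package main

import "fmt"

type pod struct{ name string }

type schedulingQueue chan *pod

func schedule(p *pod) (string, error)  { return "node-1", nil } // Predict + Priority 选出最优节点
func assume(p *pod, node string) error { return nil }           // 写入调度器的本地 cache
func bind(p *pod, node string) error   { return nil }           // 通过 client 请求 kube-apiserver,最终更新 etcd

func scheduleOne(q schedulingQueue) {
    p := <-q // 1. 从队列中取出待调度的 pod
    node, err := schedule(p)
    if err != nil {
        return // 调度失败,可能触发抢占,这里略过
    }
    if err := assume(p, node); err != nil { // 2. Assume 到 cache
        return
    }
    if err := bind(p, node); err != nil { // 3. 绑定
        fmt.Println("bind failed:", err)
    }
}

func main() {
    q := make(schedulingQueue, 1)
    q <- &pod{name: "nginx"}
    scheduleOne(q)
}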


聚焦目标

了解Informer在发现资源变化后,是怎么处理的

目录

  1. 查看消费的过程
  2. 掌握Index数据结构
  3. 信息的分发distribute
  4. Informer的综合思考

Process

func (c *controller) processLoop() {
for {
// Pop出Object元素
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// 重新进队列
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

// 去查看Pop的具体实现
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 调用process去处理item,然后返回
item, ok := f.items[id]
delete(f.items, id)
err := process(item)
return item, err
}
}

// 然后去查一下 PopProcessFunc 的定义,在创建controller前
cfg := &Config{
Process: s.HandleDeltas,
}

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

for _, d := range obj.(Deltas) {
switch d.Type {
// 增、改、替换、同步
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
// 先去indexer查询
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 如果数据已经存在,就执行Update逻辑
if err := s.indexer.Update(d.Object); err != nil {
return err
}

isSync := false
switch {
case d.Type == Sync:
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
// 分发Update事件
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// 没查到数据,就执行Add操作
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 分发 Add 事件
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
// 删除
case Deleted:
// 去indexer删除
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 分发 delete 事件
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

Index

这里的 Index(即代码中的 indexer)是资源的本地存储,保持与etcd中的资源信息一致。
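
这个本地存储对应 client-go 中的 cache.Indexer 接口,也可以脱离 Informer 单独使用。下面是一个最小示意(Pod 对象与 key 均为演示假设),用来体会“以 namespace/name 为 key 的本地缓存”:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // MetaNamespaceKeyFunc 会生成 namespace/name 形式的 key;这里不定义额外的索引
    indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})

    pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"}}
    _ = indexer.Add(pod)

    obj, exists, _ := indexer.GetByKey("default/nginx")
    fmt.Println(exists, obj.(*corev1.Pod).Name) // true nginx
}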

// 我们去看看Index是怎么创建的
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
// indexer 的初始化
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}

// 生成一个map和func组合而成的Indexer
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}

// ThreadSafeStore的底层是一个并发安全的map,具体实现我们暂不考虑
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}

distribute

// 在上面的Process代码中,我们看到了将数据存储到Indexer后,调用了一个分发的函数
s.processor.distribute()

// 分发process的创建
func NewSharedIndexInformer() SharedIndexInformer {
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
}
return sharedIndexInformer
}

// sharedProcessor的结构
type sharedProcessor struct {
listenersStarted bool
// 读写锁
listenersLock sync.RWMutex
// 普通监听列表
listeners []*processorListener
// 同步监听列表
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}

// 查看distribute函数
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 将object分发到 同步监听 或者 普通监听 的列表
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}

// 这个add的操作是利用了channel
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}

Summary

  1. Informer 依赖于 Reflector 模块,它有个组件为 xxxInformer,如 podInformer
  2. 具体资源的 Informer 包含了一个连接到kube-apiserver的client,通过ListWatch接口查询资源变更情况
  3. 检测到资源发生变化后,通过Controller 将数据放入队列DeltaFIFOQueue里,生产阶段完成
  4. DeltaFIFOQueue的另一端,有消费者在不停地处理资源变化的事件,处理逻辑主要分2步
    1. 将数据保存到本地存储Indexer,它的底层实现是一个并发安全的threadSafeMap
    2. 有些组件需要实时关注资源变化,会实时监听listen,就将事件分发到对应注册上来的listener上,自行处理


聚焦目标

了解Informer是如何从kube-apiserver监听资源变化的情况

目录

  1. 什么是Informer
  2. Shared Informer的实现
  3. PodInformer的背后的实现
  4. 聚焦Reflect结构
  5. 本节小结

Informer

什么是Informer?这一节,我将先抛开代码,重点讲一下这个Informer,因为它是理解k8s运行机制的核心概念。

我没有在官方文档中找到Informer的明确定义,中文直译为通知器。从这个链接中,我们可以看到一个自定义资源的处理流程。

我简单概括下,Informer的核心功能是 获取并监听(ListAndWatch)对应资源的增删改,触发相应的事件操作(ResourceEventHandler)
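
站在使用者的角度,一个典型的 Informer 用法大致如下(kubeconfig 路径、30 秒的 resync 周期等都是示例性的假设),后文分析的 ListAndWatch、DeltaFIFO、Indexer 都隐藏在这几行代码背后:

package main

import (
    "time"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog/v2"
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // 假设的 kubeconfig 路径
    if err != nil {
        klog.Fatal(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    podInformer := factory.Core().V1().Pods().Informer()

    // 注册 ResourceEventHandler:增删改事件都会回调到这里
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { klog.Infof("pod added: %s", obj.(*corev1.Pod).Name) },
        UpdateFunc: func(oldObj, newObj interface{}) { klog.Info("pod updated") },
        DeleteFunc: func(obj interface{}) { klog.Info("pod deleted") },
    })

    stopCh := make(chan struct{})
    factory.Start(stopCh)            // 内部就是 go informer.Run(stopCh)
    factory.WaitForCacheSync(stopCh) // 等待第一次 List 完成
    select {}                        // 持续消费 Watch 到的事件
}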

Shared Informer

/*
client 是连接到 kube-apiserver 的客户端。
我们要理解k8s的设计:
1. etcd是核心的数据存储,对资源的修改会进行持久化
2. 只有kube-apiserver可以访问etcd
所以,kube-scheduler要了解资源的变化情况,只能通过kube-apiserver
*/

// 定义了 Shared Informer,其中这个client是用来连接kube-apiserver的
c.InformerFactory = informers.NewSharedInformerFactory(client, 0)

// 这里解答了为什么叫shared:一个资源会对应多个Informer,会导致效率低下,所以让一个资源对应一个sharedInformer,而一个sharedInformer内部自己维护多个Informer
type sharedInformerFactory struct {
client kubernetes.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration
customResync map[reflect.Type]time.Duration
// 这个map就是维护多个Informer的关键实现
informers map[reflect.Type]cache.SharedIndexInformer
startedInformers map[reflect.Type]bool
}

// 运行函数
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
// goroutine异步处理
go informer.Run(stopCh)
// 标记为已经运行,这样即使下次Start也不会重复运行
f.startedInformers[informerType] = true
}
}
}

// 查找对应的informer
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
// 找到就直接返回
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}

resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
// 没找到就会新建
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}

// SharedInformerFactory 是 sharedInformerFactory 的接口定义
type SharedInformerFactory interface {
// 我们这一阶段关注的Pod的Informer,属于核心资源
Core() core.Interface
}

// core.Interface的定义
type Interface interface {
// V1 provides access to shared informers for resources in V1.
V1() v1.Interface
}

// v1.Interface 的定义
type Interface interface {
// Pod的定义
Pods() PodInformer
}

// PodInformer 是对应的接口
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
// podInformer 是具体的实现
type podInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}

// 最后,我们可以看到podInformer调用了InformerFor函数进行了添加
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

PodInformer

// 实例化PodInformer,把对应的List/Watch操作方法传入到实例化函数,生成统一的SharedIndexInformer接口
func NewFilteredPodInformer() cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
// List和Watch实现从PodInterface里面查询
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}

// 我们先看看Pod基本的List和Watch是怎么定义的
// Pod基本的增删改查等操作
type PodInterface interface {
List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
...
}
// pods 是PodInterface的实现
type pods struct {
client rest.Interface
ns string
}

// List 和 Watch 是依赖客户端,也就是从kube-apiserver中查询的
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
err = c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do(ctx).
Into(result)
return
}
func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Watch(ctx)
}

// 在上面,我们看到了异步运行Informer的代码 go informer.Run(stopCh),我们看看是怎么run的
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
// 这里有个 DeltaFIFO 的对象,
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
// 传入这个fifo到cfg
cfg := &Config{
Queue: fifo,
...
}
// 新建controller
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()

s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// 运行controller
s.controller.Run(stopCh)
}

// Controller的运行
func (c *controller) Run(stopCh <-chan struct{}) {
//
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}

c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()

var wg wait.Group
// 生产,往Queue里放数据
wg.StartWithChannel(stopCh, r.Run)
// 消费,从Queue消费数据
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}

Reflect

// 我们再回头看看这个Reflect结构
r := NewReflector(
// ListerWatcher 我们已经有了解,就是通过client监听kube-apiserver暴露出来的Resource
c.config.ListerWatcher,
c.config.ObjectType,
// Queue 是我们前文看到的一个 DeltaFIFOQueue,认为这是一个先进先出的队列
c.config.Queue,
c.config.FullResyncPeriod,
)

func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
// 调用了ListAndWatch
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// watchHandler顾名思义,就是Watch到对应的事件,调用对应的Handler
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}

func (r *Reflector) watchHandler() error {
loop:
for {
// 一个经典的GO语言select监听多channel的模式
select {
// 整体的stop channel
case <-stopCh:
return errorStopRequested
// 错误相关的error channel
case err := <-errc:
return err
// 接收事件event的channel
case event, ok := <-w.ResultChan():
// channel被关闭,退出loop
if !ok {
break loop
}

// 一系列的资源验证代码跳过

switch event.Type {
// 增删改三种Event,分别对应到去store,即DeltaFIFO中,操作object
case watch.Added:
err := r.store.Add(event.Object)
case watch.Modified:
err := r.store.Update(event.Object)
case watch.Deleted:
err := r.store.Delete(event.Object)
case watch.Bookmark:
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
}
}
return nil
}

Summary

  1. Informer 依赖于 Reflector 模块,它有个组件为 xxxInformer,如 podInformer
  2. 具体资源的 Informer 包含了一个连接到kube-apiserver的client,通过ListWatch接口查询资源变更情况
  3. 检测到资源发生变化后,通过Controller 将数据放入队列DeltaFIFOQueue里,生产阶段完成


聚焦目标

理解kube-scheduler启动的流程

目录

  1. kube-scheduler的启动
  2. Scheduler的注册
  3. 了解一个最简单的算法NodeName

run

// kube-scheduler 类似于kube-apiserver,是个常驻进程,查看其对应的Run函数
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
// 根据入参,返回配置cc与调度sched
cc, sched, err := Setup(ctx, opts, registryOptions...)
// 运行
return Run(ctx, cc, sched)
}

// 运行调度策略
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
// 将配置注册到configz中,会保存在一个全局map里
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}

// 事件广播管理器,涉及到k8s里的一个核心资源 - Event事件,暂时不细讲
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

// 健康监测的服务
var checks []healthz.HealthChecker

// 异步启动各个Informer。Informer是kube-scheduler的一个重点
go cc.PodInformer.Informer().Run(ctx.Done())
cc.InformerFactory.Start(ctx.Done())
cc.InformerFactory.WaitForCacheSync(ctx.Done())

// 选举Leader的工作,因为Master节点可以存在多个,选举一个作为Leader
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
// 两个钩子函数,开启Leading时运行调度,结束时打印报错
OnStartedLeading: sched.Run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
// 参与选举的会持续通信
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}

// 不参与选举的,也就是单节点的情况时,在这里运行
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}

/*
到这里,我们已经接触了kube-scheduler的2个核心概念:
1. scheduler:正如程序名kube-scheduler,这个进程的核心作用是进行调度,会涉及到多种调度策略
2. Informer:k8s中有各种类型的资源,包括自定义的。而Informer的实现就将调度和资源结合了起来
*/

Scheduler

// 在创建scheduler的函数
func Setup() {
// 创建scheduler,包括多个选项
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
)
return &cc, sched, nil
}

// 我们再看一下New这个函数
func New() (*Scheduler, error) {
// 先注册了所有的算法,保存到一个 map[string]PluginFactory 中
registry := frameworkplugins.NewInTreeRegistry()

// 重点看一下Scheduler的创建过程
var sched *Scheduler
source := options.schedulerAlgorithmSource
switch {
// 根据Provider创建,重点看这里
case source.Provider != nil:
sc, err := configurator.createFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
sched = sc
// 根据用户设置创建,来自文件或者ConfigMap
case source.Policy != nil:
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
return nil, err
}
case source.Policy.ConfigMap != nil:
if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
return nil, err
}
}
configurator.extenders = policy.Extenders
sc, err := configurator.createFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
sched = sc
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
}

// 创建
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
// 实例化算法的Registry
r := algorithmprovider.NewRegistry()
defaultPlugins, exist := r[providerName]
if !exist {
return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
}

// 将各种算法作为plugin进行设置
for i := range c.profiles {
prof := &c.profiles[i]
plugins := &schedulerapi.Plugins{}
plugins.Append(defaultPlugins)
plugins.Apply(prof.Plugins)
prof.Plugins = plugins
}
return c.create()
}

// 从这个初始化中可以看到,主要分为2类:默认与ClusterAutoscaler两种算法
func NewRegistry() Registry {
// 默认算法包括过滤、打分、绑定等,有兴趣的去源码中逐个阅读
defaultConfig := getDefaultConfig()
applyFeatureGates(defaultConfig)
// ClusterAutoscaler 是集群自动扩展的算法,被单独拎出来,
caConfig := getClusterAutoscalerConfig()
applyFeatureGates(caConfig)

return Registry{
schedulerapi.SchedulerDefaultProviderName: defaultConfig,
ClusterAutoscalerProvider: caConfig,
}
}
/*
在这里,熟悉k8s的朋友会有个疑问:以前听说kubernetes的调度有个Predicate和Priority两个算法,这里怎么没有分类?
这个疑问,我们在后面具体场景时再进行分析。
*/

NodeName

// 为了加深大家对Plugin的印象,我选择一个最简单的示例:根据Pod的spec字段中的NodeName,分配到指定名称的节点
package nodename

import (
"context"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

type NodeName struct{}

var _ framework.FilterPlugin = &NodeName{}

// 这个调度算法的名称和错误信息
const (
Name = "NodeName"
ErrReason = "node(s) didn't match the requested hostname"
)

// 调度算法的命名
func (pl *NodeName) Name() string {
return Name
}

// 过滤功能,这个就是NodeName算法的实现
func (pl *NodeName) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
// 找不到Node
if nodeInfo.Node() == nil {
return framework.NewStatus(framework.Error, "node not found")
}
// 匹配不到,返回错误
if !Fits(pod, nodeInfo) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason)
}
return nil
}

/*
匹配的算法,两种条件满足一个就认为成功
1. spec没有填NodeName
2. spec的NodeName和节点名匹配
*/
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
return len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == nodeInfo.Node().Name
}

// 初始化
func New(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &NodeName{}, nil
}


聚焦目标

理解Pod发送到kube-apiserver后是怎么保存的

目录

  1. RESTCreateStrategy创建的预处理
  2. REST Pod数据的存储
  3. 存储的底层实现
  4. kube-apiserver第一阶段源码阅读总结

RESTCreateStrategy

// podStrategy 是封装了 Pod 的各类动作,这里我们先关注create这个操作
type podStrategy struct {
runtime.ObjectTyper
names.NameGenerator
}

// podStrategy 的接口
type RESTCreateStrategy interface {
runtime.ObjectTyper
names.NameGenerator
// 是否属于当前的 namespace
NamespaceScoped() bool
// 准备创建前的检查
PrepareForCreate(ctx context.Context, obj runtime.Object)
// 验证资源对象
Validate(ctx context.Context, obj runtime.Object) field.ErrorList
// 规范化
Canonicalize(obj runtime.Object)
}

// 完成了检查,我们就要保存数据了

Storage

// PodStorage 是 Pod 存储的实现,里面包含了多个存储的定义
type PodStorage struct {
// REST implements a RESTStorage for pods
Pod *REST
// BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
Binding *BindingREST
// LegacyBindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
LegacyBinding *LegacyBindingREST
Eviction *EvictionREST
// StatusREST implements the REST endpoint for changing the status of a pod.
Status *StatusREST
// EphemeralContainersREST implements the REST endpoint for adding EphemeralContainers
EphemeralContainers *EphemeralContainersREST
Log *podrest.LogREST
Proxy *podrest.ProxyREST
Exec *podrest.ExecREST
Attach *podrest.AttachREST
PortForward *podrest.PortForwardREST
}

/*
从上一节的map关系中,保存在REST中
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
}
*/
type REST struct {
*genericregistry.Store
proxyTransport http.RoundTripper
}

// Store是一个通用的数据结构
type Store struct {
// Storage定义
Storage DryRunnableStorage
}

// DryRunnableStorage中的Storage是一个Interface
type DryRunnableStorage struct {
Storage storage.Interface
Codec runtime.Codec
}

func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64, dryRun bool) error {
if dryRun {
if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err == nil {
return storage.NewKeyExistsError(key, 0)
}
return s.copyInto(obj, out)
}
// 这里,就是Create的真正调用
return s.Storage.Create(ctx, key, obj, out, ttl)
}

Storage Implement

// Storage Interface 的定义,包括基本的增删改查,以及watch等等进阶操作
type Interface interface {
Versioner() Versioner
Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
Count(key string) (int64, error)
}

func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config)
}

func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
// 已经不支持etcd2
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
// 默认为etcd3版本
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}

Summary

我们对第一阶段学习kube-apiserver的知识点进行总结:

  1. kube-apiserver 包含三个apiserver:APIExtensionsServer、KubeAPIServer和AggregatorServer
    1. 三个APIServer底层均依赖通用的GenericServer,使用go-restful对外提供RESTful风格的API服务(下文附一个极简示例)
  2. kube-apiserver 对请求进行 Authentication、Authorization、Admission 三层验证
  3. 完成验证后,请求会根据路由规则,触发到对应资源的handler,主要包括数据的预处理和保存
  4. kube-apiserver 的底层存储为etcd v3,它被抽象为一种RESTStorage,使请求和存储操作一一对应
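
补充上面提到的 go-restful:下面是一个与 kube-apiserver 无关的极简示例(路径 /api/v1/healthz、端口 8080 均为假设),可以体会 Container / WebService / Route 的三层结构:

package main

import (
    "net/http"

    "github.com/emicklei/go-restful"
)

func main() {
    container := restful.NewContainer() // kube-apiserver 的 APIServerHandler 中也持有这样一个 Container

    ws := new(restful.WebService)
    ws.Path("/api/v1").Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON)
    ws.Route(ws.GET("/healthz").To(func(req *restful.Request, resp *restful.Response) {
        resp.WriteAsJson(map[string]string{"status": "ok"})
    }))

    container.Add(ws)
    http.ListenAndServe(":8080", container)
}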


聚焦目标

理解kube-apiserver中管理核心资源的KubeAPIServer是怎么启动的

目录

  1. genericServer的创建
  2. 创建REST的Handler
  3. Generic的API路由规则
  4. 初始化核心Apiserver
  5. 核心资源的API路由规则
  6. 创建Pod的函数

GenericServer

// 在APIExtensionsServer、KubeAPIServer和AggregatorServer三种Server启动时,我们都能发现这么一个函数
// APIExtensionsServer
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
// KubeAPIServer
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
// AggregatorServer
genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)

// 都通过GenericConfig创建了genericServer,我们先大致浏览下
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
// 新建Handler
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

// 实例化一个Server
s := &GenericAPIServer{
...
}

// 处理钩子hook操作
for k, v := range delegationTarget.PostStartHooks() {
s.postStartHooks[k] = v
}

for k, v := range delegationTarget.PreShutdownHooks() {
s.preShutdownHooks[k] = v
}

// 健康监测
for _, delegateCheck := range delegationTarget.HealthzChecks() {
skip := false
for _, existingCheck := range c.HealthzChecks {
if existingCheck.Name() == delegateCheck.Name() {
skip = true
break
}
}
if skip {
continue
}
s.AddHealthChecks(delegateCheck)
}

// 安装API相关参数,这个是重点
installAPI(s, c.Config)

return s, nil
}

NewAPIServerHandler

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
// 采用了 github.com/emicklei/go-restful 这个库作为 RESTful 接口的设计,目前了解即可
gorestfulContainer := restful.NewContainer()

}

installAPI

func installAPI(s *GenericAPIServer, c *Config) {
// 添加 /index.html 路由规则
if c.EnableIndex {
routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
}
// 添加go语言 /pprof 的路由规则,常用于性能分析
if c.EnableProfiling {
routes.Profiling{}.Install(s.Handler.NonGoRestfulMux)
if c.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
routes.DebugFlags{}.Install(s.Handler.NonGoRestfulMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
}
// 添加监控相关的 /metrics 的指标路由规则
if c.EnableMetrics {
if c.EnableProfiling {
routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
} else {
routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux)
}
}
// 添加版本 /version 的路由规则
routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
// 开启服务发现
if c.EnableDiscovery {
s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
}
if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
c.FlowControl.Install(s.Handler.NonGoRestfulMux)
}
}

Apiserver

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
// genericServer的初始化
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
// 核心KubeAPIServer的实例化
m := &Master{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}

// Legacy API的注册
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{}
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
// REST接口的存储定义,可以看到很多k8s上的常见定义,比如node节点/storage存储/event事件等等
restStorageProviders := []RESTStorageProvider{
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
extensionsrest.RESTStorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
settingsrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
// 注册API
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
// 添加Hook
m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
})
return m, nil
}

注册API的关键在InstallLegacyAPI和InstallAPIs,如果你对kubernetes的资源有一定的了解,会知道核心资源都放在Legacy中(如果不了解的话,点击函数看一下,就能有所了解)

InstallLegacyAPI

func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
// RESTStorage的初始化
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)

// 前缀为 /api,注册上对应的Version和Resource
// Pod作为核心资源,没有Group的概念
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
}

// 我们再细看这个RESTStorage的初始化
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
// pod 模板
podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
// event事件
eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
// limitRange资源限制
limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
// resourceQuota资源配额
resourceQuotaStorage, resourceQuotaStatusStorage, err := resourcequotastore.NewREST(restOptionsGetter)
// secret加密
secretStorage, err := secretstore.NewREST(restOptionsGetter)
// PV 存储
persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
// PVC 存储
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
// ConfigMap 配置
configMapStorage, err := configmapstore.NewREST(restOptionsGetter)
// 等等核心资源,暂不一一列举

// pod 的存储,我们的示例nginx-pod属于这个类型的资源
podStorage, err := podstore.NewStorage()

// 保存storage的对应关系
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.LegacyBinding,
...
}
}

Create Pod

// 查看Pod初始化
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {

store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
PredicateFunc: registrypod.MatchPod,
DefaultQualifiedResource: api.Resource("pods"),
// Strategies for create/update/delete
CreateStrategy: registrypod.Strategy,
UpdateStrategy: registrypod.Strategy,
DeleteStrategy: registrypod.Strategy,
ReturnDeletedObject: true,

TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
}
// Look at how the Strategy is initialized
var Strategy = podStrategy{legacyscheme.Scheme, names.SimpleNameGenerator}

// which in turn leads to the initialization of Scheme. Scheme can be understood as Kubernetes' type registry: every resource type must be registered into the Scheme before it can be used
var Scheme = runtime.NewScheme()
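
To make the "registry" idea concrete, here is a minimal standalone sketch (my own illustration, not apiserver code): it creates a Scheme, registers the core/v1 types into it, and asks it for the GVK of a Pod.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
)

func main() {
	scheme := runtime.NewScheme()
	// Register all core/v1 types (Pod, Service, ConfigMap, ...) into the scheme.
	if err := corev1.AddToScheme(scheme); err != nil {
		panic(err)
	}
	// Once registered, the scheme can tell us how an object is identified.
	gvks, _, err := scheme.ObjectKinds(&corev1.Pod{})
	if err != nil {
		panic(err)
	}
	fmt.Println("Pod is registered as:", gvks)
}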

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili: https://space.bilibili.com/293775192

WeChat Official Account: golangcoding

Focus

Understand how kubectl sends requests to kube-apiserver

Table of Contents

  1. Sending requests to kube-apiserver
  2. How the RESTful client is created
  3. How the Object is generated
  4. Sending the POST request
  5. Summary of the first stage of reading the kubectl source code

send request

// In the RunCreate function, the key call that sends the request
obj, err := resource.
NewHelper(info.Client, info.Mapping).
DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
WithFieldManager(o.fieldManager).
Create(info.Namespace, true, info.Object)

// Stepping into the Create function, we find
m.createResource(m.RESTClient, m.Resource, namespace, obj, options)

// The corresponding implementation is
func (m *Helper) createResource(c RESTClient, resource, namespace string, obj runtime.Object, options *metav1.CreateOptions) (runtime.Object, error) {
return c.Post().
NamespaceIfScoped(namespace, m.NamespaceScoped).
Resource(resource).
VersionedParams(options, metav1.ParameterCodec).
Body(obj).
Do(context.TODO()).
Get()
}

/*

At this point we have found two key definitions:
1. RESTClient - the RESTful-style client that talks to kube-apiserver
2. runtime.Object - the abstraction over resource objects, covering Pod/Deployment/Service and every other kind of resource

*/
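
To make these two definitions concrete, here is a minimal standalone sketch (my own illustration, not kubectl code; the kubeconfig path and the pod spec are assumptions) that uses client-go's typed clientset, which is built on top of the same RESTClient, to POST a Pod to kube-apiserver:

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a rest.Config from a kubeconfig file (path is illustrative).
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	// The typed clientset wraps the underlying RESTClient.
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "nginx-pod"},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{Name: "nginx", Image: "nginx:1.19"}},
		},
	}
	// Under the hood this issues POST /api/v1/namespaces/default/pods,
	// the same kind of request Helper.createResource builds via c.Post().
	created, err := clientset.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("created pod:", created.Name)
}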

RESTful Client

Let's first look at how the Client that talks to kube-apiserver is created.

// Judging from the parameter, the data comes from the Info struct
r.Visit(func(info *resource.Info, err error) error{})

// The Info comes from the Builder seen earlier; the preceding calls only parameterize the Builder, and the Do function is where the result is actually produced
r := f.NewBuilder().
Unstructured().
Schema(schema).
ContinueOnError().
NamespaceParam(cmdNamespace).DefaultNamespace().
FilenameParam(enforceNamespace, &o.FilenameOptions).
LabelSelectorParam(o.Selector).
Flatten().
Do()

// Skimming these functions, we can spot getClient inside Unstructured(); that is exactly the function we are looking for
func (b *Builder) getClient(gv schema.GroupVersion) (RESTClient, error)

// From the return value, the client consists of the default REST client plus configuration options
NewClientWithOptions(client, b.requestTransforms...)

// This Client appears all over the kubernetes codebase; it is the core component for talking to kube-apiserver, and we will dig deeper into it later.

Object

How do we get hold of this Object? Since our data source is a file, the most intuitive place to look is FileVisitor.

func (v *FileVisitor) Visit(fn VisitorFunc) error {
// The reading code is omitted; underneath it delegates to StreamVisitor
return v.StreamVisitor.Visit(fn)
}

func (v *StreamVisitor) Visit(fn VisitorFunc) error {
d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
for {
// this is where the Info is produced
info, err := v.infoForData(ext.Raw, v.Source)
}
}

// One layer further down we reach the mapper, i.e. kubernetes' mapping of resource objects
func (m *mapper) infoForData(data []byte, source string) (*Info, error){
// This is where the Object is returned; GVK is short for Group/Version/Kind, which we will come back to later
obj, gvk, err := m.decoder.Decode(data, nil, nil)
}

At this point we want to go back and ask: where was this mapper defined?

// We find it right in the Builder initialization
func (b *Builder) Unstructured() *Builder {
b.mapper = &mapper{
localFn: b.isLocal,
restMapperFn: b.restMapperFn,
clientFn: b.getClient,
// this is the decoder used to parse our resources
decoder: &metadataValidatingDecoder{unstructured.UnstructuredJSONScheme},
}
return b
}

// Drilling down layer by layer, the implementation of the corresponding Decode method is where the raw data gets parsed:
func (s unstructuredJSONScheme) decode(data []byte) (runtime.Object, error) {
// details omitted for now
}
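
As a rough illustration of what that decoder does (a standalone sketch, not the kubectl call path), the snippet below converts a YAML manifest to JSON and decodes it with unstructured.UnstructuredJSONScheme, recovering both the runtime.Object and its GVK:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"sigs.k8s.io/yaml"
)

func main() {
	manifest := []byte(`
apiVersion: v1
kind: Pod
metadata:
  name: nginx-pod
`)
	// The decoder works on JSON, so convert the YAML first.
	data, err := yaml.YAMLToJSON(manifest)
	if err != nil {
		panic(err)
	}
	// Decode into an *unstructured.Unstructured and report the GVK.
	obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, nil, nil)
	if err != nil {
		panic(err)
	}
	fmt.Printf("decoded %T with GVK %s\n", obj, gvk.String())
}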

Post

Now that we roughly understand how the REST Client and the Object are produced, let's come back to the method that sends the request.

// In RESTful style, a POST request corresponds to the CREATE operation
c.Post().
NamespaceIfScoped(namespace, m.NamespaceScoped).
Resource(resource).
VersionedParams(options, metav1.ParameterCodec).
Body(obj).
Do(context.TODO()).
Get()

// The Do method sends the request
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})

// The Get method retrieves the result of the request, used for printing the status
switch t := out.(type) {
case *metav1.Status:
if t.Status != metav1.StatusSuccess {
return nil, errors.FromObject(t)
}
}

Summary

At this point we have a preliminary understanding of what kubectl does. I hope you now have a grasp of the following key points:

  1. The command line is built on the cobra library and mainly supports 7 major categories of commands (a minimal cobra sketch follows below);
  2. Understand the Visitor design pattern, which is the core of how kubectl parses and validates all kinds of resource objects;
  3. Get a first impression of RESTClient and Object, two concepts that run through the whole of kubernetes;
  4. The call flow:
    1. cobra matches the subcommand
    2. the Builder is constructed with the Visitor pattern
    3. the Object is sent to kube-apiserver via RESTClient
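
To make point 1 concrete, here is a minimal cobra sketch (command names and handlers are made up for illustration, not kubectl's actual wiring) showing how a root command dispatches to a create subcommand:

package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	rootCmd := &cobra.Command{Use: "kubectl-demo"}

	// A "create" subcommand, analogous to kubectl create -f xxx.yaml.
	var filename string
	createCmd := &cobra.Command{
		Use:   "create",
		Short: "create a resource from a file",
		RunE: func(cmd *cobra.Command, args []string) error {
			fmt.Println("would create resources from", filename)
			return nil
		},
	}
	createCmd.Flags().StringVarP(&filename, "filename", "f", "", "manifest file")

	// cobra matches the subcommand and runs its RunE.
	rootCmd.AddCommand(createCmd)
	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}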

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili: https://space.bilibili.com/293775192

WeChat Official Account: golangcoding

Focus

Understand the three core access-control concepts involved in starting kube-apiserver: Authentication/Authorization/Admission

Table of Contents

  1. Starting kube-apiserver
  2. The three Servers inside kube-apiserver
  3. How the KubeAPIServer is created
    1. Overview of the generic config
    2. Authentication in the generic config
    3. Authorization in the generic config
    4. Admission in the generic config

Run

// Like the kubectl source, the kube-apiserver command-line tool also uses cobra, so we can quickly locate the startup entry point
RunE: func(cmd *cobra.Command, args []string) error {
// Two arguments are passed here: the completedOptions, and a stopCh <-chan struct{}
return Run(completedOptions, genericapiserver.SetupSignalHandler())
}

/*
Here it helps to compare with kubectl:
kubectl is a command-line tool that exits as soon as the command finishes, while kube-apiserver is a long-running server process that listens on ports.
A stopCh <-chan struct{} is introduced so that, after startup, a <-stopCh receive blocks and keeps the program from exiting.
Using a channel to block the process from exiting is a much more elegant implementation than the traditional approach of an endless for loop.
*/
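
A minimal sketch of that blocking pattern (a standalone illustration, not the kube-apiserver code): the server work runs in goroutines, and main blocks on a channel that is closed when a termination signal arrives.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// stopCh is closed on SIGINT/SIGTERM, similar in spirit to
	// genericapiserver.SetupSignalHandler().
	stopCh := make(chan struct{})
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		close(stopCh)
	}()

	// Start the long-running work in the background.
	go func() {
		fmt.Println("server started")
		// ... serve requests ...
	}()

	// Block here instead of spinning in an endless for loop.
	<-stopCh
	fmt.Println("shutting down")
}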

Three Servers

// Inside the CreateServerChain function, three servers are created
func CreateServerChain(){
// The API extensions server, mainly for CRDs
createAPIExtensionsServer(){}
// The core API server, covering the familiar Pod/Deployment/Service resources; this is today's focus
// I will skip many non-essential configuration parameters: diving into details right away really hurts the efficiency of reading the code as a whole
CreateKubeAPIServer(){}
// The API aggregation server, mainly for metrics
createAggregatorServer(){}
}

KubeAPIServer

// The flow for creating the config
func CreateKubeAPIServerConfig(){
// create the generic config genericConfig
genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
}

GenericConfig

// Creating the generic config
func buildGenericConfig(s *options.ServerRunOptions,proxyTransport *http.Transport){
// Insecure corresponds to non-secure communication, i.e. HTTP
if lastErr = s.InsecureServing...
// Secure corresponds to HTTPS
if lastErr = s.SecureServing...
// OpenAPIConfig is the externally exposed API documentation
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig()
// This part instantiates the storageFactory; you can see etcd is used as the storage backend
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
storageFactory, lastErr = completedStorageFactoryConfig.New()
// Authentication
if lastErr = s.Authentication.ApplyTo()...
// Authorization
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer()
// Admission control
err = s.Admission.ApplyTo()
}

Authentication

func (o *BuiltInAuthenticationOptions) ApplyTo(){
// Everything before this only sets parameters on the authentication config; this is where the real instantiation happens
authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
}

// In New, focus on the two core variables authenticators and tokenAuthenticators, i.e. the various authentication methods
func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
// the core variables authenticators and tokenAuthenticators
var authenticators []authenticator.Request
var tokenAuthenticators []authenticator.Token

if config.RequestHeaderConfig != nil {
// 1. add the requestHeader authenticator
authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
}

if config.ClientCAContentProvider != nil {
// 2. add the client CA authenticator
authenticators = append(authenticators, certAuth)
}

if len(config.TokenAuthFile) > 0 {
// 3. token: add the token file authenticator
tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, tokenAuth))
}

// 4. token: add service account tokens, from two sources
if len(config.ServiceAccountKeyFiles) > 0 {
tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
}
if utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) && config.ServiceAccountIssuer != "" {
tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
}
if config.BootstrapToken {
if config.BootstrapTokenAuthenticator != nil {
// 5. token: add the bootstrap token authenticator
tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, config.BootstrapTokenAuthenticator))
}
}

if len(config.OIDCIssuerURL) > 0 && len(config.OIDCClientID) > 0 {
// 6. token: add OIDC
tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, oidcAuth))
}
if len(config.WebhookTokenAuthnConfigFile) > 0 {
// 7. token: add the webhook token authenticator
tokenAuthenticators = append(tokenAuthenticators, webhookTokenAuth)
}

// 8. union the tokenAuthenticators and wire them into authenticators
if len(tokenAuthenticators) > 0 {
tokenAuth := tokenunion.New(tokenAuthenticators...)
if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
}
authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
}

// 9. no authenticators at all: fall back to anonymous if it is enabled
if len(authenticators) == 0 {
if config.Anonymous {
return anonymous.NewAuthenticator(), &securityDefinitions, nil
}
return nil, &securityDefinitions, nil
}

// 10. union all the authenticators
authenticator := union.New(authenticators...)

return authenticator, &securityDefinitions, nil
}

That completes the initialization walkthrough of the fairly complex Authentication module. A rough understanding is enough; there is no need to memorize the exact loading order.
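
For intuition, here is a simplified sketch of the union idea (my own illustration of what union.New produces, not its exact code): each configured authenticator is tried in turn, the first success wins, and errors are only reported if nobody succeeds.

package authsketch

import (
	"net/http"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apiserver/pkg/authentication/authenticator"
)

// unionAuth is a toy union of request authenticators.
type unionAuth struct {
	handlers []authenticator.Request
}

// AuthenticateRequest returns the first successful authentication.
func (u unionAuth) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
	var errlist []error
	for _, h := range u.handlers {
		resp, ok, err := h.AuthenticateRequest(req)
		if err != nil {
			errlist = append(errlist, err)
			continue
		}
		if ok {
			return resp, true, nil
		}
	}
	return nil, false, utilerrors.NewAggregate(errlist)
}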

Authorization

func BuildAuthorizer(){
// Same as above: the instantiation happens inside this New
return authorizationConfig.New()
}

// It has to be said, the Authorizer code reads much more smoothly
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
// At least one authorization mode must be passed in
if len(config.AuthorizationModes) == 0 {
return nil, nil, fmt.Errorf("at least one authorization mode must be passed")
}

var (
authorizers []authorizer.Authorizer
ruleResolvers []authorizer.RuleResolver
)

for _, authorizationMode := range config.AuthorizationModes {
// For the concrete definition of each mode, jump to the corresponding code; we won't go into detail today
switch authorizationMode {
case modes.ModeNode:
authorizers = append(authorizers, nodeAuthorizer)
ruleResolvers = append(ruleResolvers, nodeAuthorizer)

case modes.ModeAlwaysAllow:
authorizers = append(authorizers, alwaysAllowAuthorizer)
ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)

case modes.ModeAlwaysDeny:
authorizers = append(authorizers, alwaysDenyAuthorizer)
ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)

case modes.ModeABAC:
authorizers = append(authorizers, abacAuthorizer)
ruleResolvers = append(ruleResolvers, abacAuthorizer)

case modes.ModeWebhook:
authorizers = append(authorizers, webhookAuthorizer)
ruleResolvers = append(ruleResolvers, webhookAuthorizer)

case modes.ModeRBAC:
authorizers = append(authorizers, rbacAuthorizer)
ruleResolvers = append(ruleResolvers, rbacAuthorizer)
default:
return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
}
}

return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}

const (
// ModeAlwaysAllow is the mode to set all requests as authorized
ModeAlwaysAllow string = "AlwaysAllow"
// ModeAlwaysDeny is the mode to set no requests as authorized
ModeAlwaysDeny string = "AlwaysDeny"
// ModeABAC is the mode to use Attribute Based Access Control to authorize
ModeABAC string = "ABAC"
// ModeWebhook is the mode to make an external webhook call to authorize
ModeWebhook string = "Webhook"
// ModeRBAC is the mode to use Role Based Access Control to authorize
ModeRBAC string = "RBAC"
// ModeNode is an authorization mode that authorizes API requests made by kubelets.
ModeNode string = "Node"
)
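
In practice, which of these modes are active is controlled by the kube-apiserver flag --authorization-mode (a comma-separated list, evaluated in order), for example --authorization-mode=Node,RBAC. The union authorizer returned by union.New walks the modes in that order and, roughly speaking, moves on to the next one whenever a mode has no opinion on the request.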

Admission

// Look at the definition
err = s.Admission.ApplyTo()
func (a *AdmissionOptions) ApplyTo(){
return a.GenericAdmission.ApplyTo()
}

func (ps *Plugins) NewFromPlugins(){
for _, pluginName := range pluginNames {
// InitPlugin does the initialization work
plugin, err := ps.InitPlugin(pluginName, pluginConfig, pluginInitializer)
if err != nil {
return nil, err
}
}
}

func (ps *Plugins) InitPlugin(name string, config io.Reader, pluginInitializer PluginInitializer) (Interface, error){
// fetch the plugin
plugin, found, err := ps.getPlugin(name, config)
}

// Take a look at the definition of Interface: it is what controls admission decisions
// Interface is an abstract, pluggable interface for Admission Control decisions.
type Interface interface {
Handles(operation Operation) bool
}

// Now let's look at where the plugin is obtained
func (ps *Plugins) getPlugin(name string, config io.Reader) (Interface, bool, error) {
ps.lock.Lock()
defer ps.lock.Unlock()
// Next, let's find out where ps.registry is initialized
f, found := ps.registry[name]
}

// Next, starting from the kube-apiserver startup flow, we trace step by step to where Admission gets initialized
// the startup command
command := app.NewAPIServerCommand()
// the server options
s := options.NewServerRunOptions()
// the admission options
Admission: kubeoptions.NewAdmissionOptions()
// register the admission plugins
RegisterAllAdmissionPlugins(options.Plugins)
// everything the admission mechanism contains
func RegisterAllAdmissionPlugins(plugins *admission.Plugins){
// many plugins are registered here
}

// Scrolling up, we can find all the plugins, i.e. the full set of admission controls
var AllOrderedPlugins = []string{
admit.PluginName, // AlwaysAdmit
autoprovision.PluginName, // NamespaceAutoProvision
lifecycle.PluginName, // NamespaceLifecycle
exists.PluginName, // NamespaceExists
scdeny.PluginName, // SecurityContextDeny
antiaffinity.PluginName, // LimitPodHardAntiAffinityTopology
podpreset.PluginName, // PodPreset
limitranger.PluginName, // LimitRanger
serviceaccount.PluginName, // ServiceAccount
noderestriction.PluginName, // NodeRestriction
nodetaint.PluginName, // TaintNodesByCondition
alwayspullimages.PluginName, // AlwaysPullImages
imagepolicy.PluginName, // ImagePolicyWebhook
podsecuritypolicy.PluginName, // PodSecurityPolicy
podnodeselector.PluginName, // PodNodeSelector
podpriority.PluginName, // Priority
defaulttolerationseconds.PluginName, // DefaultTolerationSeconds
podtolerationrestriction.PluginName, // PodTolerationRestriction
exec.DenyEscalatingExec, // DenyEscalatingExec
exec.DenyExecOnPrivileged, // DenyExecOnPrivileged
eventratelimit.PluginName, // EventRateLimit
extendedresourcetoleration.PluginName, // ExtendedResourceToleration
label.PluginName, // PersistentVolumeLabel
setdefault.PluginName, // DefaultStorageClass
storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection
gc.PluginName, // OwnerReferencesPermissionEnforcement
resize.PluginName, // PersistentVolumeClaimResize
runtimeclass.PluginName, // RuntimeClass
certapproval.PluginName, // CertificateApproval
certsigning.PluginName, // CertificateSigning
certsubjectrestriction.PluginName, // CertificateSubjectRestriction
defaultingressclass.PluginName, // DefaultIngressClass

// new admission plugins should generally be inserted above here
// webhook, resourcequota, and deny plugins must go at the end

mutatingwebhook.PluginName, // MutatingAdmissionWebhook
validatingwebhook.PluginName, // ValidatingAdmissionWebhook
resourcequota.PluginName, // ResourceQuota
deny.PluginName, // AlwaysDeny
}
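
Which of these plugins actually run on a given cluster is controlled by the kube-apiserver flags --enable-admission-plugins and --disable-admission-plugins (for example --enable-admission-plugins=NodeRestriction); as far as I can tell, the execution order always follows the fixed AllOrderedPlugins ordering above rather than the order the plugins are listed on the flag.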

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili: https://space.bilibili.com/293775192

WeChat Official Account: golangcoding

Focus

Understand one of kubectl's core implementation techniques: the Visitor design pattern

Table of Contents

  1. What is the visitor pattern

  2. Visitor in kubectl

  3. Chaining Visitors

    1. Aggregating multiple objects into one
      1. VisitorList
      2. EagerVisitorList
    2. Aggregating multiple functions into one
      1. DecoratedVisitor
      2. ContinueOnErrorVisitor
    3. Expanding an object into multiple underlying objects and calling the function on each
      1. FlattenListVisitor
      2. FilteredVisitor
  4. The concrete Visitor implementations

    1. StreamVisitor
    2. FileVisitor
    3. URLVisitor
    4. KustomizeVisitor

visitor design pattern

Among the design patterns, the visitor pattern is defined as:

allowing one or more operations to be applied to an object, decoupling the operations from the object itself

For a program, this concretely shows up as:

  1. On the surface: an object executes one method
  2. Internally: the object invokes multiple methods and returns a single combined result

For example,

  1. On the surface: you call an order-lookup API
  2. Internally: it first checks the cache, then the hot database if nothing is found, and finally the archive database (a sketch of this follows below)
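
A minimal Go sketch of that example, written in the same shape as kubectl's Visitor (the Order type and the three data sources are made up for illustration):

package ordersketch

import "errors"

// Order is the object being visited (illustrative type).
type Order struct{ ID, Status string }

// VisitorFunc is the operation applied to an Order.
type VisitorFunc func(*Order, error) error

// Visitor decouples "how the Order is obtained" from "what we do with it".
type Visitor interface {
	Visit(VisitorFunc) error
}

// lookupVisitor tries the cache, then the hot DB, then the archive DB.
type lookupVisitor struct {
	sources []func(id string) (*Order, bool) // each source may or may not have the order
	id      string
}

func (v lookupVisitor) Visit(fn VisitorFunc) error {
	for _, src := range v.sources {
		if o, ok := src(v.id); ok {
			// The caller only sees a single Visit call;
			// the multi-step lookup is hidden behind it.
			return fn(o, nil)
		}
	}
	return fn(nil, errors.New("order not found"))
}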

Visitor

Let's look at how the visitor pattern is defined in kubectl:

// Visitor is the visitor object itself
type Visitor interface {
Visit(VisitorFunc) error
}
// VisitorFunc is the method applied to that object, i.e. the "operation" in the definition
type VisitorFunc func(*Info, error) error

The basic data structures are very simple, but looking at them as they stand, two questions arise:

  1. A single operation can just call the Visit method directly, but how do we apply multiple operations?
  2. When applying multiple operations, if an error occurs, should we stop or keep applying the next operation?

Chained

VisitorList

Wraps multiple Visitors into one; stops and returns immediately when an error occurs.

// VisitorList is defined as []Visitor and implements Visit itself, wrapping multiple Visitors into a single Visitor
type VisitorList []Visitor

// Return as soon as an error occurs, without continuing the traversal
func (l VisitorList) Visit(fn VisitorFunc) error {
for i := range l {
if err := l[i].Visit(fn); err != nil {
return err
}
}
return nil
}

EagerVisitorList

Wraps multiple Visitors into one; errors are stashed and only aggregated and returned after the whole traversal finishes.

// EagerVisitorList also wraps multiple Visitors into a single Visitor
type EagerVisitorList []Visitor

// Errors are stashed in a []error and aggregated at the end
func (l EagerVisitorList) Visit(fn VisitorFunc) error {
errs := []error(nil)
for i := range l {
if err := l[i].Visit(func(info *Info, err error) error {
if err != nil {
errs = append(errs, err)
return nil
}
if err := fn(info, nil); err != nil {
errs = append(errs, err)
}
return nil
}); err != nil {
errs = append(errs, err)
}
}
return utilerrors.NewAggregate(errs)
}

DecoratedVisitor

This borrows from the decorator pattern: a Visitor that applies multiple VisitorFuncs is wrapped so that the caller only passes a single VisitorFunc.

// The decorator Visitor
type DecoratedVisitor struct {
visitor Visitor
decorators []VisitorFunc
}

// The visitor calls every function in decorators in turn, returning immediately on the first failure
func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
return v.visitor.Visit(func(info *Info, err error) error {
if err != nil {
return err
}
for i := range v.decorators {
if err := v.decorators[i](info, nil); err != nil {
return err
}
}
return fn(info, nil)
})
}

ContinueOnErrorVisitor

// Keep going even when errors occur
type ContinueOnErrorVisitor struct {
Visitor
}

// Errors do not cause an immediate return; all errors are aggregated and returned at the end
func (v ContinueOnErrorVisitor) Visit(fn VisitorFunc) error {
errs := []error{}
err := v.Visitor.Visit(func(info *Info, err error) error {
if err != nil {
errs = append(errs, err)
return nil
}
if err := fn(info, nil); err != nil {
errs = append(errs, err)
}
return nil
})
if err != nil {
errs = append(errs, err)
}
if len(errs) == 1 {
return errs[0]
}
return utilerrors.NewAggregate(errs)
}
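
These wrappers compose naturally. The toy program below (illustrative only, assuming the exported types of k8s.io/cli-runtime/pkg/resource; it does not mirror kubectl's real Builder wiring) chains a VisitorList with a DecoratedVisitor so that a decorating function runs on every Info before the final callback:

package main

import (
	"fmt"

	"k8s.io/cli-runtime/pkg/resource"
)

// infoVisitor is a trivial Visitor that yields a single, pre-built Info.
type infoVisitor struct{ info *resource.Info }

func (v infoVisitor) Visit(fn resource.VisitorFunc) error { return fn(v.info, nil) }

func main() {
	// Two underlying visitors, wrapped into one by VisitorList.
	list := resource.VisitorList{
		infoVisitor{&resource.Info{Namespace: "default", Name: "pod-a"}},
		infoVisitor{&resource.Info{Namespace: "default", Name: "pod-b"}},
	}

	// DecoratedVisitor runs the decorating funcs on every Info
	// before the final callback passed to Visit.
	decorated := resource.NewDecoratedVisitor(list, func(info *resource.Info, err error) error {
		fmt.Println("decorating", info.Name)
		return nil
	})

	_ = decorated.Visit(func(info *resource.Info, err error) error {
		fmt.Println("visited", info.Namespace+"/"+info.Name)
		return nil
	})
}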

FlattenListVisitor

Uses runtime.ObjectTyper to split a list object into multiple runtime.Objects, converts them into multiple Infos, and invokes the VisitorFunc on each one.

type FlattenListVisitor struct {
visitor Visitor
typer runtime.ObjectTyper
mapper *mapper
}

FilteredVisitor

Validates the Info resources against a set of filters.

// A Visitor that filters Infos
type FilteredVisitor struct {
visitor Visitor
filters []FilterFunc
}

func (v FilteredVisitor) Visit(fn VisitorFunc) error {
return v.visitor.Visit(func(info *Info, err error) error {
if err != nil {
return err
}
for _, filter := range v.filters {
// Check whether the Info satisfies the filter; bail out on error
ok, err := filter(info, nil)
if err != nil {
return err
}
if !ok {
return nil
}
}
return fn(info, nil)
})
}

Implements

StreamVisitor

The most basic Visitor.

type StreamVisitor struct {
// The source the data is read from, anything implementing the Read interface; this "stream" notion covers the common inputs such as HTTP, files and standard input
io.Reader
*mapper

Source string
Schema ContentValidator
}

FileVisitor

Access to files, including standard input; under the hood it delegates to StreamVisitor.

type FileVisitor struct {
// The file path, or STDIN
Path string
*StreamVisitor
}

URLVisitor

Fetches data over HTTP with the GET method; it also reuses StreamVisitor underneath.

type URLVisitor struct {
URL *url.URL
*StreamVisitor
// Number of retry attempts on error
HttpAttemptCount int
}

KustomizeVisitor

A custom Visitor, targeting a custom file system.

type KustomizeVisitor struct {
Path string
*StreamVisitor
}

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili: https://space.bilibili.com/293775192

WeChat Official Account: golangcoding