Junedayday Blog

六月天天的个人博客

0%

聚焦目标

理解启动kube-apiserver的权限相关的三个核心概念 Authentication/Authorization/Admission

目录

  1. kube-apiserver的启动
  2. kube-apiserver的三个Server
  3. KubeAPIServer的创建过程
    1. 通用配置概况
    2. 通用配置中的认证
    3. 通用配置中的授权
    4. 通用配置中的准入机制

Run

1
2
3
4
5
6
7
8
9
10
11
12
// 类似kubectl的源代码,kube-apiserver的命令行工具也使用了cobra,我们很快就能找到启动的入口
RunE: func(cmd *cobra.Command, args []string) error {
// 这里包含2个参数,前者是参数completedOptions,后者是一个stopCh <-chan struct{}
return Run(completedOptions, genericapiserver.SetupSignalHandler())
}

/*
在这里,我们可以和kubectl结合起来思考:
kubectl是一个命令行工具,执行完命令就退出;kube-apiserver是一个常驻的服务器进程,监听端口
这里引入了一个stopCh <-chan struct{},可以在启动后,用一个 <-stopCh 作为阻塞,使程序不退出
用channel阻塞进程退出,对比传统的方法 - 用一个永不退出的for循环,是一个很优雅的实现
*/

Three Servers

1
2
3
4
5
6
7
8
9
10
// 在CreateServerChain这个函数下,创建了3个server
func CreateServerChain(){
// API扩展服务,主要针对CRD
createAPIExtensionsServer(){}
// API核心服务,包括常见的Pod/Deployment/Service,我们今天的重点聚焦在这里
// 我会跳过很多非核心的配置参数,一开始就去研究细节,很影响整体代码的阅读效率
CreateKubeAPIServer(){}
// API聚合服务,主要针对metrics
createAggregatorServer(){}
}

KubeAPIServer

1
2
3
4
5
// 创建配置的流程
func CreateKubeAPIServerConfig(){
// 创建通用配置genericConfig
genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
}

GenericConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 通用配置的创建
func buildGenericConfig(s *options.ServerRunOptions,proxyTransport *http.Transport){
// Insecure对应的非安全的通信,也就是HTTP
if lastErr = s.InsecureServing...
// Secure对应的就是HTTPS
if lastErr = s.SecureServing...
// OpenAPIConfig是对外提供的API文档
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig()
// 这一块是storageFactory的实例化,可以看到采用的是etcd作为存储方案
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
storageFactory, lastErr = completedStorageFactoryConfig.New()
// Authentication 认证相关
if lastErr = s.Authentication.ApplyTo()...
// Authorization 授权相关
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer()
// Admission 准入机制
err = s.Admission.ApplyTo()
}

Authentication

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func (o *BuiltInAuthenticationOptions) ApplyTo(){
// 前面都是对认证config进行参数设置,这里才是真正的实例化
authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
}

// New这块的代码,我们要抓住核心变量authenticators和tokenAuthenticators,也就是各种认证方法
func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
// 核心变量authenticators和tokenAuthenticators
var authenticators []authenticator.Request
var tokenAuthenticators []authenticator.Token

if config.RequestHeaderConfig != nil {
// 1. 添加requestHeader
authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
}

if config.ClientCAContentProvider != nil {
// 2. 添加ClientCA
authenticators = append(authenticators, certAuth)
}

if len(config.TokenAuthFile) > 0 {
// 3. token 添加tokenfile
tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, tokenAuth))
}

// 4. token 添加 service account,分两种来源
if len(config.ServiceAccountKeyFiles) > 0 {
tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
}
if utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) && config.ServiceAccountIssuer != "" {
tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
}
if config.BootstrapToken {
if config.BootstrapTokenAuthenticator != nil {
// 5. token 添加 bootstrap
tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, config.BootstrapTokenAuthenticator))
}
}

if len(config.OIDCIssuerURL) > 0 && len(config.OIDCClientID) > 0 {
// 6. token 添加 oidc
Authenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, oidcAuth))
}
if len(config.WebhookTokenAuthnConfigFile) > 0 {
// 7. token 添加 webhook
tokenAuthenticators = append(tokenAuthenticators, webhookTokenAuth)
}

// 8. 组合tokenAuthenticators到tokenAuthenticators中
if len(tokenAuthenticators) > 0 {
tokenAuth := tokenunion.New(tokenAuthenticators...)
if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
}
authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
}

// 9. 没有任何认证方式且启用了Anonymous
if len(authenticators) == 0 {
if config.Anonymous {
return anonymous.NewAuthenticator(), &securityDefinitions, nil
}
return nil, &securityDefinitions, nil
}

// 10. 组合authenticators
authenticator := union.New(authenticators...)

return authenticator, &securityDefinitions, nil
}

复杂的Authentication模块的初始化顺序我们看完了,有初步的了解即可,没必要去强制记忆其中的加载顺序。

Authorization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
func BuildAuthorizer(){
// 与上面一致,实例化是在这个New中
return authorizationConfig.New()
}

// 不得不说,Authorizer这块的阅读体验更好
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
// 必须传入一个Authorizer机制
if len(config.AuthorizationModes) == 0 {
return nil, nil, fmt.Errorf("at least one authorization mode must be passed")
}

var (
authorizers []authorizer.Authorizer
ruleResolvers []authorizer.RuleResolver
)

for _, authorizationMode := range config.AuthorizationModes {
// 具体的mode定义,可以跳转到对应的链接去看,今天不细讲
switch authorizationMode {
case modes.ModeNode:
authorizers = append(authorizers, nodeAuthorizer)
ruleResolvers = append(ruleResolvers, nodeAuthorizer)

case modes.ModeAlwaysAllow:
authorizers = append(authorizers, alwaysAllowAuthorizer)
ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)

case modes.ModeAlwaysDeny:
authorizers = append(authorizers, alwaysDenyAuthorizer)
ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)

case modes.ModeABAC:
authorizers = append(authorizers, abacAuthorizer)
ruleResolvers = append(ruleResolvers, abacAuthorizer)

case modes.ModeWebhook:
authorizers = append(authorizers, webhookAuthorizer)
ruleResolvers = append(ruleResolvers, webhookAuthorizer)

case modes.ModeRBAC:
authorizers = append(authorizers, rbacAuthorizer)
ruleResolvers = append(ruleResolvers, rbacAuthorizer)
default:
return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
}
}

return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}

const (
// ModeAlwaysAllow is the mode to set all requests as authorized
ModeAlwaysAllow string = "AlwaysAllow"
// ModeAlwaysDeny is the mode to set no requests as authorized
ModeAlwaysDeny string = "AlwaysDeny"
// ModeABAC is the mode to use Attribute Based Access Control to authorize
ModeABAC string = "ABAC"
// ModeWebhook is the mode to make an external webhook call to authorize
ModeWebhook string = "Webhook"
// ModeRBAC is the mode to use Role Based Access Control to authorize
ModeRBAC string = "RBAC"
// ModeNode is an authorization mode that authorizes API requests made by kubelets.
ModeNode string = "Node"
)

Admission

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// 查看定义
err = s.Admission.ApplyTo()
func (a *AdmissionOptions) ApplyTo(){
return a.GenericAdmission.ApplyTo()
}

func (ps *Plugins) NewFromPlugins(){
for _, pluginName := range pluginNames {
// InitPlugin 为初始化的工作
plugin, err := ps.InitPlugin(pluginName, pluginConfig, pluginInitializer)
if err != nil {
return nil, err
}
}
}

func (ps *Plugins) InitPlugin(name string, config io.Reader, pluginInitializer PluginInitializer) (Interface, error){
// 获取plugin
plugin, found, err := ps.getPlugin(name, config)
}

// 查看一下Interface的定义,就是对准入机制的控制
// Interface is an abstract, pluggable interface for Admission Control decisions.
type Interface interface {
Handles(operation Operation) bool
}

// 再去看看获取plugin的地方
func (ps *Plugins) getPlugin(name string, config io.Reader) (Interface, bool, error) {
ps.lock.Lock()
defer ps.lock.Unlock()
// 我们再去研究ps.registry这个参数是在哪里被初始化的
f, found := ps.registry[name]
}

// 接下来,我们从kube-apiserver启动过程,逐步找到Admission被初始化的地方
// 启动命令
command := app.NewAPIServerCommand()
// server配置
s := options.NewServerRunOptions()
// admission选项
Admission: kubeoptions.NewAdmissionOptions()
// 注册准入机制
RegisterAllAdmissionPlugins(options.Plugins)
// 准入机制的所有内容
func RegisterAllAdmissionPlugins(plugins *admission.Plugins){
// 这里有很多plugin的注册
}

// 往上翻,我们能找到所有plugin,也就是准入机制的定义
var AllOrderedPlugins = []string{
admit.PluginName, // AlwaysAdmit
autoprovision.PluginName, // NamespaceAutoProvision
lifecycle.PluginName, // NamespaceLifecycle
exists.PluginName, // NamespaceExists
scdeny.PluginName, // SecurityContextDeny
antiaffinity.PluginName, // LimitPodHardAntiAffinityTopology
podpreset.PluginName, // PodPreset
limitranger.PluginName, // LimitRanger
serviceaccount.PluginName, // ServiceAccount
noderestriction.PluginName, // NodeRestriction
nodetaint.PluginName, // TaintNodesByCondition
alwayspullimages.PluginName, // AlwaysPullImages
imagepolicy.PluginName, // ImagePolicyWebhook
podsecuritypolicy.PluginName, // PodSecurityPolicy
podnodeselector.PluginName, // PodNodeSelector
podpriority.PluginName, // Priority
defaulttolerationseconds.PluginName, // DefaultTolerationSeconds
podtolerationrestriction.PluginName, // PodTolerationRestriction
exec.DenyEscalatingExec, // DenyEscalatingExec
exec.DenyExecOnPrivileged, // DenyExecOnPrivileged
eventratelimit.PluginName, // EventRateLimit
extendedresourcetoleration.PluginName, // ExtendedResourceToleration
label.PluginName, // PersistentVolumeLabel
setdefault.PluginName, // DefaultStorageClass
storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection
gc.PluginName, // OwnerReferencesPermissionEnforcement
resize.PluginName, // PersistentVolumeClaimResize
runtimeclass.PluginName, // RuntimeClass
certapproval.PluginName, // CertificateApproval
certsigning.PluginName, // CertificateSigning
certsubjectrestriction.PluginName, // CertificateSubjectRestriction
defaultingressclass.PluginName, // DefaultIngressClass

// new admission plugins should generally be inserted above here
// webhook, resourcequota, and deny plugins must go at the end

mutatingwebhook.PluginName, // MutatingAdmissionWebhook
validatingwebhook.PluginName, // ValidatingAdmissionWebhook
resourcequota.PluginName, // ResourceQuota
deny.PluginName, // AlwaysDeny
}

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

聚焦目标

理解kubectl的核心实现之一:Visitor Design Pattern 访问者模式

目录

  1. 什么是访问者模式

  2. kubectl中的Visitor

  3. Visitor的链式处理

    1. 多个对象聚合为一个对象
      1. VisitorList
      2. EagerVisitorList
    2. 多个方法聚合为一个方法
      1. DecoratedVisitor
      2. ContinueOnErrorVisitor
    3. 将对象抽象为多个底层对象,逐个调用方法
      1. FlattenListVisitor
      2. FilteredVisitor
  4. Visitor的各类实现

    1. StreamVisitor
    2. FileVisitor
    3. URLVisitor
    4. KustomizeVisitor

visitor design pattern

在设计模式中,访问者模式的定义为:

允许一个或者多个操作应用到对象上,解耦操作和对象本身

那么,对一个程序来说,具体的表现就是:

  1. 表面:某个对象执行了一个方法
  2. 内部:对象内部调用了多个方法,最后统一返回结果

举个例子,

  1. 表面:调用一个查询订单的接口
  2. 内部:先从缓存中查询,没查到再去热点数据库查询,还没查到则去归档数据库里查询

Visitor

我们来看看kubeadm中的访问者模式的定义:

1
2
3
4
5
6
// Visitor 即为访问者这个对象
type Visitor interface {
Visit(VisitorFunc) error
}
// VisitorFunc对应这个对象的方法,也就是定义中的“操作”
type VisitorFunc func(*Info, error) error

基本的数据结构很简单,但从当前的数据结构来看,有两个问题:

  1. 单个操作 可以直接调用Visit方法,那多个操作如何实现呢?
  2. 在应用多个操作时,如果出现了error,该退出还是继续应用下一个操作呢?

Chained

VisitorList

封装多个Visitor为一个,出现错误就立刻中止并返回

1
2
3
4
5
6
7
8
9
10
11
12
// VisitorList定义为[]Visitor,又实现了Visit方法,也就是将多个[]Visitor封装为一个Visitor
type VisitorList []Visitor

// 发生error就立刻返回,不继续遍历
func (l VisitorList) Visit(fn VisitorFunc) error {
for i := range l {
if err := l[i].Visit(fn); err != nil {
return err
}
}
return nil
}

EagerVisitorList

封装多个Visitor为一个,出现错误暂存下来,全部遍历完再聚合所有的错误并返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// EagerVisitorList 也是将多个[]Visitor封装为一个Visitor
type EagerVisitorList []Visitor

// 返回的错误暂存到[]error中,统一聚合
func (l EagerVisitorList) Visit(fn VisitorFunc) error {
errs := []error(nil)
for i := range l {
if err := l[i].Visit(func(info *Info, err error) error {
if err != nil {
errs = append(errs, err)
return nil
}
if err := fn(info, nil); err != nil {
errs = append(errs, err)
}
return nil
}); err != nil {
errs = append(errs, err)
}
}
return utilerrors.NewAggregate(errs)
}

DecoratedVisitor

这里借鉴了装饰器的设计模式,将一个Visitor调用多个VisitorFunc方法,封装为调用一个VisitorFunc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 装饰器Visitor
type DecoratedVisitor struct {
visitor Visitor
decorators []VisitorFunc
}

// visitor遍历调用decorators中所有函数,有失败立即返回
func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
return v.visitor.Visit(func(info *Info, err error) error {
if err != nil {
return err
}
for i := range v.decorators {
if err := v.decorators[i](info, nil); err != nil {
return err
}
}
return fn(info, nil)
})
}

ContinueOnErrorVisitor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 报错依旧继续
type ContinueOnErrorVisitor struct {
Visitor
}

// 报错不立即返回,聚合所有错误后返回
func (v ContinueOnErrorVisitor) Visit(fn VisitorFunc) error {
errs := []error{}
err := v.Visitor.Visit(func(info *Info, err error) error {
if err != nil {
errs = append(errs, err)
return nil
}
if err := fn(info, nil); err != nil {
errs = append(errs, err)
}
return nil
})
if err != nil {
errs = append(errs, err)
}
if len(errs) == 1 {
return errs[0]
}
return utilerrors.NewAggregate(errs)
}

FlattenListVisitor

将runtime.ObjectTyper解析成多个runtime.Object,再转换为多个Info,逐个调用VisitorFunc

1
2
3
4
5
type FlattenListVisitor struct {
visitor Visitor
typer runtime.ObjectTyper
mapper *mapper
}

FilteredVisitor

对Info资源的检验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 过滤的Info
type FilteredVisitor struct {
visitor Visitor
filters []FilterFunc
}

func (v FilteredVisitor) Visit(fn VisitorFunc) error {
return v.visitor.Visit(func(info *Info, err error) error {
if err != nil {
return err
}
for _, filter := range v.filters {
// 检验Info是否满足条件,出错则退出
ok, err := filter(info, nil)
if err != nil {
return err
}
if !ok {
return nil
}
}
return fn(info, nil)
})
}

Implements

StreamVisitor

最基础的Visitor

1
2
3
4
5
6
7
8
type StreamVisitor struct {
// 读取信息的来源,实现了Read这个接口,这个"流式"的概念,包括了常见的HTTP、文件、标准输入等各类输入
io.Reader
*mapper

Source string
Schema ContentValidator
}

FileVisitor

文件的访问,包括标准输入,底层调用StreamVisitor来访问

1
2
3
4
5
type FileVisitor struct {
// 表示文件路径或者STDIN
Path string
*StreamVisitor
}

URLVisitor

HTTP用GET方法获取数据,底层也是复用StreamVisitor

1
2
3
4
5
6
type URLVisitor struct {
URL *url.URL
*StreamVisitor
// 提供错误重试次数
HttpAttemptCount int
}

KustomizeVisitor

自定义的Visitor,针对自定义的文件系统

1
2
3
4
type KustomizeVisitor struct {
Path string
*StreamVisitor
}

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

聚焦目标

我们的目标是查看kubectl create -f nginx_pod.yaml 这个命令是怎么运行的。

目录

  1. main函数入口

  2. 传入参数与子命令的匹配

  3. kubectl命令的初始化

  4. 查看create子命令

  5. runCreate的创建逻辑

main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
// 如果不调用rand.Seed,每次重新运行这个main函数,rand下的函数返回值始终一致
// Seed即随机的种子,每次用时间戳作为种子,就能保证随机性
rand.Seed(time.Now().UnixNano())

// 创建了kubectl命令的默认参数
command := cmd.NewDefaultKubectlCommand()

// TODO: once we switch everything over to Cobra commands, we can go back to calling
// cliflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
// normalize func and add the go flag set by hand.
pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
// cliflag.InitFlags()

// 日志的初始化与退出
logs.InitLogs()
defer logs.FlushLogs()

// 运行command
if err := command.Execute(); err != nil {
os.Exit(1)
}
}

match

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// k8s的命令行工具采用了 cobra 库,具有命令提示等强大功能,比go语言自带的flag强大很多,可参考 github.com/spf13/cobra
func NewDefaultKubectlCommand() *cobra.Command {
return NewDefaultKubectlCommandWithArgs(NewDefaultPluginHandler(plugin.ValidPluginFilenamePrefixes), os.Args, os.Stdin, os.Stdout, os.Stderr)
}

func NewDefaultKubectlCommandWithArgs(pluginHandler PluginHandler, args []string, in io.Reader, out, errout io.Writer) *cobra.Command {
// 初始化NewKubectlCommand,采用标准输入、输出、错误输出
cmd := NewKubectlCommand(in, out, errout)

if pluginHandler == nil {
return cmd
}

if len(args) > 1 {
// 这里为传入的参数,即 create -f nginx_pod.yaml 部分
cmdPathPieces := args[1:]

// 调用cobra的Find去匹配args
if _, _, err := cmd.Find(cmdPathPieces); err != nil {
if err := HandlePluginCommand(pluginHandler, cmdPathPieces); err != nil {
fmt.Fprintf(errout, "%v\n", err)
os.Exit(1)
}
}
}

return cmd
}

command

代码较长,我选择关键性的内容进行讲解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
warningHandler := rest.NewWarningWriter(err, rest.WarningWriterOptions{Deduplicate: true, Color: term.AllowsColorOutput(err)})
warningsAsErrors := false

// 创建主命令
cmds := &cobra.Command{
Use: "kubectl",
Short: i18n.T("kubectl controls the Kubernetes cluster manager"),
Long: templates.LongDesc(`
kubectl controls the Kubernetes cluster manager.

Find more information at:
https://kubernetes.io/docs/reference/kubectl/overview/`),
Run: runHelp,
// 初始化后,在运行指令前的钩子
PersistentPreRunE: func(*cobra.Command, []string) error {
rest.SetDefaultWarningHandler(warningHandler)
// 这里是做pprof性能分析,跳转到对应代码可以看到,我们可以用参数 --profile xxx 来采集性能指标,默认保存在当前目录下的profile.pprof中
return initProfiling()
},
// 运行指令后的钩子
PersistentPostRunE: func(*cobra.Command, []string) error {
// 保存pprof性能分析指标
if err := flushProfiling(); err != nil {
return err
}
// 打印warning条数
if warningsAsErrors {
count := warningHandler.WarningCount()
switch count {
case 0:
// no warnings
case 1:
return fmt.Errorf("%d warning received", count)
default:
return fmt.Errorf("%d warnings received", count)
}
}
return nil
},
// bash自动补齐功能,可通过 kubectl completion bash 命令查看
// 具体安装可参考 https://kubernetes.io/docs/tasks/tools/install-kubectl/#enabling-shell-autocompletion
BashCompletionFunction: bashCompletionFunc,
}

// 实例化Factory接口,工厂模式
f := cmdutil.NewFactory(matchVersionKubeConfigFlags)

// 省略实例化的过程代码

// kubectl定义了7类命令,结合Message和各个子命令的package名来看
groups := templates.CommandGroups{
{
// 1. 初级命令,包括 create/expose/run/set
Message: "Basic Commands (Beginner):",
Commands: []*cobra.Command{
create.NewCmdCreate(f, ioStreams),
expose.NewCmdExposeService(f, ioStreams),
run.NewCmdRun(f, ioStreams),
set.NewCmdSet(f, ioStreams),
},
},
{
// 2. 中级命令,包括explain/get/edit/delete
Message: "Basic Commands (Intermediate):",
Commands: []*cobra.Command{
explain.NewCmdExplain("kubectl", f, ioStreams),
get.NewCmdGet("kubectl", f, ioStreams),
edit.NewCmdEdit(f, ioStreams),
delete.NewCmdDelete(f, ioStreams),
},
},
{
// 3. 部署命令,包括 rollout/scale/autoscale
Message: "Deploy Commands:",
Commands: []*cobra.Command{
rollout.NewCmdRollout(f, ioStreams),
scale.NewCmdScale(f, ioStreams),
autoscale.NewCmdAutoscale(f, ioStreams),
},
},
{
// 4. 集群管理命令,包括 cerfificate/cluster-info/top/cordon/drain/taint
Message: "Cluster Management Commands:",
Commands: []*cobra.Command{
certificates.NewCmdCertificate(f, ioStreams),
clusterinfo.NewCmdClusterInfo(f, ioStreams),
top.NewCmdTop(f, ioStreams),
drain.NewCmdCordon(f, ioStreams),
drain.NewCmdUncordon(f, ioStreams),
drain.NewCmdDrain(f, ioStreams),
taint.NewCmdTaint(f, ioStreams),
},
},
{
// 5. 故障排查和调试,包括 describe/logs/attach/exec/port-forward/proxy/cp/auth
Message: "Troubleshooting and Debugging Commands:",
Commands: []*cobra.Command{
describe.NewCmdDescribe("kubectl", f, ioStreams),
logs.NewCmdLogs(f, ioStreams),
attach.NewCmdAttach(f, ioStreams),
cmdexec.NewCmdExec(f, ioStreams),
portforward.NewCmdPortForward(f, ioStreams),
proxy.NewCmdProxy(f, ioStreams),
cp.NewCmdCp(f, ioStreams),
auth.NewCmdAuth(f, ioStreams),
},
},
{
// 6. 高级命令,包括diff/apply/patch/replace/wait/convert/kustomize
Message: "Advanced Commands:",
Commands: []*cobra.Command{
diff.NewCmdDiff(f, ioStreams),
apply.NewCmdApply("kubectl", f, ioStreams),
patch.NewCmdPatch(f, ioStreams),
replace.NewCmdReplace(f, ioStreams),
wait.NewCmdWait(f, ioStreams),
convert.NewCmdConvert(f, ioStreams),
kustomize.NewCmdKustomize(ioStreams),
},
},
{
// 7. 设置命令,包括label,annotate,completion
Message: "Settings Commands:",
Commands: []*cobra.Command{
label.NewCmdLabel(f, ioStreams),
annotate.NewCmdAnnotate("kubectl", f, ioStreams),
completion.NewCmdCompletion(ioStreams.Out, ""),
},
},
}
groups.Add(cmds)

filters := []string{"options"}

// alpha相关的子命令
alpha := cmdpkg.NewCmdAlpha(f, ioStreams)
if !alpha.HasSubCommands() {
filters = append(filters, alpha.Name())
}

templates.ActsAsRootCommand(cmds, filters, groups...)

// 代码补全相关
for name, completion := range bashCompletionFlags {
if cmds.Flag(name) != nil {
if cmds.Flag(name).Annotations == nil {
cmds.Flag(name).Annotations = map[string][]string{}
}
cmds.Flag(name).Annotations[cobra.BashCompCustom] = append(
cmds.Flag(name).Annotations[cobra.BashCompCustom],
completion,
)
}
}

// 添加其余子命令,包括 alpha/config/plugin/version/api-versions/api-resources/options
cmds.AddCommand(alpha)
cmds.AddCommand(cmdconfig.NewCmdConfig(f, clientcmd.NewDefaultPathOptions(), ioStreams))
cmds.AddCommand(plugin.NewCmdPlugin(f, ioStreams))
cmds.AddCommand(version.NewCmdVersion(f, ioStreams))
cmds.AddCommand(apiresources.NewCmdAPIVersions(f, ioStreams))
cmds.AddCommand(apiresources.NewCmdAPIResources(f, ioStreams))
cmds.AddCommand(options.NewCmdOptions(ioStreams.Out))

return cmds
}

create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func NewCmdCreate(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
// create子命令的相关选项
o := NewCreateOptions(ioStreams)

// create子命令的相关说明
cmd := &cobra.Command{
Use: "create -f FILENAME",
DisableFlagsInUseLine: true,
Short: i18n.T("Create a resource from a file or from stdin."),
Long: createLong,
Example: createExample,
// 验证参数并运行
Run: func(cmd *cobra.Command, args []string) {
if cmdutil.IsFilenameSliceEmpty(o.FilenameOptions.Filenames, o.FilenameOptions.Kustomize) {
ioStreams.ErrOut.Write([]byte("Error: must specify one of -f and -k\n\n"))
defaultRunFunc := cmdutil.DefaultSubCommandRun(ioStreams.ErrOut)
defaultRunFunc(cmd, args)
return
}
cmdutil.CheckErr(o.Complete(f, cmd))
cmdutil.CheckErr(o.ValidateArgs(cmd, args))
// 核心的运行代码逻辑是在这里的RunCreate
cmdutil.CheckErr(o.RunCreate(f, cmd))
},
}

o.RecordFlags.AddFlags(cmd)

usage := "to use to create the resource"
// 加入文件名选项的flag -f,保存到o.FilenameOptions.Filenames中,对应上面
cmdutil.AddFilenameOptionFlags(cmd, &o.FilenameOptions, usage)
cmdutil.AddValidateFlags(cmd)
cmd.Flags().BoolVar(&o.EditBeforeCreate, "edit", o.EditBeforeCreate, "Edit the API resource before creating")
cmd.Flags().Bool("windows-line-endings", runtime.GOOS == "windows",
"Only relevant if --edit=true. Defaults to the line ending native to your platform.")
cmdutil.AddApplyAnnotationFlags(cmd)
cmdutil.AddDryRunFlag(cmd)
cmd.Flags().StringVarP(&o.Selector, "selector", "l", o.Selector, "Selector (label query) to filter on, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2)")
cmd.Flags().StringVar(&o.Raw, "raw", o.Raw, "Raw URI to POST to the server. Uses the transport specified by the kubeconfig file.")
cmdutil.AddFieldManagerFlagVar(cmd, &o.fieldManager, "kubectl-create")

o.PrintFlags.AddFlags(cmd)

// create的子命令,指定create对象
cmd.AddCommand(NewCmdCreateNamespace(f, ioStreams))
cmd.AddCommand(NewCmdCreateQuota(f, ioStreams))
cmd.AddCommand(NewCmdCreateSecret(f, ioStreams))
cmd.AddCommand(NewCmdCreateConfigMap(f, ioStreams))
cmd.AddCommand(NewCmdCreateServiceAccount(f, ioStreams))
cmd.AddCommand(NewCmdCreateService(f, ioStreams))
cmd.AddCommand(NewCmdCreateDeployment(f, ioStreams))
cmd.AddCommand(NewCmdCreateClusterRole(f, ioStreams))
cmd.AddCommand(NewCmdCreateClusterRoleBinding(f, ioStreams))
cmd.AddCommand(NewCmdCreateRole(f, ioStreams))
cmd.AddCommand(NewCmdCreateRoleBinding(f, ioStreams))
cmd.AddCommand(NewCmdCreatePodDisruptionBudget(f, ioStreams))
cmd.AddCommand(NewCmdCreatePriorityClass(f, ioStreams))
cmd.AddCommand(NewCmdCreateJob(f, ioStreams))
cmd.AddCommand(NewCmdCreateCronJob(f, ioStreams))
return cmd
}

runCreate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (o *CreateOptions) RunCreate(f cmdutil.Factory, cmd *cobra.Command) error {
// f为传入的Factory,主要是封装了与kube-apiserver交互客户端

schema, err := f.Validator(cmdutil.GetFlagBool(cmd, "validate"))
if err != nil {
return err
}

cmdNamespace, enforceNamespace, err := f.ToRawKubeConfigLoader().Namespace()
if err != nil {
return err
}

// 实例化Builder,这块的逻辑比较复杂,我们先关注文件部分
r := f.NewBuilder().
Unstructured().
Schema(schema).
ContinueOnError().
NamespaceParam(cmdNamespace).DefaultNamespace().
// 读取文件信息,发现除了支持简单的本地文件,也支持标准输入和http/https协议访问的文件,保存为Visitor
FilenameParam(enforceNamespace, &o.FilenameOptions).
LabelSelectorParam(o.Selector).
Flatten().
Do()
err = r.Err()
if err != nil {
return err
}

count := 0
// 调用visit函数,创建资源
err = r.Visit(func(info *resource.Info, err error) error {
// 打印结果 xxxx created
return o.PrintObj(info.Object)
})
return nil
}

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

部署Kubernetes集群的方法(建议用kubeadm),详细可参考我的博客,或者可直接参考官方文档

本次分析的源码基于release-1.19。

后续版本如果对某个模块有大改动的话,大家也可以提醒我进行更新

确立目标

  1. 创建pod的全流程入手,了解各组件的工作内容,组件主要包括
    1. kubectl
    2. kube-apiserver
    3. etcd
    4. kube-controller
    5. kube-scheduler
    6. kubelet
  2. 核心模块引用的库有基本的认识,为后续深入做好铺垫
  3. 结合源码,掌握kubernetes的核心概念

写一个Yaml

1
2
3
4
5
6
7
8
apiVersion: v1
kind: Pod
metadata:
name: nginx-pod
spec:
containers:
- name: nginx
image: nginx:1.8

部署Pod

1
2
3
kubectl create -f nginx_pod.yaml

pod/nginx-pod created

提示创建成功

查询Pod

1
2
3
4
kubectl get pods

NAME READY STATUS RESTARTS AGE
nginx-pod 1/1 Running 0 4m22s

打印出状态:

  • NAME - nginx-pod就是对应上面 metadata.name
  • READY - 就绪的个数
  • STATUS - 当前的状态,RUNNING表示运行中
  • RESTARTS - 重启的次数
  • AGE - 运行的次数

完结撒花

整个操作就这么结束了~

后续的分析,都是基于这个nginx pod的创建示例来的。

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding