Junedayday Blog

六月天天的个人博客

0%

注:本文的灵感来源于GOPHER 2020年大会陈皓的分享,原PPT的链接可能并不方便获取,所以我下载了一份PDF到git仓,方便大家阅读。我将结合自己的实际项目经历,与大家一起细品这份文档。

目录

ServerConfig

我们先来看看一个常见的HTTP服务器的配置,它区分了2个必填参数与4个非必填参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type ServerCfg struct {
Addr string // 必填
Port int // 必填
Protocol string // 非必填
Timeout time.Duration // 非必填
MaxConns int // 非必填
TLS *tls.Config // 非必填
}

// 我们要实现非常多种方法,来支持各种非必填的情况,示例如下
func NewServer(addr string, port int) (*Server, error) {}
func NewTLSServer(addr string, port int, tls *tls.Config) (*Server, error) {}
func NewServerWithTimeout(addr string, port int, timeout time.Duration) (*Server, error) {}
func NewTLSServerWithMaxConnAndTimeout(addr string, port int, maxconns int, timeout time.Duration, tls *tls.Config) (*Server, error) {}

SplitConfig

编程的一大重点,就是要 分离变化点和不变点。这里,我们可以将必填项认为是不变点,而非必填则是变化点。

我们将非必填的选项拆分出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Config struct {
Protocol string
Timeout time.Duration
MaxConns int
TLS *tls.Config
}

type Server struct {
Addr string
Port int
Conf *Config
}

func NewServer(addr string, port int, conf *Config) (*Server, error) {
return &Server{
Addr: addr,
Port: port,
Conf: conf,
}, nil
}

func main() {
srv1, _ := NewServer("localhost", 9000, nil)

conf := Config{Protocol: "tcp", Timeout: 60 * time.Second}
srv2, _ := NewServer("localhost", 9000, &conf)

fmt.Println(srv1, srv2)
}

到这里,其实已经满足大部分的开发需求了。那么,我们将进入今天的重点。

Functional Option

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
type Server struct {
Addr string
Port int
Protocol string
Timeout time.Duration
MaxConns int
TLS *tls.Config
}

// 定义一个Option类型的函数,它操作了Server这个对象
type Option func(*Server)

// 下面是对四个可选参数的配置函数
func Protocol(p string) Option {
return func(s *Server) {
s.Protocol = p
}
}

func Timeout(timeout time.Duration) Option {
return func(s *Server) {
s.Timeout = timeout
}
}

func MaxConns(maxconns int) Option {
return func(s *Server) {
s.MaxConns = maxconns
}
}

func TLS(tls *tls.Config) Option {
return func(s *Server) {
s.TLS = tls
}
}

// 用到了不定参数的特性,将任意个option应用到Server上
func NewServer(addr string, port int, options ...Option) (*Server, error) {
// 先填写默认值
srv := Server{
Addr: addr,
Port: port,
Protocol: "tcp",
Timeout: 30 * time.Second,
MaxConns: 1000,
TLS: nil,
}
// 应用任意个option
for _, option := range options {
option(&srv)
}
return &srv, nil
}

func main() {
s1, _ := NewServer("localhost", 1024)
s2, _ := NewServer("localhost", 2048, Protocol("udp"))
s3, _ := NewServer("0.0.0.0", 8080, Timeout(300*time.Second), MaxConns(1000))

fmt.Println(s1, s2, s3)
}

耗子哥给出了6个点,但我感受最深的是以下两点:

  1. 可读性强,将配置都转化成了对应的函数项option
  2. 扩展性好,新增参数只需要增加一个对应的方法

那么对应的代价呢?就是需要编写多个Option函数,代码量会有所增加。

如果大家对这个感兴趣,可以去看一下Rob Pike的这篇blog

Further

顺着耗子叔的例子,我们再思考一下,如果配置的过程中有参数限制,那么我们该怎么办呢?

首先,我们改造一下函数Option

1
2
// 返回错误
type OptionWithError func(*Server) error

然后,我们改造一下其中两个函数作为示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func Protocol(p string) OptionWithError {
return func(s *Server) error {
if p == "" {
return errors.New("empty protocol")
}
s.Protocol = p
return nil
}
}

func Timeout(timeout time.Duration) Option {
return func(s *Server) error {
if timeout.Seconds() < 1 {
return errors.New("time out should not less than 1s")
}
s.Timeout = timeout
return nil
}
}

我们再做一次改造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func NewServer(addr string, port int, options ...OptionWithError) (*Server, error) {
srv := Server{
Addr: addr,
Port: port,
Protocol: "tcp",
Timeout: 30 * time.Second,
MaxConns: 1000,
TLS: nil,
}
// 增加了一个参数验证的步骤
for _, option := range options {
if err := option(&srv); err != nil {
return nil, err
}
}
return &srv, nil
}

改造基本到此完成,希望能给大家带来一定的帮助。

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

注:本文的灵感来源于GOPHER 2020年大会陈皓的分享,原PPT的链接可能并不方便获取,所以我下载了一份PDF到git仓,方便大家阅读。我将结合自己的实际项目经历,与大家一起细品这份文档。

目录

Functional

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type Number struct {
a int
b string
c bool
d []int32
e error
}

func (n *Number) parse(r io.Reader) error {
if err := binary.Read(r, binary.BigEndian, &n.a); err != nil {
return err
}
if err := binary.Read(r, binary.BigEndian, &n.b); err != nil {
return err
}
if err := binary.Read(r, binary.BigEndian, &n.c); err != nil {
return err
}
if err := binary.Read(r, binary.BigEndian, &n.d); err != nil {
return err
}
if err := binary.Read(r, binary.BigEndian, &n.e); err != nil {
return err
}
return nil
}

引入了函数式编程的方式,我们看看有什么改变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (n *Number) parse(r io.Reader) error {
// 先定义一个error
var err error

// 定义函数,注意这里的err的作用域是来自上面定义的
read := func(data interface{}) {
// 先检查error,如果已经有错误则不检查
if err != nil {
return
}
err = binary.Read(r, binary.BigEndian, data)
}

// 注意,这个函数的调用逻辑和之前的差别在于一点:
// 即使前面的发生了error,下面的函数也会被调用
read(&n.a)
read(&n.b)
read(&n.c)
read(&n.d)
read(&n.e)

return err
}

ErrorObject

先看一个标准库中的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
input := bytes.NewReader([]byte("hello"))

// 扫描数据,这里不会直接返回错误
scanner := bufio.NewScanner(input)
for scanner.Scan() {
token := scanner.Text()
fmt.Println(token)
}

// 从Err()方法中获取错误
if err := scanner.Err(); err != nil {
fmt.Println(err)
}
}

它的根本思想,是将error嵌入到了对象中。那我们借鉴一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type Reader struct {
r io.Reader
err error
}


func (r *Reader) read(data interface{}) {
if r.err == nil {
r.err = binary.Read(r.r, binary.BigEndian, data)
}
}

func (n *Number) parse(reader io.Reader) error {
r := Reader{r: reader}

r.read(&n.a)
r.read(&n.b)
r.read(&n.c)
r.read(&n.d)
r.read(&n.e)

return r.err
}

捎带提一句:个人不太喜欢上面scanner的错误处理方式,这个要求使用方对这个包很熟悉,否则很容易忘掉后面的错误处理逻辑。但后面处理错误的逻辑,就很直接地将错误返回,可读性很强。

Wrap

耗子叔给的例子是调用了github.com/pkg/errors下的wrap包,不过我更倾向于直接用原生的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
// 原始 error
err := errors.New("level 1")
fmt.Println(err)
// level 1

// wrap一下error,注意error的占位符是%w
wraped := fmt.Errorf("%v: %w", "level 2", err)
fmt.Println(wraped)
// level 2: level 1

// unwrap 后获得原来的错误
unwraped := errors.Unwrap(wraped)
fmt.Println(unwraped)
// level 1

// 过度unwrap会导致错误变成nil
unwraped2 := errors.Unwrap(unwraped)
fmt.Println(unwraped2)
// nil
}

但在实际项目实践中,Wrap的这个特性并不好用:

如何Wrap Error,在多人协同开发、多模块开发过程中,很难统一。而一旦不统一,容易出现示例中的过度Unwrap的情况。

所以,我认为与其花大精力在制定错误的标准上,还不如利用fmt.Errorf将错误信息直观地表述出来。

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

注:本文的灵感来源于GOPHER 2020年大会陈皓的分享,原PPT的链接可能并不方便获取,所以我下载了一份PDF到git仓,方便大家阅读。我将结合自己的实际项目经历,与大家一起细品这份文档。

目录

Embedded

接口定义

1
2
3
4
5
6
7
8
// 定义了两种interface
type Painter interface {
Paint()
}

type Clicker interface {
Click()
}

Label 实现了 Painter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 标准组件,用于嵌入
type Widget struct {
X, Y int
}

// Label 实现了 Painter
type Label struct {
Widget // Embedding (delegation)
Text string // Aggregation
}

func (label Label) Paint() {
fmt.Printf("%p:Label.Paint(%q)\n", &label, label.Text)
}

ListBox实现了Painter和Clicker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ListBox声明了Paint和Click,所以实现了Painter和Clicker
type ListBox struct {
Widget // Embedding (delegation)
Texts []string // Aggregation
Index int // Aggregation
}

func (listBox ListBox) Paint() {
fmt.Printf("ListBox.Paint(%q)\n", listBox.Texts)
}

func (listBox ListBox) Click() {
fmt.Printf("ListBox.Click(%q)\n", listBox.Texts)
}

Button也实现了Painter和Clicker

1
2
3
4
5
6
7
8
9
10
11
12
13
// Button 继承了Label,所以直接实现了Painter
// 接下来,Button又声明了Paint和Click,所以实现了Painter和Clicker,其中Paint方法被覆
type Button struct {
Label // Embedding (delegation)
}

func (button Button) Paint() { // Override
fmt.Printf("Button.Paint(%s)\n", button.Text)
}

func (button Button) Click() {
fmt.Printf("Button.Click(%s)\n", button.Text)
}

方法调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
label := Label{Widget{10, 70}, "Label"}
button1 := Button{Label{Widget{10, 70}, "OK"}}
button2 := Button{Label{Widget{50, 70}, "Cancel"}}
listBox := ListBox{Widget{10, 40},
[]string{"AL", "AK", "AZ", "AR"}, 0}

for _, painter := range []Painter{label, listBox, button1, button2} {
painter.Paint()
}

for _, widget := range []interface{}{label, listBox, button1, button2} {
// 默认都实现了Painter接口,可以直接调用
widget.(Painter).Paint()
if clicker, ok := widget.(Clicker); ok {
clicker.Click()
}
}
}

这个例子代码很多,我个人认为重点可以归纳为一句话:

用嵌入实现方法的继承,减少代码的冗余度

耗子叔的例子很精彩,不过我个人不太喜欢interface这个数据类型(main函数中),有没有什么优化的空间呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
// 定义两种方法的组合
type PaintClicker interface {
Painter
Clicker
}

func main() {
// 在上面的例子中,interface传参其实不太优雅,有没有更优雅的实现呢?那就用组合的interface
for _, widget := range []PaintClicker{listBox, button1, button2} {
widget.Paint()
widget.Click()
}
}

IoC

先看一个Int集合的最基本实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Int集合,用于最基础的增删查
type IntSet struct {
data map[int]bool
}

func NewIntSet() IntSet {
return IntSet{make(map[int]bool)}
}

func (set *IntSet) Add(x int) { set.data[x] = true }

func (set *IntSet) Delete(x int) { delete(set.data, x) }

func (set *IntSet) Contains(x int) bool { return set.data[x] }

现在,需求来了,我们希望对这个Int集合的操作是可撤销的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 可撤销的Int集合,依赖于IntSet,我们看看基本实现
type UndoableIntSet struct { // Poor style
IntSet // Embedding (delegation)
functions []func()
}

func NewUndoableIntSet() UndoableIntSet {
return UndoableIntSet{NewIntSet(), nil}
}

// 新增
// 不存在元素时:添加元素,并新增撤销函数:删除
// 存在元素时:不做任何操作,并新增撤销函数:空
func (set *UndoableIntSet) Add(x int) { // Override
if !set.Contains(x) {
set.data[x] = true
set.functions = append(set.functions, func() { set.Delete(x) })
} else {
set.functions = append(set.functions, nil)
}
}

// 删除,与新增相反
// 存在元素时:删除元素,并新增撤销函数:新增
// 不存在元素时:不做任何操作,并新增撤销函数:空
func (set *UndoableIntSet) Delete(x int) { // Override
if set.Contains(x) {
delete(set.data, x)
set.functions = append(set.functions, func() { set.Add(x) })
} else {
set.functions = append(set.functions, nil)
}
}

// 撤销:执行最后一个撤销函数function
func (set *UndoableIntSet) Undo() error {
if len(set.functions) == 0 {
return errors.New("No functions to undo")
}
index := len(set.functions) - 1
if function := set.functions[index]; function != nil {
function()
set.functions[index] = nil // For garbage collection
}
set.functions = set.functions[:index]
return nil
}

上面的实现是一种顺序逻辑的思路,整体还是挺麻烦的。有没有优化思路呢?

定义一下Undo这个结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Undo []func()

func (undo *Undo) Add(function func()) {
*undo = append(*undo, function)
}

func (undo *Undo) Undo() error {
functions := *undo
if len(functions) == 0 {
return errors.New("No functions to undo")
}
index := len(functions) - 1
if function := functions[index]; function != nil {
function()
functions[index] = nil // For garbage collection
}
*undo = functions[:index]
return nil
}

细品一下这里的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type IntSet2 struct {
data map[int]bool
undo Undo
}

func NewIntSet2() IntSet2 {
return IntSet2{data: make(map[int]bool)}
}

func (set *IntSet2) Undo() error {
return set.undo.Undo()
}

func (set *IntSet2) Contains(x int) bool {
return set.data[x]
}

func (set *IntSet2) Add(x int) {
if !set.Contains(x) {
set.data[x] = true
set.undo.Add(func() { set.Delete(x) })
} else {
set.undo.Add(nil)
}
}

func (set *IntSet2) Delete(x int) {
if set.Contains(x) {
delete(set.data, x)
set.undo.Add(func() { set.Add(x) })
} else {
set.undo.Add(nil)
}
}

我们看一下,这块代码的前后逻辑有了啥变化:

  1. 之前,撤销函数是在Add/Delete时添加的,函数中包含了IntSet的操作,也就是 Undo依赖IntSet
  2. 而修改之后,撤销函数被抽象为Undo,撤销相关的工作直接调用Undo相关的工作即可,也就是 IntSet依赖Undo

我们再来分析一下

  • Undo是控制逻辑 - 撤销动作
  • IntSet是业务逻辑 - 保存数据的功能。

业务逻辑依赖控制逻辑,才能保证在复杂业务逻辑变化场景下,代码更健壮!

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

注:本文的灵感来源于GOPHER 2020年大会陈皓的分享,原PPT的链接可能并不方便获取,所以我下载了一份PDF到git仓,方便大家阅读。我将结合自己的实际项目经历,与大家一起细品这份文档。

目录

注:切勿过早优化!

Time

这部分的内容实战项目中用得不多,大家记住耗子叔总结出来的一个原则即可:

尽量用time.Timetime.Duration,如果必须用string,尽量用time.RFC3339

然而现实情况并没有那么理想,实际项目中用得最频繁,还是自定义的2006-01-02 15:04:05

1
time.Now().Format("2006-01-02 15:04:05")

Performance1

Itoa性能高于Sprint

主要性能差异是由于Sprint针对的是复杂的字符串拼接,底层有个buffer,会在它的基础上进行一些字符串的拼接;

Itoa直接通过一些位操作组合出字符串。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 170 ns/op
func Benchmark_Sprint(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = fmt.Sprint(rand.Int())
}
}

// 81.9 ns/op
func Benchmark_Itoa(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = strconv.Itoa(rand.Int())
}
}

减少string到byte的转换

主要了解go的string[]byte的转换还是比较耗性能的,但大部分情况下无法避免这种转换。

我们注意一种场景即可:从[]byte转换为string,再转换为[]byte

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 43.9 ns/op
func Benchmark_String2Bytes(b *testing.B) {
data := "Hello world"
w := ioutil.Discard
for i := 0; i < b.N; i++ {
w.Write([]byte(data))
}
}

// 3.06 ns/op
func Benchmark_Bytes(b *testing.B) {
data := []byte("Hello world")
w := ioutil.Discard
for i := 0; i < b.N; i++ {
w.Write(data)
}
}

切片能声明cap的,尽量初始化时声明

了解slice的扩容机制就能很容易地理解。切片越长,影响越大。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
var size = 1000

// 4494 ns/op
func Benchmark_NoCap(b *testing.B) {
for n := 0; n < b.N; n++ {
data := make([]int, 0)
for k := 0; k < size; k++ {
data = append(data, k)
}
}
}

// 2086 ns/op
func Benchmark_Cap(b *testing.B) {
for n := 0; n < b.N; n++ {
data := make([]int, 0, size)
for k := 0; k < size; k++ {
data = append(data, k)
}
}
}

避免用string做大量字符串的拼接

频繁拼接字符串的场景并不多,了解即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var strLen = 10000

// 0.0107 ns/op
func Benchmark_StringAdd(b *testing.B) {
var str string
for n := 0; n < strLen; n++ {
str += "x"
}
}

// 0.000154 ns/op
func Benchmark_StringBuilder(b *testing.B) {
var builder strings.Builder
for n := 0; n < strLen; n++ {
builder.WriteString("x")
}
}

// 0.000118 ns/op
func Benchmark_BytesBuffer(b *testing.B) {
var buffer bytes.Buffer
for n := 0; n < strLen; n++ {
buffer.WriteString("x")
}
}

Performance2

并行操作用sync.WaitGroup控制

热点内存分配用sync.Pool

注意一下,一定要是热点,千万不要 过早优化

倾向于使用lock-free的atomic包

除了常用的CAS操作,还有atomic.ValueStoreLoad操作,这里简单地放个实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
v := atomic.Value{}
type demo struct {
a int
b string
}

v.Store(&demo{
a: 1,
b: "hello",
})

data, ok := v.Load().(*demo)
fmt.Println(data, ok)
// &{1 hello} true
}

复杂场景下,还是建议用mutex

对磁盘的大量读写用bufio包

bufio.NewReader()bufio.NewWriter()

对正则表达式不要重复compile

1
2
3
4
5
6
7
// 如果匹配的格式不会变化,全局只初始化一次即可
var compiled = regexp.MustCompile(`^[a-z]+[0-9]+$`)

func main() {
fmt.Println(compiled.MatchString("test123"))
fmt.Println(compiled.MatchString("test1234"))
}

用protobuf替换json

go项目内部通信尽量用protobuf,但如果是对外提供api,比如web前端,json格式更方便。

map的key尽量用int来代替string

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var size = 1000000

// 0.0442 ns/op
func Benchmark_MapInt(b *testing.B) {
var m = make(map[int]struct{})
for i := 0; i < size; i++ {
m[i] = struct{}{}
}
b.ResetTimer()
for n := 0; n < size; n++ {
_, _ = m[n]
}
}

// 0.180 ns/op
func Benchmark_MapString(b *testing.B) {
var m = make(map[string]struct{})
for i := 0; i < size; i++ {
m[strconv.Itoa(i)] = struct{}{}
}
b.ResetTimer()
for n := 0; n < size; n++ {
_, _ = m[strconv.Itoa(n)]
}
}

示例中strconv.Itoa函数对性能多少有点影响,但可以看到stringint的差距是在数量级的。

Further

PPT中给出了8个扩展阅读,大家根据情况自行阅读。

如果说你的时间只够读一个材料的话,我推荐大家反复品读一下Effective Go

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

注:本文的灵感来源于GOPHER 2020年大会陈皓的分享,原PPT的链接可能并不方便获取,所以我下载了一份PDF到git仓,方便大家阅读。我将结合自己的实际项目经历,与大家一起细品这份文档。

目录

Slice Internal

关于Slice的实现,我之前有一讲专门分析过底层实现。考虑到很多朋友没有细看,那我就再简单地讲一下。

1
2
3
4
5
type slice struct {
array unsafe.Pointer // Slice底层保存数据的指针
len int // 当前使用的长度
cap int // 分配的长度
}

掌握Slice的底层实现,能让你真正理解一些看似“奇怪的”现象:

1
2
3
4
5
6
7
8
9
10
11
12
13
func main(){
foo := make([]int, 5)
foo[3] = 42
foo[4] = 100

bar := foo[1:4]
bar[1] = 99

fmt.Println(foo)
// [0 0 99 42 100]
fmt.Println(bar)
// [0 99 42]
}

Tip: bar和foo是共享slice结构体底层的array的,所以修改了bar数组,foo也会变化

1
2
3
4
5
6
7
8
9
10
func main(){
a := make([]int, 32)
b := a[1:16]

a = append(a, 1)
a[2] = 42

fmt.Println(b)
// [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
}

Tip: a和b原来是共享array的,但在a = append(a, 1)后发生了扩容,a和b指向的array发生了变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 func main(){
path := []byte("AAAA/BBBBBBBBB")
sepIndex := bytes.IndexByte(path,'/')
dir1 := path[:sepIndex]
dir2 := path[sepIndex+1:]
fmt.Println(cap(dir1),cap(dir2))
// 14 9
fmt.Println("dir1 =>",string(dir1))
// dir1 => AAAA
fmt.Println("dir2 =>",string(dir2))
// dir2 => BBBBBBBBB

dir1 = append(dir1,"suffix"...)
fmt.Println("dir1 =>",string(dir1))
// dir1 => AAAAsuffix
fmt.Println("dir2 =>",string(dir2))
// dir2 => uffixBBBB
}

Tip: 核心点在于理解dir1和dir2的cap分别是14和9。由于dir1的当前len=4,append的长度=6,4+6<14,所以不会发生扩容

Deep Comparison

我们先看一下示例,data结构体中四个注释为not comparable表示无法直接用 == 符号对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type data struct {
num int // ok
checks [10]func() bool // not comparable
doit func() bool // not comparable
m map[string]string // not comparable
bytes []byte // not comparable
}

func main() {
v1 := data{}
v2 := data{}
fmt.Println("v1 == v2:", reflect.DeepEqual(v1, v2))
// prints: v1 == v2: true

m1 := map[string]string{"one": "a", "two": "b"}
m2 := map[string]string{"two": "b", "one": "a"}
fmt.Println("m1 == m2:", reflect.DeepEqual(m1, m2))
// prints: m1 == m2: true

s1 := []int{1, 2, 3}
s2 := []int{1, 2, 3}
fmt.Println("s1 == s2:", reflect.DeepEqual(s1, s2))
// prints: s1 == s2: true
}

Tip: 示例比较复杂,其实要表达的内容比较简单:

函数、map、切片(不包括数组)以及它们的复合结构(如函数的数组),无法直接对比,只能用 reflect.DeepEqual

Function vs Receiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Person struct {
Name string
Sexual string
Age int
}

func PrintPerson(p *Person) { fmt.Printf("Name=%s, Sexual=%s, Age=%d\n", p.Name, p.Sexual, p.Age) }
func (p *Person) Print() { fmt.Printf("Name=%s, Sexual=%s, Age=%d\n", p.Name, p.Sexual, p.Age) }

func main() {
var p = Person{
Name: "Hao Chen", Sexual: "Male", Age: 44,
}

PrintPerson(&p)
// Name=Hao Chen, Sexual=Male, Age=44
p.Print()
// Name=Hao Chen, Sexual=Male, Age=44
}

Tip: 示例比较简单,但其中蕴含的意义非常大,如对Person这个对象的抽象、简化代码等。

另外值得一提的是,Go编译器会根据方法 func (p *Person) Print() 的定义,将 p.Print()中的p从Person转换为*Person

Interface Patterns

这个模块非常重要,希望大家倒一杯水,细细品尝。

示例是一个很简单的interface实现,用来打印接口,我们看看代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Country struct {
Name string
}

type City struct {
Name string
}

type Printable interface {
PrintStr()
}

func (c Country) PrintStr() {
fmt.Println(c.Name)
}

func (c City) PrintStr() {
fmt.Println(c.Name)
}

func main() {
c1 := Country{"China"}
c2 := City{"Beijing"}

var cList = []Printable{c1, c2}
for _, v := range cList {
v.PrintStr()
}
}

那么,这时问题来了,如果我要实现N个Printable,就要定义N个strcut+N个PrintStr()方法。

前者的工作不能避免,而后者能否简化?那么示例来了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type WithName struct {
Name string
}

type Country struct {
WithName
}

type City struct {
WithName
}

type Printable interface {
PrintStr()
}

func (c WithName) PrintStr() {
fmt.Println(c.Name)
}

func main() {
c1 := Country{WithName{"China"}}
c2 := City{WithName{"Beijing"}}

var cList = []Printable{c1, c2}
for _, v := range cList {
v.PrintStr()
}
}

Tip: 核心就是用 embedded 的特性来删除冗余的代码。当然,代价是初始化会稍微麻烦点。


这时候,陈皓又给出了一个例子,即打印的内容会根据具体的实现不同时,无法直接用WithName来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Country struct {
Name string
}

type City struct {
Name string
}

type Printable interface {
PrintStr()
}

func (c Country) PrintStr() {
fmt.Println("Country:", c.Name)
}

func (c City) PrintStr() {
fmt.Println("City:", c.Name)
}

func main() {
c1 := Country{"China"}
c2 := City{"Beijing"}

var cList = []Printable{c1, c2}
for _, v := range cList {
v.PrintStr()
}
}

首先,我们要明确是否有必要优化。如果只有示例中这么几行代码,我们完全没必要改写。那如果系统真复杂到一定程度,我们该怎么办呢?

这是一个很发散性的问题,我这里给出一个个人比较常用的解决方案,作为参考。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
type WithTypeName struct {
Type string
Name string
}

type Country struct {
WithTypeName
}

func NewCountry(name string) Printable {
return Country{WithTypeName{"Country", name}}
}

type City struct {
WithTypeName
}

func NewCity(name string) Printable {
return City{WithTypeName{"City", name}}
}

type Printable interface {
PrintStr()
}

func (c WithTypeName) PrintStr() {
fmt.Printf("%s:%s\n", c.Type, c.Name)
}

func main() {
c1 := NewCountry("China")
c2 := NewCity("Beijing")

var cList = []Printable{c1, c2}
for _, v := range cList {
v.PrintStr()
}
}

Tip: 这种方法的好处有很多(先不谈弊端),比如可以将具体的实现CountryCity私有化,不对外暴露实现细节。今天不做细谈。

最后,送上一句经典:

Program to an interface, not an implementation

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

聚焦目标

理解 kubelet 的运行机制

目录

  1. 运行的主函数
  2. 运行kubelet
  3. 核心数据管理Kubelet
  4. 同步循环
  5. 处理pod的同步工作
  6. 总结

Run

从主函数找到run函数,代码较长,我精简了一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
// 一长串的配置初始化与验证

// done channel,用来通知运行结束
done := make(chan struct{})

// 注册到configz模块
err = initConfigz(&s.KubeletConfiguration)
if err != nil {
klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
}

// 获取节点的相关信息
hostName, err := nodeutil.GetHostname(s.HostnameOverride)
if err != nil {
return err
}
nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
if err != nil {
return err
}

switch {
// 独立运行模式
case standaloneMode:
// 对客户端进行初始化
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
}

// cgroup 相关初始化
var cgroupRoots []string
nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
klog.Warningf("failed to get the kubelet's cgroup: %v. Kubelet system container metrics may be missing.", err)
} else if kubeletCgroup != "" {
cgroupRoots = append(cgroupRoots, kubeletCgroup)
}

runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
if err != nil {
klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
} else if runtimeCgroup != "" {
cgroupRoots = append(cgroupRoots, runtimeCgroup)
}

if s.SystemCgroups != "" {
cgroupRoots = append(cgroupRoots, s.SystemCgroups)
}

// 下面一大块都是对 ContainerManager 的初始化
if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
s.CgroupRoot = "/"
}

// cpu相关信息
var reservedSystemCPUs cpuset.CPUSet

// ContainerManager的实例化
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
// Node 相关配置
cm.NodeConfig{},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)

if err != nil {
return err
}
}

// 内存OOM相关
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
klog.Warning(err)
}

// 预初始化Runtime
err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
kubeDeps, &s.ContainerRuntimeOptions,
s.ContainerRuntime,
s.RuntimeCgroups,
s.RemoteRuntimeEndpoint,
s.RemoteImageEndpoint,
s.NonMasqueradeCIDR)
if err != nil {
return err
}

// 运行Kubelet
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}

// 通知deamon的systemd
go daemon.SdNotify(false, "READY=1")

// 阻塞
select {
case <-done:
break
case <-ctx.Done():
break
}

return nil
}

RunKubelet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
// 获取节点信息
hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
if err != nil {
return err
}
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
if err != nil {
return err
}
hostnameOverridden := len(kubeServer.HostnameOverride) > 0

// 创建并初始化 kubelet
k, err := createAndInitKubelet()
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}

if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
klog.Info("Started kubelet as runonce")
} else {
// 开始kubelet
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
klog.Info("Started kubelet")
}
return nil
}

// 开始运行,都是并发的
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
// 运行
go k.Run(podCfg.Updates())

// 开启kubelet的http服务端
if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth,
enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling, kubeCfg.EnableSystemLogHandler)

}
// 只读端口
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}

Kubelet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 这里的k是一个interface定义,我们需要回头看看
type Bootstrap interface {
GetConfiguration() kubeletconfiginternal.KubeletConfiguration
BirthCry()
StartGarbageCollection()
ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, enableSystemLogHandler bool)
ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool)
ListenAndServePodResources()
Run(<-chan kubetypes.PodUpdate)
RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}

// 查看对应的实例化函数
func createAndInitKubelet() (k kubelet.Bootstrap, err error) {
k, err = kubelet.NewMainKubelet()
return k, nil
}

func NewMainKubelet() (*Kubelet, error) {
// 参数的初始化

// klet 的实例化结构
klet := &Kubelet{}

// 下面是klet中各种参数的填充
return klet, nil
}

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// 内部模块的初始化
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.Fatal(err)
}

go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

if kl.kubeClient != nil {
// 与kube-apiserver同步节点状态
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
go kl.fastStatusUpdateOnce()
go kl.nodeLeaseController.Run(wait.NeverStop)
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
}

// 一个kill pod的goroutine
go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

kl.statusManager.Start()
kl.probeManager.Start()

if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}

kl.pleg.Start()
// 同步的主逻辑
kl.syncLoop(updates, kl)
}

syncLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
// 开始运行kubelet的主同步循环
klog.Info("Starting kubelet main sync loop.")

// ticker每秒一次
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
// housekeeping 清理周期
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()

for {
kl.syncLoopMonitor.Store(kl.clock.Now())
// 同步
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}

// 这里的3个channel比较重要:configCh用于配置,syncCh用于触发同步,housekeepingCh用于触发清理
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// config channel关闭
if !open {
klog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
// 对应不同的操作
switch u.Op {
case kubetypes.ADD:
klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
klog.Errorf("Kubelet does not support snapshot update")
default:
klog.Errorf("Invalid event type received: %d.", u.Op)
}

kl.sourcesReady.AddSource(u.Source)

case e := <-plegCh:

case <-syncCh:
// 获取需要同步的pod,里面的逻辑暂不细看
// 我们在这里接收到示例中要创建的nginx pod
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
// 开始处理
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():

case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// 清理没有ready,直接跳过
klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
} else {
// 开始清理pod
klog.V(4).Infof("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
klog.Errorf("Failed cleaning pods: %v", err)
}
}
}
return true
}

handler

往前查找代码,handler就是Kubelet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
// 获取pod,然后分发
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
}
}

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// 调用UpdatePod的函数
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
}
},
})
}

// 查到初始化的地方 klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
pod := options.Pod
uid := pod.UID
var podUpdates chan UpdatePodOptions
var exists bool

p.podLock.Lock()
defer p.podLock.Unlock()
// 当pod不存在时,满足示例,是新建的pod
if podUpdates, exists = p.podUpdates[uid]; !exists {
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates

// 并发处理
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- *options
} else {
update, found := p.lastUndeliveredWorkUpdate[pod.UID]
if !found || update.UpdateType != kubetypes.SyncPodKill {
p.lastUndeliveredWorkUpdate[pod.UID] = *options
}
}
}

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
var lastSyncTime time.Time
for update := range podUpdates {
err := func() error {
// 同步pod的函数
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
lastSyncTime = time.Now()
return err
}()

p.wrapUp(update.Pod.UID, err)
}
}

// 找到syncPodFn被实例化的函数
func (kl *Kubelet) syncPod(o syncPodOptions) error {

// 这里有一长串逻辑,不方便阅读,我们只关注最核心的部分

// 调用 container runtime进行创建pod,再往下就是容器相关了
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
for _, r := range result.SyncResults {
if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
return err
}
}
return nil
}
return nil
}

Summary

  1. kubelet是kubernetes的Node节点上的管理者

  2. kubelet接收来自kube-apiserver上的pod消息,用Ticker这种周期性的方式触发同步函数

  3. kubelet会异步地对容器进行管理,调用对应容器的接口(Container Runtime Interface)

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

聚焦目标

理解 kube-controller-manager 的运行机制

目录

  1. 运行的主函数
  2. 控制器的启动函数
  3. 引入概念ReplicaSet
  4. 查看ReplicaSetController
  5. ReplicaSet的核心实现函数
  6. 总结

Run

我们找到了对应的主函数,看看其中的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
// configz 模块,在kube-scheduler分析中已经了解
if cfgz, err := configz.New(ConfigzName); err == nil {
cfgz.Set(c.ComponentConfig)
} else {
klog.Errorf("unable to register configz: %v", err)
}

// 健康监测与http服务,跳过
var checks []healthz.HealthChecker
var unsecuredMux *mux.PathRecorderMux

run := func(ctx context.Context) {
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}

// client认证相关
var clientBuilder controller.ControllerClientBuilder

// 创建controller的上下文context
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
klog.Fatalf("error building controller context: %v", err)
}
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}

// 这里的 InformerFactory 和我们在kube-scheduler中看的 SharedInformerFactory 基本一致
controllerContext.InformerFactory.Start(controllerContext.Stop)
controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
close(controllerContext.InformersStarted)

select {}
}

// 是否进行选举
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO())
panic("unreachable")
}

// 拼接出一个全局唯一的id
id, err := os.Hostname()
if err != nil {
return err
}
id = id + "_" + string(uuid.NewUUID())

rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
c.ComponentConfig.Generic.LeaderElection.ResourceName,
c.LeaderElectionClient.CoreV1(),
c.LeaderElectionClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: c.EventRecorder,
})
if err != nil {
klog.Fatalf("error creating lock: %v", err)
}

// 正常情况下都是阻塞在RunOrDie这个函数中,不停地进行选举相关的工作
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
// 开始成为Leader的时候,调用run函数
OnStartedLeading: run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
},
WatchDog: electionChecker,
Name: "kube-controller-manager",
})
panic("unreachable")
}

StartControllers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
// 关键性的循环,启动每个controllers,key为控制器名字,value为初始化函数
for controllerName, initFn := range controllers {
// 是否允许启动
if !ctx.IsControllerEnabled(controllerName) {
klog.Warningf("%q is disabled", controllerName)
continue
}
time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
klog.V(1).Infof("Starting %q", controllerName)
// 调用init函数进行启动
debugHandler, started, err := initFn(ctx)
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
}
if !started {
klog.Warningf("Skipping %q", controllerName)
continue
}
// 注册对应controller到debug的url中
if debugHandler != nil && unsecuredMux != nil {
basePath := "/debug/controllers/" + controllerName
unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
}
klog.Infof("Started %q", controllerName)
}

return nil
}

// 我们再去传入controller的函数去看看,对应的controller有哪些,这里有我们很多常见的概念,今天不一一细讲
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["endpointslice"] = startEndpointSliceController
controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startPodGCController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
controllers["serviceaccount"] = startServiceAccountController
controllers["garbagecollector"] = startGarbageCollectorController
controllers["daemonset"] = startDaemonSetController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
controllers["csrsigning"] = startCSRSigningController
controllers["csrapproving"] = startCSRApprovingController
controllers["csrcleaner"] = startCSRCleanerController
controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController
controllers["nodeipam"] = startNodeIpamController
controllers["nodelifecycle"] = startNodeLifecycleController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["route"] = startRouteController
controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
}
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
controllers["persistentvolume-expander"] = startVolumeExpandController
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
controllers["pvc-protection"] = startPVCProtectionController
controllers["pv-protection"] = startPVProtectionController
controllers["ttl-after-finished"] = startTTLAfterFinishedController
controllers["root-ca-cert-publisher"] = startRootCACertPublisher
controllers["ephemeral-volume"] = startEphemeralVolumeController

return controllers
}

ReplicaSet

由于我们的示例是创建一个nginx的pod,涉及到kube-controller-manager的内容很少。

但是,为了加深大家对 kube-controller-manager 的认识,我们引入一个新的概念 - ReplicaSet,下面是官方说明:

A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.

ReplicaSet 的目的是维护一组在任何时候都处于运行状态的 Pod 副本的稳定集合。 因此,它通常用来保证给定数量的、完全相同的 Pod 的可用性。

简单来说,ReplicaSet 就是用来生成指定个数的Pod。

ReplicaSetController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
return nil, false, nil
}

// 用goroutine异步运行,包含了 ReplicaSet和Pod 的两个Informer
// 这一点很好理解:我们是要控制ReplicaSet声明的数量和运行的Pod数量一致,需要同时观察者两种资源
go replicaset.NewReplicaSetController(
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
return nil, true, nil
}

// 运行函数
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rsc.queue.ShutDown()

controllerName := strings.ToLower(rsc.Kind)
klog.Infof("Starting %v controller", controllerName)
defer klog.Infof("Shutting down %v controller", controllerName)

if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
return
}

for i := 0; i < workers; i++ {
// 工作的函数
go wait.Until(rsc.worker, time.Second, stopCh)
}

<-stopCh
}

func (rsc *ReplicaSetController) worker() {
// 继续查找实现
for rsc.processNextWorkItem() {
}
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
// 这里也有个queue的概念,可以类比kube-scheduler中的实现
// 不同的是,这里的queue是 workqueue.RateLimitingInterface ,也就是限制速率的,具体实现今天不细看

// 获取元素
key, quit := rsc.queue.Get()
if quit {
return false
}
defer rsc.queue.Done(key)

// 处理对应的元素
err := rsc.syncHandler(key.(string))
if err == nil {
rsc.queue.Forget(key)
return true
}

utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
rsc.queue.AddRateLimited(key)

return true
}

// 再回过头,去查看syncHandler的具体实现
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {

rsc.syncHandler = rsc.syncReplicaSet

return rsc
}

syncReplicaSet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
}()

// 从key中拆分出 namespace 和 name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}

// 根据name,从 Lister 获取对应的 ReplicaSets 信息
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
rsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return err
}

rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
// 获取 selector (k8s 是根据selector中的label来匹配 ReplicaSets 和 Pod 的)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
return nil
}

// 根据namespace和labels获取所有的pod
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
return err
}

// 过滤无效的pod
filteredPods := controller.FilterActivePods(allPods)

// 根据selector再过滤pod
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
if err != nil {
return err
}

var manageReplicasErr error
if rsNeedsSync && rs.DeletionTimestamp == nil {
// 管理 ReplicaSet,下面详细分析
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
rs = rs.DeepCopy()
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

// 更新状态
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
return err
}
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
return manageReplicasErr
}

// 我们再一起看看,当Pod数量和ReplicaSet中声明的不同时,是怎么工作的
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
// diff = 当前pod数 - 期望pod数
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
return nil
}

// diff小于0,表示需要扩容,即新增Pod
if diff < 0 {

// 具体的实现暂时不细看

// diff 大于0,即需要缩容
} else if diff > 0 {

}

return nil
}

Summary

kube-controller-manager 的核心思想是: 根据期望状态当前状态,管理Kubernetes中的资源。

以ReplicaSet为例,它对比了定义声明的Pod数当前集群中满足条件的Pod数,进行相对应的扩缩容。

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

聚焦目标

理解一个pod的被调度的大致流程

目录

  1. 分析Scheduler的结构体
  2. 往SchedulingQueue里
  3. 调度一个pod对象
    1. 调度计算结果 - ScheduleResult
    2. 初步推算 - Assume
    3. 实际绑定 - Bind
  4. 将绑定成功后的数据更新到etcd
  5. pod绑定Node的总结

Scheduler

在前面,我们了解了Pod调度算法的注册Informer机制来监听kube-apiserver上的资源变化,今天这一讲,我们就将两者串联起来,看看在kube-scheduler中,Informer监听到资源变化后,如何用调度算法将pod进行调度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 在运行 kube-scheduler 的初期,我们创建了一个Scheduler的数据结构,回头再看看有什么和pod调度算法相关的
type Scheduler struct {
SchedulerCache internalcache.Cache
Algorithm core.ScheduleAlgorithm

// 获取下一个需要调度的Pod
NextPod func() *framework.QueuedPodInfo

Error func(*framework.QueuedPodInfo, error)
StopEverything <-chan struct{}

// 等待调度的Pod队列,我们重点看看这个队列是什么
SchedulingQueue internalqueue.SchedulingQueue

Profiles profile.Map
scheduledPodsHasSynced func() bool
client clientset.Interface
}

// Scheduler的实例化函数
func New(){
var sched *Scheduler
switch {
// 从 Provider 创建
case source.Provider != nil:
sc, err := configurator.createFromProvider(*source.Provider)
sched = sc
// 从文件或者ConfigMap中创建
case source.Policy != nil:
sc, err := configurator.createFromConfig(*policy)
sched = sc
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
}

// 两个创建方式,底层都是调用的 create 函数
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
return c.create()
}
func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, error){
return c.create()
}

func (c *Configurator) create() (*Scheduler, error) {
// 实例化 podQueue
podQueue := internalqueue.NewSchedulingQueue(
lessFn,
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
)

return &Scheduler{
SchedulerCache: c.schedulerCache,
Algorithm: algo,
Profiles: profiles,
// NextPod 函数依赖于 podQueue
NextPod: internalqueue.MakeNextPodFunc(podQueue),
Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
StopEverything: c.StopEverything,
// 调度队列被赋值为podQueue
SchedulingQueue: podQueue,
}, nil
}

// 再看看这个调度队列的初始化函数,从命名可以看到是一个优先队列,它的实现细节暂不细看
// 结合实际情况思考下,pod会有重要程度的区分,所以调度的顺序需要考虑优先级的
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
return NewPriorityQueue(lessFn, opts...)
}

SchedulingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 在上面实例化Scheduler后,有个注册事件 Handler 的函数:addAllEventHandlers(sched, informerFactory, podInformer)
func addAllEventHandlers(
sched *Scheduler,
informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
) {
/*
函数前后有很多注册的Handler,但是和未调度pod添加到队列相关的,只有这个
*/
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
// 定义过滤函数:必须为未调度的pod
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
return false
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
return false
}
},
// 增改删三个操作对应的Handler,操作到对应的Queue
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToSchedulingQueue,
UpdateFunc: sched.updatePodInSchedulingQueue,
DeleteFunc: sched.deletePodFromSchedulingQueue,
},
},
)
}

// 牢记我们第一阶段要分析的对象:create nginx pod,所以进入这个add的操作,对应加入到队列
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
pod := obj.(*v1.Pod)
klog.V(3).Infof("add event for unscheduled pod %s/%s", pod.Namespace, pod.Name)
// 加入到队列
if err := sched.SchedulingQueue.Add(pod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
}
}

// 入队操作我们清楚了,那出队呢?我们回过头去看看上面定义的NextPod的方法实现
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
return func() *framework.QueuedPodInfo {
// 从队列中弹出
podInfo, err := queue.Pop()
if err == nil {
klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
return podInfo
}
klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
return nil
}
}

scheduleOne

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 了解入队和出队操作后,我们看一下Scheduler运行的过程
func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
sched.SchedulingQueue.Run()
// 调度一个pod对象
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()
}

// 接下来scheduleOne方法代码很长,我们一步一步来看
func (sched *Scheduler) scheduleOne(ctx context.Context) {
// podInfo 就是从队列中获取到的pod对象
podInfo := sched.NextPod()
// 检查pod的有效性
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
// 根据定义的 pod.Spec.SchedulerName 查到对应的profile
prof, err := sched.profileForPod(pod)
if err != nil {
klog.Error(err)
return
}
// 可以跳过调度的情况,一般pod进不来
if sched.skipPodSchedule(prof, pod) {
return
}

// 调用调度算法,获取结果
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
if err != nil {
/*
出现调度失败的情况:
这个时候可能会触发抢占preempt,抢占是一套复杂的逻辑,后面我们专门会讲
目前假设各类资源充足,能正常调度
*/
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))

// assumePod 是假设这个Pod按照前面的调度算法分配后,进行验证
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// SuggestedHost 为建议的分配的Host
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
// 失败就重新分配,不考虑这种情况
}

// 运行相关插件的代码先跳过

// 异步绑定pod
go func() {

// 有一系列的检查工作

// 真正做绑定的动作
err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
if err != nil {
// 错误处理,清除状态并重试
} else {
// 打印结果,调试时将log level调整到2以上
if klog.V(2).Enabled() {
klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
}
// metrics中记录相关的监控指标
metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start))
metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

// 运行绑定后的插件
prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}
}()
}

ScheduleResult

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// 调用算法下的Schedule
func New(){
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
}

func (c *Configurator) create() (*Scheduler, error) {
algo := core.NewGenericScheduler(
c.schedulerCache,
c.nodeInfoSnapshot,
extenders,
c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
c.disablePreemption,
c.percentageOfNodesToScore,
)
return &Scheduler{
Algorithm: algo,
}, nil
}

// genericScheduler 的 Schedule 的实现
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// 对 pod 进行 pvc 的信息检查
if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return result, err
}
// 对当前的信息做一个快照
if err := g.snapshot(); err != nil {
return result, err
}
// Node 节点数量为0,表示无可用节点
if g.nodeInfoSnapshot.NumNodes() == 0 {
return result, ErrNoNodesAvailable
}
// Predict阶段:找到所有满足调度条件的节点feasibleNodes,不满足的就直接过滤
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
// 没有可用节点直接报错
if len(feasibleNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
FilteredNodesStatuses: filteredNodesStatuses,
}
}
// 只有一个节点就直接选用
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Name,
EvaluatedNodes: 1 + len(filteredNodesStatuses),
FeasibleNodes: 1,
}, nil
}
// Priority阶段:通过打分,找到一个分数最高、也就是最优的节点
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
host, err := g.selectHost(priorityList)

return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
FeasibleNodes: len(feasibleNodes),
}, err
}

/*
Predict 和 Priority 是选择调度节点的两个关键性步骤, 它的底层调用了各种algorithm算法。我们暂时不细看。
以我们前面讲到过的 NodeName 算法为例,节点必须与 NodeName 匹配,它是属于Predict阶段的。
*/

Assume

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// 将 host 填入到 pod spec字段的nodename,假定分配到对应的节点上
assumed.Spec.NodeName = host
// 调用 SchedulerCache 下的 AssumePod
if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
klog.Errorf("scheduler cache AssumePod failed: %v", err)
return err
}
if sched.SchedulingQueue != nil {
sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
}
return nil
}

// 回头去找 SchedulerCache 初始化的地方
func (c *Configurator) create() (*Scheduler, error) {
return &Scheduler{
SchedulerCache: c.schedulerCache,
}, nil
}

func New() (*Scheduler, error) {
// 这里就是初始化的实例 schedulerCache
schedulerCache := internalcache.New(30*time.Second, stopEverything)
configurator := &Configurator{
schedulerCache: schedulerCache,
}
}

// 看看AssumePod做了什么
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
// 获取 pod 的 uid
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
// 加锁操作,保证并发情况下的一致性
cache.mu.Lock()
defer cache.mu.Unlock()
// 根据 uid 找不到 pod 当前的状态
if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
}

// 把 Assume Pod 的信息放到对应 Node 节点中
cache.addPod(pod)
// 把 pod 状态设置为 Assume 成功
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
cache.assumedPods[key] = true
return nil
}

Bind

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
start := time.Now()
// 把 assumed 的 pod 信息保存下来
defer func() {
sched.finishBinding(prof, assumed, targetNode, start, err)
}()
// 阶段1: 运行扩展绑定进行验证,如果已经绑定报错
bound, err := sched.extendersBinding(assumed, targetNode)
if bound {
return err
}
// 阶段2:运行绑定插件验证状态
bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode)
if bindStatus.IsSuccess() {
return nil
}
if bindStatus.Code() == framework.Error {
return bindStatus.AsError()
}
return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message())
}

Update To Etcd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 这块的代码我不做细致的逐层分析了,大家根据兴趣自行探索
func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
klog.V(3).Infof("Attempting to bind %v/%v to %v", p.Namespace, p.Name, nodeName)
binding := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
Target: v1.ObjectReference{Kind: "Node", Name: nodeName},
}
// ClientSet就是访问kube-apiserver的客户端,将数据更新上去
err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
return nil
}

Summary

今天这一次分享比较长,我们一起来总结一下:

  1. Pod的调度是通过一个队列SchedulingQueue异步工作的
    1. 监听到对应pod事件后,放入队列
    2. 有个消费者从队列中获取pod,进行调度
  2. 单个pod的调度主要分为3个步骤:
    1. 根据Predict和Priority两个阶段,调用各自的算法插件,选择最优的Node
    2. Assume这个Pod被调度到对应的Node,保存到cache
    3. 用extender和plugins进行验证,如果通过则绑定
  3. 绑定成功后,将数据通过client向kube-apiserver发送,更新etcd

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

聚焦目标

了解Informer在发现资源变化后,是怎么处理的

目录

  1. 查看消费的过程
  2. 掌握Index数据结构
  3. 信息的分发distribute
  4. Informer的综合思考

Process

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func (c *controller) processLoop() {
for {
// Pop出Object元素
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// 重新进队列
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

// 去查看Pop的具体实现
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 调用process去处理item,然后返回
item, ok := f.items[id]
delete(f.items, id)
err := process(item)
return item, err
}
}

// 然后去查一下 PopProcessFunc 的定义,在创建controller前
cfg := &Config{
Process: s.HandleDeltas,
}

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

for _, d := range obj.(Deltas) {
switch d.Type {
// 增、改、替换、同步
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
// 先去indexer查询
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 如果数据已经存在,就执行Update逻辑
if err := s.indexer.Update(d.Object); err != nil {
return err
}

isSync := false
switch {
case d.Type == Sync:
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
// 分发Update事件
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// 没查到数据,就执行Add操作
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 分发 Add 事件
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
// 删除
case Deleted:
// 去indexer删除
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 分发 delete 事件
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

Index

Index 的定义为资源的本地存储,保持与etcd中的资源信息一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 我们去看看Index是怎么创建的
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
// indexer 的初始化
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}

// 生成一个map和func组合而成的Indexer
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}

// ThreadSafeStore的底层是一个并发安全的map,具体实现我们暂不考虑
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}

distribute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 在上面的Process代码中,我们看到了将数据存储到Indexer后,调用了一个分发的函数
s.processor.distribute()

// 分发process的创建
func NewSharedIndexInformer() SharedIndexInformer {
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
}
return sharedIndexInformer
}

// sharedProcessor的结构
type sharedProcessor struct {
listenersStarted bool
// 读写锁
listenersLock sync.RWMutex
// 普通监听列表
listeners []*processorListener
// 同步监听列表
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}

// 查看distribute函数
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 将object分发到 同步监听 或者 普通监听 的列表
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}

// 这个add的操作是利用了channel
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}

Summary

  1. Informer 依赖于 Reflector 模块,它有个组件为 xxxInformer,如 podInformer
  2. 具体资源的 Informer 包含了一个连接到kube-apiserverclient,通过ListWatch接口查询资源变更情况
  3. 检测到资源发生变化后,通过Controller 将数据放入队列DeltaFIFOQueue里,生产阶段完成
  4. DeltaFIFOQueue的另一端,有消费者在不停地处理资源变化的事件,处理逻辑主要分2步
    1. 将数据保存到本地存储Indexer,它的底层实现是一个并发安全的threadSafeMap
    2. 有些组件需要实时关注资源变化,会实时监听listen,就将事件分发到对应注册上来的listener上,自行处理

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding

聚焦目标

了解Informer是如何从kube-apiserver监听资源变化的情况

目录

  1. 什么是Informer
  2. Shared Informer的实现
  3. PodInformer的背后的实现
  4. 聚焦Reflect结构
  5. 本节小节

Informer

什么是Informer?这一节,我将先抛开代码,重点讲一下这个Informer,因为它是理解k8s运行机制的核心概念。

我没有在官方文档中找到Informer的明确定义,中文直译为通知器。从这个链接中,我们可以看到一个自定义资源的的处理流程。

我简单概况下,Informer的核心功能是 获取并监听(ListAndWatch)对应资源的增删改,触发相应的事件操作(ResourceEventHandler)

Shared Informer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/*
client 是连接到 kube-apiserver 的客户端。
我们要理解k8s的设计:
1. etcd是核心的数据存储,对资源的修改会进行持久化
2. 只有kube-apiserver可以访问etcd
所以,kube-scheduler要了解资源的变化情况,只能通过kube-apiserver
*/

// 定义了 Shared Informer,其中这个client是用来连接kube-apiserver的
c.InformerFactory = informers.NewSharedInformerFactory(client, 0)

// 这里解答了为什么叫shared:一个资源会对应多个Informer,会导致效率低下,所以让一个资源对应一个sharedInformer,而一个sharedInformer内部自己维护多个Informer
type sharedInformerFactory struct {
client kubernetes.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration
customResync map[reflect.Type]time.Duration
// 这个map就是维护多个Informer的关键实现
informers map[reflect.Type]cache.SharedIndexInformer
startedInformers map[reflect.Type]bool
}

// 运行函数
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
// goroutine异步处理
go informer.Run(stopCh)
// 标记为已经运行,这样即使下次Start也不会重复运行
f.startedInformers[informerType] = true
}
}
}

// 查找对应的informer
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
// 找到就直接返回
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}

resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
// 没找到就会新建
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}

// SharedInformerFactory 是 sharedInformerFactory 的接口定义
type SharedInformerFactory interface {
// 我们这一阶段关注的Pod的Informer,属于核心资源
Core() core.Interface
}

// core.Interface的定义
type Interface interface {
// V1 provides access to shared informers for resources in V1.
V1() v1.Interface
}

// v1.Interface 的定义
type Interface interface {
// Pod的定义
Pods() PodInformer
}

// PodInformer 是对应的接口
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
// podInformer 是具体的实现
type podInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}

// 最后,我们可以看到podInformer调用了InformerFor函数进行了添加
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

PodInformer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// 实例化PodInformer,把对应的List/Watch操作方法传入到实例化函数,生成统一的SharedIndexInformer接口
func NewFilteredPodInformer() cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
// List和Watch实现从PodInterface里面查询
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}

// 我们先看看Pod基本的List和Watch是怎么定义的
// Pod基本的增删改查等操作
type PodInterface interface {
List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
...
}
// pods 是PodInterface的实现
type pods struct {
client rest.Interface
ns string
}

// List 和 Watch 是依赖客户端,也就是从kube-apiserver中查询的
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
err = c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do(ctx).
Into(result)
return
}
func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Watch(ctx)
}

// 在上面,我们看到了异步运行Informer的代码 go informer.Run(stopCh),我们看看是怎么run的
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
// 这里有个 DeltaFIFO 的对象,
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
// 传入这个fifo到cfg
cfg := &Config{
Queue: fifo,
...
}
// 新建controller
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()

s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// 运行controller
s.controller.Run(stopCh)
}

// Controller的运行
func (c *controller) Run(stopCh <-chan struct{}) {
//
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}

c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()

var wg wait.Group
// 生产,往Queue里放数据
wg.StartWithChannel(stopCh, r.Run)
// 消费,从Queue消费数据
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}

Reflect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// 我们再回头看看这个Reflect结构
r := NewReflector(
// ListerWatcher 我们已经有了解,就是通过client监听kube-apiserver暴露出来的Resource
c.config.ListerWatcher,
c.config.ObjectType,
// Queue 是我们前文看到的一个 DeltaFIFOQueue,认为这是一个先进先出的队列
c.config.Queue,
c.config.FullResyncPeriod,
)

func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
// 调用了ListAndWatch
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// watchHandler顾名思义,就是Watch到对应的事件,调用对应的Handler
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}

func (r *Reflector) watchHandler() error {
loop:
for {
// 一个经典的GO语言select监听多channel的模式
select {
// 整体的step channel
case <-stopCh:
return errorStopRequested
// 错误相关的error channel
case err := <-errc:
return err
// 接收事件event的channel
case event, ok := <-w.ResultChan():
// channel被关闭,退出loop
if !ok {
break loop
}

// 一系列的资源验证代码跳过

switch event.Type {
// 增删改三种Event,分别对应到去store,即DeltaFIFO中,操作object
case watch.Added:
err := r.store.Add(event.Object)
case watch.Modified:
err := r.store.Update(event.Object)
case watch.Deleted:
err := r.store.Delete(event.Object)
case watch.Bookmark:
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
}
}
return nil
}

Summary

  1. Informer 依赖于 Reflector 模块,它有个组件为 xxxInformer,如 podInformer
  2. 具体资源的 Informer 包含了一个连接到kube-apiserverclient,通过ListWatch接口查询资源变更情况
  3. 检测到资源发生变化后,通过Controller 将数据放入队列DeltaFIFOQueue里,生产阶段完成

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding