Junedayday Blog

六月天天的个人博客

0%

【K8s源码品读】010:Phase 1 - kube-scheduler - Informer是如何保存数据的

聚焦目标

了解Informer在发现资源变化后,是怎么处理的

目录

  1. 查看消费的过程
  2. 掌握Index数据结构
  3. 信息的分发distribute
  4. Informer的综合思考

Process

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func (c *controller) processLoop() {
for {
// Pop出Object元素
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// 重新进队列
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

// 去查看Pop的具体实现
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 调用process去处理item,然后返回
item, ok := f.items[id]
delete(f.items, id)
err := process(item)
return item, err
}
}

// 然后去查一下 PopProcessFunc 的定义,在创建controller前
cfg := &Config{
Process: s.HandleDeltas,
}

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

for _, d := range obj.(Deltas) {
switch d.Type {
// 增、改、替换、同步
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
// 先去indexer查询
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 如果数据已经存在,就执行Update逻辑
if err := s.indexer.Update(d.Object); err != nil {
return err
}

isSync := false
switch {
case d.Type == Sync:
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
// 分发Update事件
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// 没查到数据,就执行Add操作
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 分发 Add 事件
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
// 删除
case Deleted:
// 去indexer删除
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 分发 delete 事件
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

Index

Index 的定义为资源的本地存储,保持与etcd中的资源信息一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 我们去看看Index是怎么创建的
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
// indexer 的初始化
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}

// 生成一个map和func组合而成的Indexer
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}

// ThreadSafeStore的底层是一个并发安全的map,具体实现我们暂不考虑
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}

distribute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 在上面的Process代码中,我们看到了将数据存储到Indexer后,调用了一个分发的函数
s.processor.distribute()

// 分发process的创建
func NewSharedIndexInformer() SharedIndexInformer {
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
}
return sharedIndexInformer
}

// sharedProcessor的结构
type sharedProcessor struct {
listenersStarted bool
// 读写锁
listenersLock sync.RWMutex
// 普通监听列表
listeners []*processorListener
// 同步监听列表
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}

// 查看distribute函数
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 将object分发到 同步监听 或者 普通监听 的列表
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}

// 这个add的操作是利用了channel
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}

Summary

  1. Informer 依赖于 Reflector 模块,它有个组件为 xxxInformer,如 podInformer
  2. 具体资源的 Informer 包含了一个连接到kube-apiserverclient,通过ListWatch接口查询资源变更情况
  3. 检测到资源发生变化后,通过Controller 将数据放入队列DeltaFIFOQueue里,生产阶段完成
  4. DeltaFIFOQueue的另一端,有消费者在不停地处理资源变化的事件,处理逻辑主要分2步
    1. 将数据保存到本地存储Indexer,它的底层实现是一个并发安全的threadSafeMap
    2. 有些组件需要实时关注资源变化,会实时监听listen,就将事件分发到对应注册上来的listener上,自行处理

Github: https://github.com/Junedayday/code_reading

Blog: http://junes.tech/

Bilibili:https://space.bilibili.com/293775192

公众号:golangcoding