1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
| func NewFilteredPodInformer() cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).Watch(context.TODO(), options) }, }, &corev1.Pod{}, resyncPeriod, indexers, ) }
type PodInterface interface { List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) ... }
type pods struct { client rest.Interface ns string }
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) { err = c.client.Get(). Namespace(c.ns). Resource("pods"). VersionedParams(&opts, scheme.ParameterCodec). Timeout(timeout). Do(ctx). Into(result) return } func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return c.client.Get(). Namespace(c.ns). Resource("pods"). VersionedParams(&opts, scheme.ParameterCodec). Timeout(timeout). Watch(ctx) }
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, }) cfg := &Config{ Queue: fifo, ... } func() { s.startedLock.Lock() defer s.startedLock.Unlock()
s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() s.controller.Run(stopCh) }
func (c *controller) Run(stopCh <-chan struct{}) { r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock if c.config.WatchErrorHandler != nil { r.watchErrorHandler = c.config.WatchErrorHandler }
c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock()
var wg wait.Group wg.StartWithChannel(stopCh, r.Run) wait.Until(c.processLoop, time.Second, stopCh) wg.Wait() }
|