diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go
index 1f520a8d08..f798e0c3d2 100644
--- a/pkg/cache/internal/informers_map.go
+++ b/pkg/cache/internal/informers_map.go
@@ -117,9 +117,6 @@ type InformersMap struct {
 	// paramCodec is used by list and watch
 	paramCodec runtime.ParameterCodec
 
-	// stop is the stop channel to stop informers
-	stop <-chan struct{}
-
 	// resync is the base frequency the informers are resynced
 	// a 10 percent jitter will be added to the resync period between informers
 	// so that all informers will not send list requests simultaneously.
@@ -128,13 +125,22 @@ type InformersMap struct {
 	// mu guards access to the map
 	mu sync.RWMutex
 
-	// start is true if the informers have been started
+	// started is true if the informers have been started
 	started bool
 
 	// startWait is a channel that is closed after the
 	// informer has been started.
 	startWait chan struct{}
 
+	// waitGroup is the wait group that is used to wait for all informers to stop
+	waitGroup sync.WaitGroup
+
+	// stopped is true if the informers have been stopped
+	stopped bool
+
+	// ctx is the context to stop informers
+	ctx context.Context
+
 	// namespace is the namespace that all ListWatches are restricted to
 	// default or empty string means all namespaces
 	namespace string
@@ -157,28 +163,47 @@ func (ip *InformersMap) Start(ctx context.Context) error {
 		ip.mu.Lock()
 		defer ip.mu.Unlock()
 
-		// Set the stop channel so it can be passed to informers that are added later
-		ip.stop = ctx.Done()
+		// Set the context so it can be passed to informers that are added later
+		ip.ctx = ctx
 
 		// Start each informer
 		for _, i := range ip.informers.Structured {
-			go i.Informer.Run(ctx.Done())
+			ip.startInformerLocked(i.Informer)
 		}
 		for _, i := range ip.informers.Unstructured {
-			go i.Informer.Run(ctx.Done())
+			ip.startInformerLocked(i.Informer)
 		}
 		for _, i := range ip.informers.Metadata {
-			go i.Informer.Run(ctx.Done())
+			ip.startInformerLocked(i.Informer)
 		}
 
 		// Set started to true so we immediately start any informers added later.
 		ip.started = true
 		close(ip.startWait)
 	}()
-	<-ctx.Done()
+	<-ctx.Done() // Block until the context is done
+	ip.mu.Lock()
+	ip.stopped = true // Set stopped to true so we don't start any new informers
+	ip.mu.Unlock()
+	ip.waitGroup.Wait() // Block until all informers have stopped
 	return nil
 }
 
+func (ip *InformersMap) startInformerLocked(informer cache.SharedIndexInformer) {
+	// Don't start the informer in case we are already waiting for the items in
+	// the waitGroup to finish, since waitGroups don't support waiting and adding
+	// at the same time.
+	if ip.stopped {
+		return
+	}
+
+	ip.waitGroup.Add(1)
+	go func() {
+		defer ip.waitGroup.Done()
+		informer.Run(ip.ctx.Done())
+	}()
+}
+
 func (ip *InformersMap) waitForStarted(ctx context.Context) bool {
 	select {
 	case <-ip.startWait:
@@ -307,20 +332,15 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
 	}
 	ip.informersByType(obj)[gvk] = i
 
-	// Start the Informer if need by
-	// TODO(seans): write thorough tests and document what happens here - can you add indexers?
-	// can you add eventhandlers?
+	// Start the informer in case the InformersMap has started, otherwise it will be
+	// started when the InformersMap starts.
 	if ip.started {
-		go i.Informer.Run(ip.stop)
+		ip.startInformerLocked(i.Informer)
 	}
 	return i, ip.started, nil
 }
 
 func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) {
-	// TODO(vincepri): Wire the context in here and don't use TODO().
-	// Can we use the context from the Get call?
-	ctx := context.TODO()
-
 	// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
 	// groupVersionKind to the Resource API we will use.
 	mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
@@ -351,16 +371,16 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
 		return &cache.ListWatch{
 			ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
 				if namespace != "" {
-					return resources.Namespace(namespace).List(ctx, opts)
+					return resources.Namespace(namespace).List(ip.ctx, opts)
 				}
-				return resources.List(ctx, opts)
+				return resources.List(ip.ctx, opts)
 			},
 			// Setup the watch function
 			WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
 				if namespace != "" {
-					return resources.Namespace(namespace).Watch(ctx, opts)
+					return resources.Namespace(namespace).Watch(ip.ctx, opts)
 				}
-				return resources.Watch(ctx, opts)
+				return resources.Watch(ip.ctx, opts)
 			},
 		}, nil
 	//
@@ -386,9 +406,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
 					err  error
 				)
 				if namespace != "" {
-					list, err = resources.Namespace(namespace).List(ctx, opts)
+					list, err = resources.Namespace(namespace).List(ip.ctx, opts)
 				} else {
-					list, err = resources.List(ctx, opts)
+					list, err = resources.List(ip.ctx, opts)
 				}
 				if list != nil {
 					for i := range list.Items {
@@ -400,9 +420,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
 			// Setup the watch function
 			WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) {
 				if namespace != "" {
-					watcher, err = resources.Namespace(namespace).Watch(ctx, opts)
+					watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts)
 				} else {
-					watcher, err = resources.Watch(ctx, opts)
+					watcher, err = resources.Watch(ip.ctx, opts)
 				}
 				if err != nil {
 					return nil, err
@@ -433,7 +453,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
 
 			// Create the resulting object, and execute the request.
 			res := listObj.DeepCopyObject()
-			if err := req.Do(ctx).Into(res); err != nil {
+			if err := req.Do(ip.ctx).Into(res); err != nil {
 				return nil, err
 			}
 			return res, nil
@@ -446,7 +466,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
 				req.Namespace(namespace)
 			}
 			// Call the watch.
-			return req.Watch(ctx)
+			return req.Watch(ip.ctx)
 			},
 		}, nil
 	}
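
The heart of this diff is the shutdown pattern in `startInformerLocked`: `sync.WaitGroup` does not allow `Add` to race with `Wait`, so the `stopped` flag is flipped under the same mutex that guards informer startup, and anything that arrives after shutdown has begun is simply not started. The following is a minimal, self-contained sketch of that pattern, not controller-runtime code; the `runner` and `launch` names are illustrative only.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runner mirrors the shape of the change above: background goroutines are
// tracked in a WaitGroup, and a stopped flag (guarded by mu) prevents new
// Add calls once Wait has started.
type runner struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	stopped bool
	ctx     context.Context
}

// launch starts fn in a goroutine unless shutdown has already begun.
func (r *runner) launch(fn func(stop <-chan struct{})) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.stopped {
		// Too late: Wait may already be running, and Add must not race with it.
		return
	}
	r.wg.Add(1)
	stop := r.ctx.Done() // capture the stop channel while holding the lock
	go func() {
		defer r.wg.Done()
		fn(stop)
	}()
}

// run blocks until ctx is cancelled, then waits for all launched goroutines.
func (r *runner) run(ctx context.Context) {
	r.mu.Lock()
	r.ctx = ctx
	r.mu.Unlock()

	<-ctx.Done() // block until the context is done

	r.mu.Lock()
	r.stopped = true // no new goroutines may be added from here on
	r.mu.Unlock()

	r.wg.Wait() // all goroutines have observed ctx.Done() and returned
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	r := &runner{ctx: ctx}

	r.launch(func(stop <-chan struct{}) {
		<-stop
		fmt.Println("worker stopped")
	})

	time.AfterFunc(100*time.Millisecond, cancel)
	r.run(ctx)
	fmt.Println("all workers stopped")
}
```

This also explains why the list/watch functions switch from `context.TODO()` to `ip.ctx`: once the informers' lifetime is tied to the context passed to `Start`, in-flight list and watch requests are cancelled at shutdown instead of lingering.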