@@ -117,9 +117,6 @@ type InformersMap struct {
117117 // paramCodec is used by list and watch
118118 paramCodec runtime.ParameterCodec
119119
120- // stop is the stop channel to stop informers
121- stop <- chan struct {}
122-
123120 // resync is the base frequency the informers are resynced
124121 // a 10 percent jitter will be added to the resync period between informers
125122 // so that all informers will not send list requests simultaneously.
@@ -128,13 +125,22 @@ type InformersMap struct {
128125 // mu guards access to the map
129126 mu sync.RWMutex
130127
131- // start is true if the informers have been started
128+ // started is true if the informers have been started
132129 started bool
133130
134131 // startWait is a channel that is closed after the
135132 // informer has been started.
136133 startWait chan struct {}
137134
135+ // waitGroup is the wait group that is used to wait for all informers to stop
136+ waitGroup sync.WaitGroup
137+
138+ // stopped is true if the informers have been stopped
139+ stopped bool
140+
141+ // ctx is the context to stop informers
142+ ctx context.Context
143+
138144 // namespace is the namespace that all ListWatches are restricted to
139145 // default or empty string means all namespaces
140146 namespace string
@@ -158,24 +164,42 @@ func (ip *InformersMap) Start(ctx context.Context) error {
158164 defer ip .mu .Unlock ()
159165
160166 // Set the stop channel so it can be passed to informers that are added later
161- ip .stop = ctx .Done ()
167+ ip .ctx = ctx
168+
169+ ip .waitGroup .Add (len (ip .informers .Structured ) + len (ip .informers .Unstructured ) + len (ip .informers .Metadata ))
162170
163171 // Start each informer
164172 for _ , i := range ip .informers .Structured {
165- go i .Informer .Run (ctx .Done ())
173+ i := i
174+ go func () {
175+ defer ip .waitGroup .Done ()
176+ i .Informer .Run (ctx .Done ())
177+ }()
166178 }
167179 for _ , i := range ip .informers .Unstructured {
168- go i .Informer .Run (ctx .Done ())
180+ i := i
181+ go func () {
182+ defer ip .waitGroup .Done ()
183+ i .Informer .Run (ctx .Done ())
184+ }()
169185 }
170186 for _ , i := range ip .informers .Metadata {
171- go i .Informer .Run (ctx .Done ())
187+ i := i
188+ go func () {
189+ defer ip .waitGroup .Done ()
190+ i .Informer .Run (ctx .Done ())
191+ }()
172192 }
173193
174194 // Set started to true so we immediately start any informers added later.
175195 ip .started = true
176196 close (ip .startWait )
177197 }()
178- <- ctx .Done ()
198+ <- ctx .Done () // Block until the context is done
199+ ip .mu .Lock ()
200+ ip .stopped = true // Set stopped to true so we don't start any new informers
201+ ip .mu .Unlock ()
202+ ip .waitGroup .Wait () // Block until all informers have stopped
179203 return nil
180204}
181205
@@ -260,6 +284,10 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
260284 ip .mu .Lock ()
261285 defer ip .mu .Unlock ()
262286
287+ if ip .stopped {
288+ return nil , false , ip .ctx .Err ()
289+ }
290+
263291 // Check the cache to see if we already have an Informer. If we do, return the Informer.
264292 // This is for the case where 2 routines tried to get the informer when it wasn't in the map
265293 // so neither returned early, but the first one created it.
@@ -311,16 +339,16 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
311339 // TODO(seans): write thorough tests and document what happens here - can you add indexers?
312340 // can you add eventhandlers?
313341 if ip .started {
314- go i .Informer .Run (ip .stop )
342+ ip .waitGroup .Add (1 )
343+ go func () {
344+ defer ip .waitGroup .Done ()
345+ i .Informer .Run (ip .ctx .Done ())
346+ }()
315347 }
316348 return i , ip .started , nil
317349}
318350
319351func (ip * InformersMap ) makeListWatcher (gvk schema.GroupVersionKind , obj runtime.Object ) (* cache.ListWatch , error ) {
320- // TODO(vincepri): Wire the context in here and don't use TODO().
321- // Can we use the context from the Get call?
322- ctx := context .TODO ()
323-
324352 // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
325353 // groupVersionKind to the Resource API we will use.
326354 mapping , err := ip .mapper .RESTMapping (gvk .GroupKind (), gvk .Version )
@@ -351,16 +379,16 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
351379 return & cache.ListWatch {
352380 ListFunc : func (opts metav1.ListOptions ) (runtime.Object , error ) {
353381 if namespace != "" {
354- return resources .Namespace (namespace ).List (ctx , opts )
382+ return resources .Namespace (namespace ).List (ip . ctx , opts )
355383 }
356- return resources .List (ctx , opts )
384+ return resources .List (ip . ctx , opts )
357385 },
358386 // Setup the watch function
359387 WatchFunc : func (opts metav1.ListOptions ) (watch.Interface , error ) {
360388 if namespace != "" {
361- return resources .Namespace (namespace ).Watch (ctx , opts )
389+ return resources .Namespace (namespace ).Watch (ip . ctx , opts )
362390 }
363- return resources .Watch (ctx , opts )
391+ return resources .Watch (ip . ctx , opts )
364392 },
365393 }, nil
366394 //
@@ -386,9 +414,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
386414 err error
387415 )
388416 if namespace != "" {
389- list , err = resources .Namespace (namespace ).List (ctx , opts )
417+ list , err = resources .Namespace (namespace ).List (ip . ctx , opts )
390418 } else {
391- list , err = resources .List (ctx , opts )
419+ list , err = resources .List (ip . ctx , opts )
392420 }
393421 if list != nil {
394422 for i := range list .Items {
@@ -400,9 +428,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
400428 // Setup the watch function
401429 WatchFunc : func (opts metav1.ListOptions ) (watcher watch.Interface , err error ) {
402430 if namespace != "" {
403- watcher , err = resources .Namespace (namespace ).Watch (ctx , opts )
431+ watcher , err = resources .Namespace (namespace ).Watch (ip . ctx , opts )
404432 } else {
405- watcher , err = resources .Watch (ctx , opts )
433+ watcher , err = resources .Watch (ip . ctx , opts )
406434 }
407435 if err != nil {
408436 return nil , err
@@ -433,7 +461,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
433461
434462 // Create the resulting object, and execute the request.
435463 res := listObj .DeepCopyObject ()
436- if err := req .Do (ctx ).Into (res ); err != nil {
464+ if err := req .Do (ip . ctx ).Into (res ); err != nil {
437465 return nil , err
438466 }
439467 return res , nil
@@ -446,7 +474,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
446474 req .Namespace (namespace )
447475 }
448476 // Call the watch.
449- return req .Watch (ctx )
477+ return req .Watch (ip . ctx )
450478 },
451479 }, nil
452480 }
0 commit comments