@@ -94,7 +94,8 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
9494 },
9595 Workers : 0 ,
9696 },
97- DataDir : config .DataDir ,
97+ DataDir : config .DataDir ,
98+ QueueName : config .Name + "-level" ,
9899 }
99100
100101 levelQueue , err := NewLevelQueue (wrappedHandle , levelCfg , exemplar )
@@ -172,16 +173,18 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
172173 atShutdown (q .Shutdown )
173174 atTerminate (q .Terminate )
174175
175- if lq , ok := q .internal .(* LevelQueue ); ok && lq .byteFIFO .Len (lq .shutdownCtx ) != 0 {
176+ if lq , ok := q .internal .(* LevelQueue ); ok && lq .byteFIFO .Len (lq .terminateCtx ) != 0 {
176177 // Just run the level queue - we shut it down once it's flushed
177178 go q .internal .Run (func (_ func ()) {}, func (_ func ()) {})
178179 go func () {
179- for ! q .IsEmpty () {
180- _ = q . internal .Flush (0 )
180+ for ! lq .IsEmpty () {
181+ _ = lq .Flush (0 )
181182 select {
182183 case <- time .After (100 * time .Millisecond ):
183- case <- q .internal .(* LevelQueue ).shutdownCtx .Done ():
184- log .Warn ("LevelQueue: %s shut down before completely flushed" , q .internal .(* LevelQueue ).Name ())
184+ case <- lq .shutdownCtx .Done ():
185+ if lq .byteFIFO .Len (lq .terminateCtx ) > 0 {
186+ log .Warn ("LevelQueue: %s shut down before completely flushed" , q .internal .(* LevelQueue ).Name ())
187+ }
185188 return
186189 }
187190 }
@@ -316,10 +319,22 @@ func (q *PersistableChannelQueue) Shutdown() {
316319 // Redirect all remaining data in the chan to the internal channel
317320 log .Trace ("PersistableChannelQueue: %s Redirecting remaining data" , q .delayedStarter .name )
318321 close (q .channelQueue .dataChan )
322+ countOK , countLost := 0 , 0
319323 for data := range q .channelQueue .dataChan {
320- _ = q .internal .Push (data )
324+ err := q .internal .Push (data )
325+ if err != nil {
326+ log .Error ("PersistableChannelQueue: %s Unable redirect %v due to: %v" , q .delayedStarter .name , data , err )
327+ countLost ++
328+ } else {
329+ countOK ++
330+ }
321331 atomic .AddInt64 (& q .channelQueue .numInQueue , - 1 )
322332 }
333+ if countLost > 0 {
334+ log .Warn ("PersistableChannelQueue: %s %d will be restored on restart, %d lost" , q .delayedStarter .name , countOK , countLost )
335+ } else if countOK > 0 {
336+ log .Warn ("PersistableChannelQueue: %s %d will be restored on restart" , q .delayedStarter .name , countOK )
337+ }
323338 log .Trace ("PersistableChannelQueue: %s Done Redirecting remaining data" , q .delayedStarter .name )
324339
325340 log .Debug ("PersistableChannelQueue: %s Shutdown" , q .delayedStarter .name )
0 commit comments