@@ -32,7 +32,7 @@ use tokio::{
3232        mpsc:: { self ,  unbounded_channel,  UnboundedReceiver ,  UnboundedSender } , 
3333        oneshot, 
3434    } , 
35-     task:: { self ,  JoinError ,  JoinSet } , 
35+     task:: { self ,  JoinError ,  JoinHandle ,   JoinSet } , 
3636    time:: { self ,  sleep,  MissedTickBehavior } , 
3737} ; 
3838use  tokio_stream:: wrappers:: UnboundedReceiverStream ; 
@@ -61,14 +61,16 @@ pub async fn start_consumer(
6161        . expect ( "Can't subscribe to specified topics" ) ; 
6262
6363    handle_os_signals ( event_sender. clone ( ) ) ; 
64-     poll_consumer_client ( consumer. clone ( ) ,  client_shutdown_receiver) ; 
64+     let  rdkafka_driver =  poll_consumer_client ( consumer. clone ( ) ,  client_shutdown_receiver) ; 
6565    handle_events ( 
6666        consumer, 
6767        event_receiver, 
6868        client_shutdown_sender, 
6969        spawn_actors, 
7070    ) 
71-     . await 
71+     . await ?; 
72+     rdkafka_driver. await ?; 
73+     Ok ( ( ) ) 
7274} 
7375
7476pub  fn  handle_os_signals ( event_sender :  UnboundedSender < ( Event ,  SyncSender < ( ) > ) > )  { 
@@ -85,23 +87,28 @@ pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>)
8587pub  fn  poll_consumer_client ( 
8688    consumer :  Arc < StreamConsumer < KafkaContext > > , 
8789    shutdown :  oneshot:: Receiver < ( ) > , 
88- )  { 
90+ )  ->  JoinHandle < ( ) >   { 
8991    task:: spawn_blocking ( || { 
9092        Handle :: current ( ) . block_on ( async  move  { 
9193            let  _guard = elegant_departure:: get_shutdown_guard ( ) . shutdown_on_drop ( ) ; 
9294            select !  { 
9395                biased; 
94-                 _ = shutdown => { 
95-                     debug!( "Received shutdown signal, commiting state in sync mode..." ) ; 
96-                     let  _ = consumer. commit_consumer_state( rdkafka:: consumer:: CommitMode :: Sync ) ; 
97-                 } 
96+                 _ = shutdown => { } 
9897                msg = consumer. recv( )  => { 
9998                    error!( "Got unexpected message from consumer client: {:?}" ,  msg) ; 
10099                } 
101-             } 
100+ 
101+             } ; 
102+ 
103+             select !  { 
104+                 biased; 
105+                 _ = consumer. recv( )  => { } 
106+                 _ = sleep( Duration :: from_secs( 30 ) )  => { } 
107+             } ; 
108+ 
102109            debug ! ( "Shutdown complete" ) ; 
103110        } ) ; 
104-     } ) ; 
111+     } ) 
105112} 
106113
107114#[ derive( Debug ) ]  
@@ -118,8 +125,20 @@ impl KafkaContext {
118125impl  ClientContext  for  KafkaContext  { } 
119126
120127impl  ConsumerContext  for  KafkaContext  { 
121-     #[ instrument( skip_all) ]  
122-     fn  pre_rebalance ( & self ,  _:  & BaseConsumer < Self > ,  rebalance :  & Rebalance )  { 
128+     #[ instrument( skip( self ,  base_consumer) ) ]  
129+     fn  pre_rebalance ( & self ,  base_consumer :  & BaseConsumer < Self > ,  rebalance :  & Rebalance )  { 
130+         if  let  Rebalance :: Assign ( tpl)  = rebalance { 
131+             if  tpl. count ( )  == 0  { 
132+                 return ; 
133+             } 
134+         } 
135+         base_consumer
136+             . pause ( 
137+                 & base_consumer
138+                     . assignment ( ) 
139+                     . expect ( "Unable to fetch assigned TPL" ) , 
140+             ) 
141+             . expect ( "Unable to pause consumer" ) ; 
123142        let  ( rendezvous_sender,  rendezvous_receiver)  = sync_channel ( 0 ) ; 
124143        match  rebalance { 
125144            Rebalance :: Assign ( tpl)  => { 
@@ -149,6 +168,31 @@ impl ConsumerContext for KafkaContext {
149168        } 
150169    } 
151170
171+     #[ instrument( skip( self ,  base_consumer) ) ]  
172+     fn  post_rebalance ( & self ,  base_consumer :  & BaseConsumer < Self > ,  rebalance :  & Rebalance )  { 
173+         if  let  Rebalance :: Assign ( tpl)  = rebalance { 
174+             if  tpl. count ( )  == 0  { 
175+                 return ; 
176+             } 
177+         } 
178+         let  assignment = base_consumer
179+             . assignment ( ) 
180+             . expect ( "Failed to get assigned TPL" ) ; 
181+         if  assignment. count ( )  != 0  { 
182+             base_consumer
183+                 . seek_partitions ( 
184+                     base_consumer
185+                         . committed ( rdkafka:: util:: Timeout :: Never ) 
186+                         . expect ( "Failed to get commited TPL" ) , 
187+                     rdkafka:: util:: Timeout :: Never , 
188+                 ) 
189+                 . expect ( "Failed to seek to commited offset" ) ; 
190+             base_consumer
191+                 . resume ( & assignment) 
192+                 . expect ( "Failed to resume consumer" ) ; 
193+         } 
194+     } 
195+ 
152196    #[ instrument( skip( self ) ) ]  
153197    fn  commit_callback ( & self ,  result :  KafkaResult < ( ) > ,  _offsets :  & TopicPartitionList )  { 
154198        debug ! ( "Got commit callback" ) ; 
@@ -336,7 +380,7 @@ pub async fn handle_events(
336380
337381    let  mut  state = ConsumerState :: Ready ; 
338382
339-     while  let  ConsumerState :: Ready  {  ..  }   | ConsumerState :: Consuming  {  .. }  = state { 
383+     while  let  ConsumerState :: Ready  | ConsumerState :: Consuming  {  .. }  = state { 
340384        select !  { 
341385            res = match  state { 
342386                ConsumerState :: Consuming ( ref mut  handles,  _)  => Either :: Left ( handles. join_next( ) ) , 
@@ -352,20 +396,30 @@ pub async fn handle_events(
352396                } ; 
353397                info!( "Received event: {:?}" ,  event) ; 
354398                state = match  ( state,  event)  { 
355-                     ( ConsumerState :: Ready ,  Event :: Assign ( assigned ) )  => { 
356-                         ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) ,  & assigned ) ,  assigned ) 
399+                     ( ConsumerState :: Ready ,  Event :: Assign ( tpl ) )  => { 
400+                         ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) ,  & tpl ) ,  tpl ) 
357401                    } 
358402                    ( ConsumerState :: Ready ,  Event :: Revoke ( _) )  => { 
359403                        unreachable!( "Got partition revocation before the consumer has started" ) 
360404                    } 
361405                    ( ConsumerState :: Ready ,  Event :: Shutdown )  => ConsumerState :: Stopped , 
362-                     ( ConsumerState :: Consuming ( _,  _) ,  Event :: Assign ( _) )  => { 
363-                         unreachable!( "Got partition assignment after the consumer has started" ) 
406+                     ( ConsumerState :: Consuming ( handles,  mut  tpl) ,  Event :: Assign ( mut  assigned) )  => { 
407+                         assert!( 
408+                             tpl. is_disjoint( & assigned) , 
409+                             "Newly assigned TPL should be disjoint from TPL we're consuming from" 
410+                         ) ; 
411+                         debug!( 
412+                             "{} additional topic partitions added after assignment" , 
413+                             assigned. len( ) 
414+                         ) ; 
415+                         tpl. append( & mut  assigned) ; 
416+                         handles. shutdown( CALLBACK_DURATION ) . await ; 
417+                         ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) ,  & tpl) ,  tpl) 
364418                    } 
365419                    ( ConsumerState :: Consuming ( handles,  tpl) ,  Event :: Revoke ( revoked) )  => { 
366420                        assert!( 
367-                             tpl ==  revoked, 
368-                             "Revoked TPL should be equal to the  subset of TPL we're consuming from" 
421+                             revoked. is_subset ( & tpl ) , 
422+                             "Revoked TPL should be a  subset of TPL we're consuming from" 
369423                        ) ; 
370424                        handles. shutdown( CALLBACK_DURATION ) . await ; 
371425                        ConsumerState :: Ready 
@@ -734,7 +788,7 @@ impl CommitClient for StreamConsumer<KafkaContext> {
734788    } 
735789} 
736790
737- #[ derive( Default ) ]  
791+ #[ derive( Default ,   Debug ) ]  
738792struct  HighwaterMark  { 
739793    data :  HashMap < ( String ,  i32 ) ,  i64 > , 
740794} 
@@ -779,6 +833,7 @@ pub async fn commit(
779833    while  let  Some ( msgs)  = receiver. recv ( ) . await  { 
780834        let  mut  highwater_mark = HighwaterMark :: new ( ) ; 
781835        msgs. 0 . iter ( ) . for_each ( |msg| highwater_mark. track ( msg) ) ; 
836+         debug ! ( "Store: {:?}" ,  highwater_mark) ; 
782837        consumer. store_offsets ( & highwater_mark. into ( ) ) . unwrap ( ) ; 
783838    } 
784839    debug ! ( "Shutdown complete" ) ; 
0 commit comments