@@ -32,7 +32,7 @@ use tokio::{
3232 mpsc:: { self , unbounded_channel, UnboundedReceiver , UnboundedSender } ,
3333 oneshot,
3434 } ,
35- task:: { self , JoinError , JoinSet } ,
35+ task:: { self , JoinError , JoinHandle , JoinSet } ,
3636 time:: { self , sleep, MissedTickBehavior } ,
3737} ;
3838use tokio_stream:: wrappers:: UnboundedReceiverStream ;
@@ -61,14 +61,16 @@ pub async fn start_consumer(
6161 . expect ( "Can't subscribe to specified topics" ) ;
6262
6363 handle_os_signals ( event_sender. clone ( ) ) ;
64- poll_consumer_client ( consumer. clone ( ) , client_shutdown_receiver) ;
64+ let rdkafka_driver = poll_consumer_client ( consumer. clone ( ) , client_shutdown_receiver) ;
6565 handle_events (
6666 consumer,
6767 event_receiver,
6868 client_shutdown_sender,
6969 spawn_actors,
7070 )
71- . await
71+ . await ?;
72+ rdkafka_driver. await ?;
73+ Ok ( ( ) )
7274}
7375
7476pub fn handle_os_signals ( event_sender : UnboundedSender < ( Event , SyncSender < ( ) > ) > ) {
@@ -85,23 +87,28 @@ pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>)
8587pub fn poll_consumer_client (
8688 consumer : Arc < StreamConsumer < KafkaContext > > ,
8789 shutdown : oneshot:: Receiver < ( ) > ,
88- ) {
90+ ) -> JoinHandle < ( ) > {
8991 task:: spawn_blocking ( || {
9092 Handle :: current ( ) . block_on ( async move {
9193 let _guard = elegant_departure:: get_shutdown_guard ( ) . shutdown_on_drop ( ) ;
9294 select ! {
9395 biased;
94- _ = shutdown => {
95- debug!( "Received shutdown signal, commiting state in sync mode..." ) ;
96- let _ = consumer. commit_consumer_state( rdkafka:: consumer:: CommitMode :: Sync ) ;
97- }
96+ _ = shutdown => { }
9897 msg = consumer. recv( ) => {
9998 error!( "Got unexpected message from consumer client: {:?}" , msg) ;
10099 }
101- }
100+
101+ } ;
102+
103+ select ! {
104+ biased;
105+ _ = consumer. recv( ) => { }
106+ _ = sleep( Duration :: from_secs( 9 ) ) => { }
107+ } ;
108+
102109 debug ! ( "Shutdown complete" ) ;
103110 } ) ;
104- } ) ;
111+ } )
105112}
106113
107114#[ derive( Debug ) ]
@@ -118,8 +125,20 @@ impl KafkaContext {
118125impl ClientContext for KafkaContext { }
119126
120127impl ConsumerContext for KafkaContext {
121- #[ instrument( skip_all) ]
122- fn pre_rebalance ( & self , _: & BaseConsumer < Self > , rebalance : & Rebalance ) {
128+ #[ instrument( skip( self , base_consumer) ) ]
129+ fn pre_rebalance ( & self , base_consumer : & BaseConsumer < Self > , rebalance : & Rebalance ) {
130+ if let Rebalance :: Assign ( tpl) = rebalance {
131+ if tpl. count ( ) == 0 {
132+ return ;
133+ }
134+ }
135+ base_consumer
136+ . pause (
137+ & base_consumer
138+ . assignment ( )
139+ . expect ( "Unable to fetch assigned TPL" ) ,
140+ )
141+ . expect ( "Unable to pause consumer" ) ;
123142 let ( rendezvous_sender, rendezvous_receiver) = sync_channel ( 0 ) ;
124143 match rebalance {
125144 Rebalance :: Assign ( tpl) => {
@@ -149,6 +168,31 @@ impl ConsumerContext for KafkaContext {
149168 }
150169 }
151170
171+ #[ instrument( skip( self , base_consumer) ) ]
172+ fn post_rebalance ( & self , base_consumer : & BaseConsumer < Self > , rebalance : & Rebalance ) {
173+ if let Rebalance :: Assign ( tpl) = rebalance {
174+ if tpl. count ( ) == 0 {
175+ return ;
176+ }
177+ }
178+ let assignment = base_consumer
179+ . assignment ( )
180+ . expect ( "Failed to get assigned TPL" ) ;
181+ if assignment. count ( ) != 0 {
182+ base_consumer
183+ . seek_partitions (
184+ base_consumer
185+ . committed ( rdkafka:: util:: Timeout :: Never )
186+ . expect ( "Failed to get commited TPL" ) ,
187+ rdkafka:: util:: Timeout :: Never ,
188+ )
189+ . expect ( "Failed to seek to commited offset" ) ;
190+ base_consumer
191+ . resume ( & assignment)
192+ . expect ( "Failed to resume consumer" ) ;
193+ }
194+ }
195+
152196 #[ instrument( skip( self ) ) ]
153197 fn commit_callback ( & self , result : KafkaResult < ( ) > , _offsets : & TopicPartitionList ) {
154198 debug ! ( "Got commit callback" ) ;
@@ -336,7 +380,7 @@ pub async fn handle_events(
336380
337381 let mut state = ConsumerState :: Ready ;
338382
339- while let ConsumerState :: Ready { .. } | ConsumerState :: Consuming { .. } = state {
383+ while let ConsumerState :: Ready | ConsumerState :: Consuming { .. } = state {
340384 select ! {
341385 res = match state {
342386 ConsumerState :: Consuming ( ref mut handles, _) => Either :: Left ( handles. join_next( ) ) ,
@@ -352,20 +396,30 @@ pub async fn handle_events(
352396 } ;
353397 info!( "Received event: {:?}" , event) ;
354398 state = match ( state, event) {
355- ( ConsumerState :: Ready , Event :: Assign ( assigned ) ) => {
356- ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) , & assigned ) , assigned )
399+ ( ConsumerState :: Ready , Event :: Assign ( tpl ) ) => {
400+ ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) , & tpl ) , tpl )
357401 }
358402 ( ConsumerState :: Ready , Event :: Revoke ( _) ) => {
359403 unreachable!( "Got partition revocation before the consumer has started" )
360404 }
361405 ( ConsumerState :: Ready , Event :: Shutdown ) => ConsumerState :: Stopped ,
362- ( ConsumerState :: Consuming ( _, _) , Event :: Assign ( _) ) => {
363- unreachable!( "Got partition assignment after the consumer has started" )
406+ ( ConsumerState :: Consuming ( handles, mut tpl) , Event :: Assign ( mut assigned) ) => {
407+ assert!(
408+ tpl. is_disjoint( & assigned) ,
409+ "Newly assigned TPL should be disjoint from TPL we're consuming from"
410+ ) ;
411+ debug!(
412+ "{} additional topic partitions added after assignment" ,
413+ assigned. len( )
414+ ) ;
415+ tpl. append( & mut assigned) ;
416+ handles. shutdown( CALLBACK_DURATION ) . await ;
417+ ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) , & tpl) , tpl)
364418 }
365419 ( ConsumerState :: Consuming ( handles, tpl) , Event :: Revoke ( revoked) ) => {
366420 assert!(
367- tpl == revoked,
368- "Revoked TPL should be equal to the subset of TPL we're consuming from"
421+ revoked. is_subset ( & tpl ) ,
422+ "Revoked TPL should be a subset of TPL we're consuming from"
369423 ) ;
370424 handles. shutdown( CALLBACK_DURATION ) . await ;
371425 ConsumerState :: Ready
@@ -734,7 +788,7 @@ impl CommitClient for StreamConsumer<KafkaContext> {
734788 }
735789}
736790
737- #[ derive( Default ) ]
791+ #[ derive( Default , Debug ) ]
738792struct HighwaterMark {
739793 data : HashMap < ( String , i32 ) , i64 > ,
740794}
@@ -779,6 +833,7 @@ pub async fn commit(
779833 while let Some ( msgs) = receiver. recv ( ) . await {
780834 let mut highwater_mark = HighwaterMark :: new ( ) ;
781835 msgs. 0 . iter ( ) . for_each ( |msg| highwater_mark. track ( msg) ) ;
836+ debug ! ( "Store: {:?}" , highwater_mark) ;
782837 consumer. store_offsets ( & highwater_mark. into ( ) ) . unwrap ( ) ;
783838 }
784839 debug ! ( "Shutdown complete" ) ;
0 commit comments