2020
2121import java .io .IOException ;
2222import java .net .ServerSocket ;
23+ import java .util .Map ;
2324
2425import javax .net .ServerSocketFactory ;
2526
27+ import org .apache .kafka .clients .consumer .Consumer ;
28+ import org .apache .kafka .clients .consumer .ConsumerConfig ;
29+ import org .apache .kafka .clients .consumer .KafkaConsumer ;
30+ import org .apache .kafka .clients .producer .KafkaProducer ;
31+ import org .apache .kafka .clients .producer .Producer ;
32+ import org .apache .kafka .clients .producer .ProducerRecord ;
2633import org .junit .Test ;
2734import org .junit .runner .RunWith ;
2835
2936import org .springframework .beans .factory .annotation .Autowired ;
3037import org .springframework .context .annotation .Bean ;
3138import org .springframework .context .annotation .Configuration ;
39+ import org .springframework .kafka .test .utils .KafkaTestUtils ;
3240import org .springframework .test .context .junit4 .SpringRunner ;
3341
3442/**
4149@ RunWith (SpringRunner .class )
4250public class AddressableEmbeddedBrokerTests {
4351
52+ private static final String TEST_EMBEDDED = "testEmbedded" ;
53+
4454 @ Autowired
4555 private Config config ;
4656
@@ -56,14 +66,36 @@ public void testKafkaEmbedded() {
5666 .isEqualTo (System .getProperty (KafkaEmbedded .SPRING_EMBEDDED_ZOOKEEPER_CONNECT ));
5767 }
5868
69+ @ Test
70+ public void testLateStartedConsumer () throws Exception {
71+ Map <String , Object > consumerProps = KafkaTestUtils .consumerProps (TEST_EMBEDDED , "false" , this .broker );
72+ consumerProps .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
73+ Consumer <Integer , String > consumer = new KafkaConsumer <>(consumerProps );
74+ this .broker .consumeFromAnEmbeddedTopic (consumer , TEST_EMBEDDED );
75+
76+ Producer <String , Object > producer = new KafkaProducer <>(KafkaTestUtils .producerProps (this .broker ));
77+ producer .send (new ProducerRecord <String , Object >(TEST_EMBEDDED , "foo" ));
78+ producer .close ();
79+ KafkaTestUtils .getSingleRecord (consumer , TEST_EMBEDDED );
80+
81+ consumerProps = KafkaTestUtils .consumerProps ("another" + TEST_EMBEDDED , "false" , this .broker );
82+ consumerProps .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
83+ Consumer <Integer , String > consumer2 = new KafkaConsumer <>(consumerProps );
84+ this .broker .consumeFromAnEmbeddedTopic (consumer2 , TEST_EMBEDDED );
85+ KafkaTestUtils .getSingleRecord (consumer2 , TEST_EMBEDDED );
86+
87+ consumer .close ();
88+ consumer2 .close ();
89+ }
90+
5991 @ Configuration
6092 public static class Config {
6193
6294 private int port ;
6395
6496 @ Bean
6597 public KafkaEmbedded broker () throws IOException {
66- KafkaEmbedded broker = new KafkaEmbedded (1 );
98+ KafkaEmbedded broker = new KafkaEmbedded (1 , true , TEST_EMBEDDED );
6799 ServerSocket ss = ServerSocketFactory .getDefault ().createServerSocket (0 );
68100 this .port = ss .getLocalPort ();
69101 ss .close ();
0 commit comments