Skip to content

Commit 10db6e9

Browse files
garyrussellartembilan
authored andcommitted
GH-650: Pause/Resume doc example
Resolves #650
1 parent 0ffccba commit 10db6e9

File tree

1 file changed

+59
-0
lines changed

1 file changed

+59
-0
lines changed

src/reference/asciidoc/kafka.adoc

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1476,6 +1476,65 @@ However, the consumers might not have actually paused yet; `isConsumerPaused()`
14761476

14771477
In addition, also since _2.1.5_, `ConsumerPausedEvent` s and `ConsumerResumedEvent` s are published with the container as the `source` property and the `TopicPatition` s involved in the `partitions` s property.
14781478

1479+
This simple Spring Boot application demonstrates using the container registry to get a reference to a `@KafkaListener` method's container and pausing/resuming its consumers, as well as receiving the corresponding events.
1480+
1481+
[source, java]
1482+
----
1483+
@SpringBootApplication
1484+
public class Application implements ApplicationListener<KafkaEvent> {
1485+
1486+
public static void main(String[] args) {
1487+
SpringApplication.run(Application.class, args).close();
1488+
}
1489+
1490+
@Override
1491+
public void onApplicationEvent(KafkaEvent event) {
1492+
System.out.println(event);
1493+
}
1494+
1495+
@Bean
1496+
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
1497+
KafkaTemplate<String, String> template) {
1498+
return args -> {
1499+
template.send("pause.resume.topic", "foo");
1500+
Thread.sleep(10_000);
1501+
System.out.println("pausing");
1502+
registry.getListenerContainer("pause.resume").pause();
1503+
Thread.sleep(10_000);
1504+
template.send("pause.resume.topic", "bar");
1505+
Thread.sleep(10_000);
1506+
System.out.println("resuming");
1507+
registry.getListenerContainer("pause.resume").resume();
1508+
Thread.sleep(10_000);
1509+
};
1510+
}
1511+
1512+
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
1513+
public void listen(String in) {
1514+
System.out.println(in);
1515+
}
1516+
1517+
@Bean
1518+
public NewTopic topic() {
1519+
return new NewTopic("pause.resume.topic", 2, (short) 1);
1520+
}
1521+
1522+
}
1523+
----
1524+
1525+
With results:
1526+
1527+
[source]
1528+
----
1529+
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
1530+
foo
1531+
pausing
1532+
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
1533+
resuming
1534+
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
1535+
bar
1536+
----
1537+
14791538
[[serdes]]
14801539
==== Serialization/Deserialization and Message Conversion
14811540

0 commit comments

Comments
 (0)