-
Notifications
You must be signed in to change notification settings - Fork 933
Make consumer - consumer and poll wakeable #2126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes critical usability issues where Consumer.poll() and Consumer.consume() would block indefinitely and not respond to Ctrl+C (KeyboardInterrupt) signals. The solution implements a "wakeable poll" pattern that chunks long timeouts into 200ms intervals and periodically checks for pending signals between chunks, allowing proper signal handling and graceful interruption.
Key Changes:
- Implemented helper functions
calculate_chunk_timeout()andcheck_signals_between_chunks()in C code to enable interruptible polling - Modified
Consumer.poll()andConsumer.consume()to use chunked polling with periodic signal checks - Added comprehensive test coverage for utility functions, interruptibility, edge cases, and message handling
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| tests/test_Consumer.py | Added utility helper send_sigint_after_delay() and 7 new test functions covering chunk timeout calculation, signal detection, utility function interaction, poll/consume interruptibility, and edge cases |
| src/confluent_kafka/src/Consumer.c | Implemented wakeable poll pattern with helper functions and refactored Consumer_poll() and Consumer_consume() to use chunked polling with signal checking |
| CHANGELOG.md | Documented the fix for blocking poll/consume operations |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| * | ||
| * Instead of a single blocking call to rd_kafka_consumer_poll() with the | ||
| * full timeout, this function: | ||
| * 1. Splits the timeout into 200ms chunks |
Copilot
AI
Nov 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The function documentation would benefit from documenting the CHUNK_TIMEOUT_MS constant value (200ms) in the description to match the implementation details mentioned in comment lines.
| * 1. Splits the timeout into 200ms chunks | |
| * 1. Splits the timeout into 200ms chunks (CHUNK_TIMEOUT_MS = 200ms) |
| return NULL; | ||
| } | ||
|
|
||
| /* Create Python list from messages */ |
Copilot
AI
Nov 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra whitespace before closing comment marker.
| /* Create Python list from messages */ | |
| /* Create Python list from messages */ |
MSeal
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One minor comment, tested out things locally with a 30s timeout with this vs master... much better experience of always being able to interrupt.
| time.sleep(delay_seconds) | ||
| try: | ||
| os.kill(os.getpid(), signal.SIGINT) | ||
| except Exception: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should probably catch the KeyboardInterrupt here instead so you don't mask other errors being raised instead
Summary
This PR fixes a critical usability issue where
Consumer.poll()andConsumer.consume()would block indefinitely and not respond to Ctrl+C (KeyboardInterrupt) signals.Fixes: #209 and #807
Problem
When calling
Consumer.poll()orConsumer.consume()with an infinite timeout (or very long timeout), the operations would block indefinitely in the underlying librdkafka C library. Because the Python Global Interpreter Lock (GIL) was released during these blocking calls, Python's signal handling mechanism couldn't detect Ctrl+C signals, making it impossible to gracefully interrupt the consumer.Solution
The fix implements a "wakeable poll" pattern that:
PyErr_CheckSignals()to detect pending KeyboardInterrupt signalsImpact
This fix significantly improves the developer and operational experience for applications using the Confluent Python Kafka client.
Before this fix:
After this fix:
Testing (
tests/test_Consumer.py)Utility function tests:
test_calculate_chunk_timeout_utility_function(): Tests chunk timeout calculation logictest_check_signals_between_chunks_utility_function(): Tests signal detection between chunkstest_wakeable_poll_utility_functions_interaction(): Tests interaction between both utilitiesPoll interruptibility tests:
test_poll_interruptibility_and_messages(): Tests poll() can be interrupted and still handles messages correctlytest_poll_edge_cases(): Tests edge cases (zero timeout, closed consumer, short timeouts)Consume interruptibility tests:
test_consume_interruptibility_and_messages(): Tests consume() can be interrupted and still handles messages correctlytest_consume_edge_cases(): Tests edge cases (zero timeout, invalid parameters, short timeouts)All tests use a helper function
send_sigint_after_delay()to simulate Ctrl+C in automated tests.Performance Impact
Manual Testing
test_consumer_consume.py