Set up type hinting: add missing types to Python files and C extension stubs #2041
🎉 All Contributor License Agreements have been signed. Ready to merge.
Pull Request Overview
This PR introduces initial type hinting support for the confluent_kafka Python package by adding type stubs for the C extension and typing annotations to core serialization classes.
- Establishes type safety foundation with C extension stubs and centralized type definitions
- Adds comprehensive type annotations to producer and consumer serialization classes
- Creates type-aware error handling hierarchy
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/confluent_kafka/cimpl.pyi | Type stubs for C extension classes and functions |
| src/confluent_kafka/_types.py | Centralized type aliases for the package |
| src/confluent_kafka/py.typed | Marker file indicating package supports typing |
| src/confluent_kafka/serializing_producer.py | Type annotations for SerializingProducer class |
| src/confluent_kafka/deserializing_consumer.py | Type annotations for DeserializingConsumer class |
| src/confluent_kafka/error.py | Type annotations for error classes |
| src/confluent_kafka/__init__.py | Type annotations for utility classes |
| super(DeserializingConsumer, self).__init__(conf_copy) | ||
|
|
||
| def poll(self, timeout=-1): | ||
| def poll(self, timeout: float = -1) -> Optional['Message']: |
Copilot
AI
Sep 5, 2025
The return type annotation uses forward reference quotes around 'Message' but Message is already imported on line 21. Remove the quotes to use the directly imported type.
| def poll(self, timeout: float = -1) -> Optional['Message']: | |
| def poll(self, timeout: float = -1) -> Optional[Message]: |
MSeal
left a comment
I'd like to spend some time over Zoom talking through the work on this PR, but overall it looks like a reasonable solution.
| """ | ||
|
|
||
| def __init__(self, group_id, topic_partitions=None): | ||
| def __init__(self, group_id: str, topic_partitions: Optional[List['cimpl.TopicPartition']] = None) -> None: |
Why did this need the cimpl path prefix to work here? I'm curious to chat about how the C binding type errors were worked out in development.
We only import the module (`from .. import cimpl`), so the type checker would not find a directly referenced `TopicPartition`. We can switch to `from ..cimpl import TopicPartition` to make the code a bit cleaner.
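To illustrate the import point above, here is a minimal sketch that uses the standard library as a stand-in, since the package's relative imports cannot run in isolation. `collections.OrderedDict` plays the role of `cimpl.TopicPartition`, and both function names are hypothetical.

```python
import collections
from collections import OrderedDict
from typing import List, Optional, get_type_hints


# Only the module name is relied on here, so the annotation spells out the module path.
def with_module_prefix(items: Optional[List['collections.OrderedDict']] = None) -> None:
    pass


# The class itself is imported, so the bare name works and no quotes are needed.
def with_direct_import(items: Optional[List[OrderedDict]] = None) -> None:
    pass


# Both annotations resolve to the same type once the forward reference is evaluated.
ns = {"collections": collections}
prefixed = get_type_hints(with_module_prefix, globalns=ns)["items"]
direct = get_type_hints(with_direct_import, globalns=ns)["items"]
same_type = prefixed == direct
```

Either spelling is acceptable to a type checker; the direct import mainly shortens the annotation.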
| from enum import Enum | ||
|
|
||
| # Generic type for enum conversion | ||
| E = TypeVar('E', bound=Enum) |
Why do we need this? Can't we just use type[Enum] below?
Yes, I think we can just leave it as type[Enum]. I think Type[E] creates a stricter relationship with the specific enum class, but it overcomplicates the code a bit, and I don't think this function (meant to handle types itself) benefits from it.
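For reference, a small sketch of the trade-off discussed above. The `Color` enum and both converter functions are hypothetical and only loosely modeled on the idea of an enum conversion helper; they are not the package's actual implementation.

```python
from enum import Enum
from typing import Type, TypeVar, Union

E = TypeVar('E', bound=Enum)


class Color(Enum):
    RED = 1
    GREEN = 2


def convert_generic(val: Union[str, int, E], enum_clazz: Type[E]) -> E:
    """A type checker infers the concrete enum class (e.g. Color) as the return type."""
    if isinstance(val, str):
        return enum_clazz[val.upper()]
    if isinstance(val, int):
        return enum_clazz(val)
    return val


def convert_plain(val: Union[str, int, Enum], enum_clazz: Type[Enum]) -> Enum:
    """Same runtime behavior, but the return type is only known to be some Enum."""
    if isinstance(val, str):
        return enum_clazz[val.upper()]
    if isinstance(val, int):
        return enum_clazz(val)
    return val
```

At runtime the two behave identically; the difference is only in how precisely a checker can type the result at the call site.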
| self.controller = controller | ||
| self.nodes = nodes | ||
| self.authorized_operations = None | ||
| self.authorized_operations: Optional[List[AclOperation]] = None |
Why the hint here and why does it differ from the hint in the def?
Fixed. I also updated authorized_operations to be more specific: Optional[List[Union[str, int, AclOperation]]], since those three types are handled in ConversionUtil.convert_to_enum.
| @@ -0,0 +1,443 @@ | |||
| """ | |||
I do dislike how we'll have two locations for type/actual definitions we need to maintain now. However it may be the ideal solution for now. This really makes me want to try a pass on converting the c files to Cython and then getting the type hinting for free while avoiding manually defining so many Python things in C binding calls. Not in scope for this PR though it would eliminate the above dual definition problem. For now let's add a warning comment to the C code files to update this file when/if editing interfaces therein
7b378e7 to c6b0192
… entrypoint init files
Pull Request Overview
Copilot reviewed 43 out of 44 changed files in this pull request and generated 6 comments.
| def log(self, *args, **kwargs): | ||
| self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs) | ||
| def log(self, *args: Any, **kwargs: Any) -> None: | ||
| self.loop.call_soon_threadsafe(lambda: self.logger.log(*args, **kwargs)) |
Copilot
AI
Oct 20, 2025
The original implementation used self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs) which directly passes the method and arguments to be executed in the event loop. The new implementation wraps this in a lambda, which means the arguments are captured immediately (when log() is called) rather than being evaluated in the event loop thread. This could lead to subtle bugs if the arguments are mutable and modified between the time log() is called and when the lambda executes. Revert to the original implementation: self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs)
| self.loop.call_soon_threadsafe(lambda: self.logger.log(*args, **kwargs)) | |
| self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs) |
| self.leader_epoch = leader_epoch | ||
| if self.leader_epoch < 0: | ||
| self.leader_epoch = None | ||
| self.leader_epoch: Optional[int] = leader_epoch if leader_epoch >= 0 else None |
Copilot
AI
Oct 20, 2025
[nitpick] The conditional logic is now inline in the assignment which changes behavior from the original implementation. The original code set self.leader_epoch = leader_epoch first and then conditionally set it to None. This new implementation doesn't preserve the original value semantics if leader_epoch is negative. Consider keeping closer to the original pattern for clarity.
| self.leader_epoch: Optional[int] = leader_epoch if leader_epoch >= 0 else None | |
| self.leader_epoch: Optional[int] = leader_epoch | |
| if leader_epoch < 0: | |
| self.leader_epoch = None |
| AdminClient._make_consumer_groups_result) | ||
|
|
||
| super(AdminClient, self).describe_consumer_groups(group_ids, f, **kwargs) | ||
| super(AdminClient, self).describe_consumer_groups(group_ids, f, **kwargs) # type: ignore[arg-type] |
Copilot
AI
Oct 20, 2025
The type: ignore[arg-type] comment suggests a type mismatch between the parent class signature and the override. This indicates the stub file (cimpl.pyi) may have incorrect type hints for _AdminClientImpl.describe_consumer_groups. Review and fix the stub file types instead of suppressing the error.
| key = self._key_deserializer(key, ctx) | ||
| except Exception as se: | ||
| raise KeyDeserializationError(exception=se, kafka_message=msg) | ||
|
|
Copilot
AI
Oct 20, 2025
The type: ignore comments indicate that set_key and set_value in the stub file expect bytes but the deserialized key/value can be of any type. The stub file signature for these methods should be updated to accept Any instead of just bytes, or these methods should be removed from the public API if they're intended to be internal-only.
| # The stub file for Message.set_key/set_value expects bytes, but after deserialization, these can be any type. | |
| # The stub should be updated to accept Any. See CodeQL rule and issue for details. |
Pull Request Overview
Copilot reviewed 43 out of 44 changed files in this pull request and generated 2 comments.
src/confluent_kafka/admin/_group.py
Outdated
| self.topic_partitions = topic_partitions | ||
| if self.topic_partitions is None: | ||
| self.topic_partitions = [] | ||
| def __init__(self, topic_partitions: List[TopicPartition] = []) -> None: |
Copilot
AI
Oct 21, 2025
Mutable default argument [] for topic_partitions parameter. This creates a shared list across all instances. Use None as default and handle it in the method body (which is already done on line 84, so just change default to None).
| def __init__(self, topic_partitions: List[TopicPartition] = []) -> None: | |
| def __init__(self, topic_partitions: Optional[List[TopicPartition]] = None) -> None: |
This is catching a real bug here -- we should fix this to be None as it's currently sharing the instance value incorrectly across instances without provided inputs
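The shared-default problem is easy to reproduce in isolation. The sketch below uses two hypothetical classes (not the ones in this PR) to contrast a mutable default with the `None` pattern.

```python
from typing import List, Optional


class SharedDefault:
    # The single list object created at definition time is reused by every call.
    def __init__(self, items: List[str] = []) -> None:
        self.items = items


class SafeDefault:
    # A fresh list is created per instance when no argument is supplied.
    def __init__(self, items: Optional[List[str]] = None) -> None:
        self.items: List[str] = [] if items is None else items


a, b = SharedDefault(), SharedDefault()
a.items.append("x")  # also visible through b.items

c, d = SafeDefault(), SafeDefault()
c.items.append("x")  # d.items is unaffected
```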
| self.leader_epoch: Optional[int] = leader_epoch | ||
| if leader_epoch < 0: | ||
| self.leader_epoch = None |
Copilot
AI
Oct 21, 2025
The variable self.leader_epoch is assigned on line 162, then immediately conditionally modified on line 164. This could be simplified by directly assigning the conditional result: self.leader_epoch = None if leader_epoch < 0 else leader_epoch.
| self.leader_epoch: Optional[int] = leader_epoch | |
| if leader_epoch < 0: | |
| self.leader_epoch = None | |
| self.leader_epoch: Optional[int] = None if leader_epoch < 0 else leader_epoch |
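As a quick check that the two spellings debated in this thread are interchangeable, here is a sketch with two hypothetical helper functions standing in for the constructor logic.

```python
from typing import Optional


def epoch_two_step(leader_epoch: int) -> Optional[int]:
    # Assign first, then overwrite negative values with None.
    result: Optional[int] = leader_epoch
    if leader_epoch < 0:
        result = None
    return result


def epoch_inline(leader_epoch: int) -> Optional[int]:
    # Single conditional expression with the same outcome.
    return None if leader_epoch < 0 else leader_epoch
```

Both map negative epochs to `None` and leave zero and positive values unchanged, so the choice is purely stylistic.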
MSeal
left a comment
Some minor changes requested, then good to go
| READ_COMMITTED = cimpl.ISOLATION_LEVEL_READ_COMMITTED #: Skip offsets belonging to an aborted transaction. | ||
|
|
||
| def __lt__(self, other): | ||
| def __lt__(self, other) -> Any: |
should be -> bool
| UNCLEAN = cimpl.ELECTION_TYPE_UNCLEAN | ||
|
|
||
| def __lt__(self, other): | ||
| def __lt__(self, other) -> Any: |
should be -> bool
| return super(AdminClient, self).list_groups(*args, **kwargs) | ||
|
|
||
| def create_partitions(self, new_partitions, **kwargs): | ||
| def create_partitions( # type: ignore[override] |
Why do we need the type ignore here?
I think it's unfortunately required to work around the current setup:
- In C extension layer, Admin_create_partitions returns None
- In the cimpl.pyi stub file where we define the C-extension class _AdminClientImpl, we set return type of create_partitions to None
- In the python wrapper layer, AdminClient inherits _AdminClientImpl but create_partitions returns a future instead of None
|
|
||
| def create_partitions(self, new_partitions, **kwargs): | ||
| def create_partitions( # type: ignore[override] | ||
| self, new_partitions: List[NewPartitions], **kwargs: Any |
TIL that with **kwargs you only set the value type in the hint
Yep, I assume the reason is that the type of key is always str
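A short sketch of the annotation rule mentioned above, using a hypothetical `configure` function: the hint on `*args` and `**kwargs` describes each element or value, while keyword names are always strings.

```python
from typing import Dict, Tuple


def configure(*args: int, **kwargs: float) -> Tuple[Tuple[int, ...], Dict[str, float]]:
    # Inside the function, args is a tuple of ints and kwargs is a dict
    # with str keys and float values.
    return args, kwargs


positional, keyword = configure(1, 2, timeout=1.5)

# Non-string keyword names are rejected by the interpreter at call time.
try:
    configure(**{1: 2.0})
except TypeError:
    non_str_key_rejected = True
else:
    non_str_key_rejected = False
```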
src/confluent_kafka/admin/_config.py
Outdated
| return hash((self.restype, self.name)) | ||
|
|
||
| def __lt__(self, other): | ||
| def __lt__(self, other: 'ConfigResource') -> bool: |
Enforcement here might be awkward, since technically __lt__ should handle arguments of a non-matching type by giving a default answer or returning NotImplemented, which it does not appear to do
Good point. I think maybe I should add the type check here and in other places to follow the pattern below:
    def __lt__(self, other: object) -> bool:
        if not isinstance(other, MyClass):
            return NotImplemented
        return self.attribute < other.attribute
Yes I think it should be that
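A runnable version of the agreed pattern, using a hypothetical `Resource` class with `restype` and `name` attributes as a stand-in for the real classes:

```python
from typing import Tuple


class Resource:
    def __init__(self, restype: int, name: str) -> None:
        self.restype = restype
        self.name = name

    def _key(self) -> Tuple[int, str]:
        return (self.restype, self.name)

    def __lt__(self, other: object) -> bool:
        # Returning NotImplemented lets Python try the reflected operation
        # on the other operand and raise TypeError if that also fails.
        if not isinstance(other, Resource):
            return NotImplemented
        return self._key() < other._key()


ordered = sorted([Resource(2, "b"), Resource(1, "z"), Resource(1, "a")])
ordered_keys = [r._key() for r in ordered]

try:
    Resource(1, "a") < 5
except TypeError:
    mismatched_raises = True
else:
    mismatched_raises = False
```

Type checkers accept `NotImplemented` as a return value from binary special methods annotated `-> bool`, so no extra suppression should be needed here.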
| return cls.for_timestamp(index) | ||
|
|
||
| def __lt__(self, other): | ||
| def __lt__(self, other) -> Any: |
other missing type
|
|
||
| def __init__(self): | ||
| def __init__(self) -> None: | ||
| self.topic = None |
Missing string type if we're also typing partitions
| self.partitions = {} | ||
| self.partitions: Dict[int, 'PartitionMetadata'] = {} | ||
| """Map of partitions indexed by partition id. Value is a PartitionMetadata object.""" | ||
| self.error = None |
Missing KafkaError type if we're also typing partitions
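What the two comments above ask for might look like the sketch below. `TopicInfo` is a hypothetical class, `Exception` stands in for `KafkaError`, and `str` stands in for `PartitionMetadata`, so that the example runs without the package installed.

```python
from typing import Dict, Optional


class TopicInfo:
    def __init__(self) -> None:
        # Without the annotation, a checker would infer the type as None only.
        self.topic: Optional[str] = None
        self.partitions: Dict[int, str] = {}
        self.error: Optional[Exception] = None


info = TopicInfo()
info.topic = "orders"  # accepted, because the attribute is Optional[str]
```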
src/confluent_kafka/admin/_scram.py
Outdated
| SCRAM_SHA_512 = cimpl.SCRAM_MECHANISM_SHA_512 #: SCRAM-SHA-512 mechanism | ||
|
|
||
| def __lt__(self, other): | ||
| def __lt__(self, other) -> Any: |
bool return type
| # Update buffer activity since we just flushed | ||
| self._buffer_timeout_manager.mark_activity() | ||
|
|
||
| # Then flush the underlying producer and wait for delivery confirmation |
I think the old text was grammatically correct
Cursor was trying to be smart and made a bunch of creative text edits :(
| def log(self, *args, **kwargs): | ||
| self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs) | ||
| def log(self, *args: Any, **kwargs: Any) -> None: | ||
| self.loop.call_soon_threadsafe(lambda: self.logger.log(*args, **kwargs)) |
Another fix to pass the function in correctly
def call_soon_threadsafe(self, callback, *args, context=None)
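Given that signature, keyword arguments cannot be forwarded to the callback directly, which is why the call is wrapped. The sketch below demonstrates this with the default asyncio event loop; the logger name and the list-based handler are assumptions made only for the demonstration.

```python
import asyncio
import functools
import logging

messages = []


class ListHandler(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        messages.append(record.getMessage())


logger = logging.getLogger("call_soon_sketch")
logger.addHandler(ListHandler())
logger.setLevel(logging.DEBUG)
logger.propagate = False


async def main() -> bool:
    loop = asyncio.get_running_loop()
    # The only keyword the loop method accepts is `context`, so extra
    # keywords meant for the callback raise TypeError.
    try:
        loop.call_soon_threadsafe(logger.log, logging.INFO, "direct", extra={"k": 1})
    except TypeError:
        rejected = True
    else:
        rejected = False
    # Wrapping the call binds keyword arguments before scheduling.
    loop.call_soon_threadsafe(
        functools.partial(logger.log, logging.INFO, "partial %s", "ok", extra={"k": 1})
    )
    loop.call_soon_threadsafe(lambda: logger.log(logging.INFO, "lambda"))
    await asyncio.sleep(0.01)  # give the loop a chance to run the callbacks
    return rejected


kwargs_rejected = asyncio.run(main())
```

`functools.partial` is the approach the asyncio documentation suggests for passing keyword arguments; a lambda achieves the same effect.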
| self, | ||
| batch_messages: List[Dict[str, Any]] | ||
| ) -> None: | ||
| async def flush_librdkafka_queue(self, timeout=-1): |
I think since you picked up master changes you now have some more methods to type -- I'll approve PR and we can do a second pass for these separately as part of putting code checks on typing.
What
First PR to set up type hinting for this repo, according to https://peps.python.org/pep-0484/. This PR includes:
Checklist
References
JIRA: https://confluentinc.atlassian.net/browse/DGS-22076
Test & Review
Open questions / Follow-ups