Skip to content

Conversation

@fangnx
Copy link
Member

@fangnx fangnx commented Sep 5, 2025

What

First PR to set up type hinting for this repo, according to https://peps.python.org/pep-0484/. This PR includes:

  • C extension stubs (cimpl.pyi). Note that stubgen didn't work well with our confluent_kafka module and only generated the stubs partially, so some manual parsing of C code (with AI) was used to fill in the gaps
  • Add types to Python files in the repository
  • Verify the types with mypy type checker: most files are validated successfully except for schema-registry module which is already typed prior to this PR. I will investigate in the follow-up PR and see if those are real issues or false positives

Checklist

  • Contains customer facing changes? Including API/behavior changes
  • Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required

References

JIRA: https://confluentinc.atlassian.net/browse/DGS-22076

Test & Review

Open questions / Follow-ups

@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@fangnx fangnx marked this pull request as ready for review September 5, 2025 18:45
Copilot AI review requested due to automatic review settings September 5, 2025 18:45
@fangnx fangnx requested review from a team and MSeal as code owners September 5, 2025 18:45

This comment was marked as outdated.

@fangnx fangnx changed the title Add type hinting Set up type hinting: C extension stubs + typing for serializing_producer + deserializing_consumer Sep 5, 2025
@fangnx fangnx requested a review from Copilot September 5, 2025 20:00
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces initial type hinting support for the confluent_kafka Python package by adding type stubs for the C extension and typing annotations to core serialization classes.

  • Establishes type safety foundation with C extension stubs and centralized type definitions
  • Adds comprehensive type annotations to producer and consumer serialization classes
  • Creates type-aware error handling hierarchy

Reviewed Changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
src/confluent_kafka/cimpl.pyi Type stubs for C extension classes and functions
src/confluent_kafka/_types.py Centralized type aliases for the package
src/confluent_kafka/py.typed Marker file indicating package supports typing
src/confluent_kafka/serializing_producer.py Type annotations for SerializingProducer class
src/confluent_kafka/deserializing_consumer.py Type annotations for DeserializingConsumer class
src/confluent_kafka/error.py Type annotations for error classes
src/confluent_kafka/init.py Type annotations for utility classes

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

super(DeserializingConsumer, self).__init__(conf_copy)

def poll(self, timeout=-1):
def poll(self, timeout: float = -1) -> Optional['Message']:
Copy link

Copilot AI Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return type annotation uses forward reference quotes around 'Message' but Message is already imported on line 21. Remove the quotes to use the directly imported type.

Suggested change
def poll(self, timeout: float = -1) -> Optional['Message']:
def poll(self, timeout: float = -1) -> Optional[Message]:

Copilot uses AI. Check for mistakes.
@sonarqube-confluent

This comment has been minimized.

Copy link
Contributor

@MSeal MSeal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would like to spend some time over zoom talking through the work on this PR but overall looks reasonable as a solution PR

"""

def __init__(self, group_id, topic_partitions=None):
def __init__(self, group_id: str, topic_partitions: Optional[List['cimpl.TopicPartition']] = None) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why'd this need the cimpl path prefix to work here? Curious to chat about how the cbinding type errors were worked out in development

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are only importing the module from .. import cimpl so the type checker would not find the directly referenced TopicPartition. We can switch to from ..cimpl import TopicPartition to make the code a bit cleaner

from enum import Enum

# Generic type for enum conversion
E = TypeVar('E', bound=Enum)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this? Can't we just use type[Enum] below?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think we can just leave it type[Enum]. I think Type[E] creates a stricter type relationship with the specific enum class, but it overcomplicates the code a bit and I don't think this function (meant to handle types itself) can benefit from it

self.controller = controller
self.nodes = nodes
self.authorized_operations = None
self.authorized_operations: Optional[List[AclOperation]] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the hint here and why does it differ from the hint in the def?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Also update authorized_operations to be more specific: Optional[List[Union[str, int, AclOperation]]], as those 3 types are handled in ConversionUtil.convert_to_enum

@@ -0,0 +1,443 @@
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do dislike how we'll have two locations for type/actual definitions we need to maintain now. However it may be the ideal solution for now. This really makes me want to try a pass on converting the c files to Cython and then getting the type hinting for free while avoiding manually defining so many Python things in C binding calls. Not in scope for this PR though it would eliminate the above dual definition problem. For now let's add a warning comment to the C code files to update this file when/if editing interfaces therein

@fangnx fangnx requested a review from MSeal September 9, 2025 21:21
@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@fangnx fangnx changed the title Set up type hinting: C extension stubs + typing for serializing_producer + deserializing_consumer Set up type hinting Oct 16, 2025
@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@fangnx fangnx changed the title Set up type hinting Set up type hinting: add missing types to Python files and C extension stubs Oct 20, 2025
@fangnx fangnx requested a review from Copilot October 20, 2025 22:15
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 43 out of 44 changed files in this pull request and generated 6 comments.


Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

def log(self, *args, **kwargs):
self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs)
def log(self, *args: Any, **kwargs: Any) -> None:
self.loop.call_soon_threadsafe(lambda: self.logger.log(*args, **kwargs))
Copy link

Copilot AI Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original implementation used self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs) which directly passes the method and arguments to be executed in the event loop. The new implementation wraps this in a lambda, which means the arguments are captured immediately (when log() is called) rather than being evaluated in the event loop thread. This could lead to subtle bugs if the arguments are mutable and modified between the time log() is called and when the lambda executes. Revert to the original implementation: self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs)

Suggested change
self.loop.call_soon_threadsafe(lambda: self.logger.log(*args, **kwargs))
self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs)

Copilot uses AI. Check for mistakes.
self.leader_epoch = leader_epoch
if self.leader_epoch < 0:
self.leader_epoch = None
self.leader_epoch: Optional[int] = leader_epoch if leader_epoch >= 0 else None
Copy link

Copilot AI Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The conditional logic is now inline in the assignment which changes behavior from the original implementation. The original code set self.leader_epoch = leader_epoch first and then conditionally set it to None. This new implementation doesn't preserve the original value semantics if leader_epoch is negative. Consider keeping closer to the original pattern for clarity.

Suggested change
self.leader_epoch: Optional[int] = leader_epoch if leader_epoch >= 0 else None
self.leader_epoch: Optional[int] = leader_epoch
if leader_epoch < 0:
self.leader_epoch = None

Copilot uses AI. Check for mistakes.
AdminClient._make_consumer_groups_result)

super(AdminClient, self).describe_consumer_groups(group_ids, f, **kwargs)
super(AdminClient, self).describe_consumer_groups(group_ids, f, **kwargs) # type: ignore[arg-type]
Copy link

Copilot AI Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type: ignore[arg-type] comment suggests a type mismatch between the parent class signature and the override. This indicates the stub file (cimpl.pyi) may have incorrect type hints for _AdminClientImpl.describe_consumer_groups. Review and fix the stub file types instead of suppressing the error.

Copilot uses AI. Check for mistakes.
key = self._key_deserializer(key, ctx)
except Exception as se:
raise KeyDeserializationError(exception=se, kafka_message=msg)

Copy link

Copilot AI Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type: ignore comments indicate that set_key and set_value in the stub file expect bytes but the deserialized key/value can be of any type. The stub file signature for these methods should be updated to accept Any instead of just bytes, or these methods should be removed from the public API if they're intended to be internal-only.

Suggested change
# The stub file for Message.set_key/set_value expects bytes, but after deserialization, these can be any type.
# The stub should be updated to accept Any. See CodeQL rule and issue for details.

Copilot uses AI. Check for mistakes.
@sonarqube-confluent

This comment has been minimized.

@fangnx fangnx requested a review from Copilot October 21, 2025 18:00
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 43 out of 44 changed files in this pull request and generated 2 comments.


Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

self.topic_partitions = topic_partitions
if self.topic_partitions is None:
self.topic_partitions = []
def __init__(self, topic_partitions: List[TopicPartition] = []) -> None:
Copy link

Copilot AI Oct 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mutable default argument [] for topic_partitions parameter. This creates a shared list across all instances. Use None as default and handle it in the method body (which is already done on line 84, so just change default to None).

Suggested change
def __init__(self, topic_partitions: List[TopicPartition] = []) -> None:
def __init__(self, topic_partitions: Optional[List[TopicPartition]] = None) -> None:

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is catching a real bug here -- we should fix this to be None as it's currently sharing the instance value incorrectly across instances without provided inputs

Comment on lines +162 to 164
self.leader_epoch: Optional[int] = leader_epoch
if leader_epoch < 0:
self.leader_epoch = None
Copy link

Copilot AI Oct 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable self.leader_epoch is assigned on line 162, then immediately conditionally modified on line 164. This could be simplified by directly assigning the conditional result: self.leader_epoch = None if leader_epoch < 0 else leader_epoch.

Suggested change
self.leader_epoch: Optional[int] = leader_epoch
if leader_epoch < 0:
self.leader_epoch = None
self.leader_epoch: Optional[int] = None if leader_epoch < 0 else leader_epoch

Copilot uses AI. Check for mistakes.
Copy link
Contributor

@MSeal MSeal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor changes requested, then good to go

READ_COMMITTED = cimpl.ISOLATION_LEVEL_READ_COMMITTED #: Skip offsets belonging to an aborted transaction.

def __lt__(self, other):
def __lt__(self, other) -> Any:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be -> bool

UNCLEAN = cimpl.ELECTION_TYPE_UNCLEAN

def __lt__(self, other):
def __lt__(self, other) -> Any:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be -> bool

return super(AdminClient, self).list_groups(*args, **kwargs)

def create_partitions(self, new_partitions, **kwargs):
def create_partitions( # type: ignore[override]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the type ignore here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's unfortunately required to work around the current setup:

  1. In C extension layer, Admin_create_partitions returns None
  2. In the cimpl.pyi stub file where we define the C-extension class _AdminClientImpl, we set return type of create_partitions to None
  3. In the python wrapper layer, AdminClient inherits _AdminClientImpl but create_partitions returns a future instead of None


def create_partitions(self, new_partitions, **kwargs):
def create_partitions( # type: ignore[override]
self, new_partitions: List[NewPartitions], **kwargs: Any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL **kwargs you only set the value type on the hint

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I assume the reason is that the type of key is always str

return hash((self.restype, self.name))

def __lt__(self, other):
def __lt__(self, other: 'ConfigResource') -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enforcement here might be awkward, since technically __lt__ should be handling non-type match argumentsand giving a default answer or raising is NotImplemented -- which it appears to not implement

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think maybe I should add the type check here and in other places to follow the pattern below:

def __lt__(self, other: object) -> bool:
    if not isinstance(other, MyClass):
        return NotImplemented
    return self.attribute < other.attribute

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think it should be that

return cls.for_timestamp(index)

def __lt__(self, other):
def __lt__(self, other) -> Any:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other missing type


def __init__(self):
def __init__(self) -> None:
self.topic = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing string type if we're also typing partitions

self.partitions = {}
self.partitions: Dict[int, 'PartitionMetadata'] = {}
"""Map of partitions indexed by partition id. Value is a PartitionMetadata object."""
self.error = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing KafkaError type if we're also typing partitions

SCRAM_SHA_512 = cimpl.SCRAM_MECHANISM_SHA_512 #: SCRAM-SHA-512 mechanism

def __lt__(self, other):
def __lt__(self, other) -> Any:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bool return type

# Update buffer activity since we just flushed
self._buffer_timeout_manager.mark_activity()

# Then flush the underlying producer and wait for delivery confirmation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the old text was grammatically correct

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor was trying to smart and made a bunch of creative text edits :(

@sonarqube-confluent

This comment has been minimized.

def log(self, *args, **kwargs):
self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs)
def log(self, *args: Any, **kwargs: Any) -> None:
self.loop.call_soon_threadsafe(lambda: self.logger.log(*args, **kwargs))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another fix to pass the function in correctly

def call_soon_threadsafe(self, callback, *args, context=None)

@sonarqube-confluent

This comment has been minimized.

1 similar comment
@sonarqube-confluent
Copy link

Passed

Analysis Details

16 Issues

  • Bug 0 Bugs
  • Vulnerability 0 Vulnerabilities
  • Code Smell 16 Code Smells

Coverage and Duplications

  • Coverage 85.90% Coverage (67.10% Estimated after merge)
  • Duplications No duplication information (4.80% Estimated after merge)

Project ID: confluent-kafka-python

View in SonarQube

self,
batch_messages: List[Dict[str, Any]]
) -> None:
async def flush_librdkafka_queue(self, timeout=-1):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think since you picked up master changes you now have some more methods to type -- I'll approve PR and we can do a second pass for these separately as part of putting code checks on typing.

@MSeal MSeal merged commit 51652e3 into master Oct 22, 2025
3 checks passed
@MSeal MSeal deleted the typehinting-kafka branch October 22, 2025 22:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants