Commit 3886a30

Set up type hinting: add correct types and apply type fixes in schema-registry module (#2107)
* update
* remove py.typed for now
* update
* fix cimpl and add types to serde producer/consumer
* admin
* address feedback
* add warning to stub and c files; admin typing more
* add accidentally removed md files
* fix merge conflicts in md files, add types to admin and serialization entrypoint init files
* finish admin init
* add types for AIO module
* linter fix
* address mypy complaints
* revert some accidental doc change
* fix some suggestions by copilot
* linter
* fix
* resolve conflict
* encryption clients
* fix
* revert incorrect merge conflict changes
* fix many things
* more fixes in non sr modules
* type encrypt_executor.py
* more typeignore removals
* update
* handle union types in schemas
* a bit more
* revert some bad changes during merge, address copilot comments
* minor
* support type hint substitution for unasync
* linter
* fix failing integration tests
* fix incorrect cache pattern
* fix type
* linter for comment
* pin google-re2<1.1.20251105 due to dropped python 3.9 support
1 parent 7b2889f commit 3886a30

38 files changed: +1367 / -637 lines

pyproject.toml

Lines changed: 15 additions & 0 deletions
```diff
@@ -27,6 +27,21 @@ Homepage = "https://github.com/confluentinc/confluent-kafka-python"
 [tool.mypy]
 ignore_missing_imports = true
 
+[[tool.mypy.overrides]]
+module = [
+    "confluent_kafka.schema_registry.avro",
+    "confluent_kafka.schema_registry.json_schema",
+    "confluent_kafka.schema_registry.protobuf",
+]
+disable_error_code = ["assignment", "no-redef"]
+
+[[tool.mypy.overrides]]
+module = [
+    "confluent_kafka.schema_registry.confluent.meta_pb2",
+    "confluent_kafka.schema_registry.confluent.types.decimal_pb2",
+]
+ignore_errors = true
+
 [tool.setuptools]
 include-package-data = false
 
```
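For context, `disable_error_code` suppresses exactly two checks in the serde modules while leaving everything else checked. A hypothetical sketch (not code from this commit) of the patterns that trigger those two codes:

```python
# [assignment]: rebinding a name to an incompatible type.
parsed_schema: str = "..."
parsed_schema = None  # mypy: Incompatible types in assignment  [assignment]


# [no-redef]: redefining a name, e.g. a fallback used when an optional
# dependency is unavailable.
def _validate(payload: bytes) -> bool:
    return True


def _validate(payload: bytes) -> bool:  # mypy: Name "_validate" already defined  [no-redef]
    return False
```

The generated protobuf modules (`*_pb2`) get `ignore_errors = true` instead, since generated code is not worth annotating by hand.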

requirements/requirements-rules.txt

Lines changed: 2 additions & 0 deletions
```diff
@@ -2,6 +2,8 @@ azure-identity
 azure-keyvault-keys
 boto3>=1.35
 cel-python>=0.4.0
+# Pin google-re2 to last version with Python 3.9 wheels (see https://pypi.org/project/google-re2/1.1.20251105/#files)
+google-re2<1.1.20251105
 google-auth
 google-api-core
 google-cloud-kms
```
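As a side note, the pin's exclusive upper bound can be verified with the `packaging` library; a small illustrative sketch (not part of this commit, version strings assumed):

```python
from packaging.specifiers import SpecifierSet
from packaging.version import Version

# The pin excludes 1.1.20251105 itself (the first release without
# Python 3.9 wheels) while allowing every earlier release.
spec = SpecifierSet("<1.1.20251105")
assert Version("1.1.20240702") in spec      # earlier release: allowed
assert Version("1.1.20251105") not in spec  # first 3.9-less release: blocked
```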

requirements/requirements-tests.txt

Lines changed: 2 additions & 0 deletions
```diff
@@ -1,6 +1,8 @@
 # core test requirements
 urllib3<3
 flake8
+mypy
+types-cachetools
 orjson
 pytest
 pytest-timeout
```
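`types-cachetools` supplies stubs so mypy can type-check `cachetools` usage in the client's schema caches. A minimal sketch of what the stubs enable (illustrative, not code from this commit):

```python
from __future__ import annotations

from cachetools import LRUCache

# With the stubs installed, LRUCache is generic, so key and value
# types are checked at every access.
cache: LRUCache[str, int] = LRUCache(maxsize=128)
cache["schema-id"] = 42
# cache["schema-id"] = "oops"  # mypy: Incompatible types in assignment  [assignment]
```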

src/confluent_kafka/admin/__init__.py

Lines changed: 0 additions & 2 deletions
```diff
@@ -649,11 +649,9 @@ def delete_topics(  # type: ignore[override]
         return futmap
 
     def list_topics(self, *args: Any, **kwargs: Any) -> ClusterMetadata:
-
        return super(AdminClient, self).list_topics(*args, **kwargs)
 
     def list_groups(self, *args: Any, **kwargs: Any) -> List[GroupMetadata]:
-
        return super(AdminClient, self).list_groups(*args, **kwargs)
 
     def create_partitions(  # type: ignore[override]
```
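The `# type: ignore[override]` markers on the neighboring methods acknowledge that these wrappers narrow the base-class signatures. A hypothetical sketch of that pattern (names invented for illustration):

```python
from typing import Any, Dict, List, Optional


class BaseClient:
    def delete_topics(self, *args: Any, **kwargs: Any) -> Any:
        ...


class TypedClient(BaseClient):
    # Narrowing the inherited signature violates the Liskov substitution
    # principle, which mypy reports as [override]; the targeted ignore
    # opts out deliberately while keeping a precise public signature.
    def delete_topics(self, topics: List[str]) -> Dict[str, Optional[str]]:  # type: ignore[override]
        return {topic: None for topic in topics}
```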

src/confluent_kafka/admin/_group.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -80,7 +80,7 @@ class MemberAssignment:
     The topic partitions assigned to a group member.
     """
 
-    def __init__(self, topic_partitions: Optional[List[TopicPartition]] = None) -> None:
+    def __init__(self, topic_partitions: Optional[List[TopicPartition]]) -> None:
         self.topic_partitions = topic_partitions or []
 
```
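Dropping the `= None` default makes callers pass the argument explicitly, while the body still normalizes `None` to an empty list. A minimal sketch of the pattern (simplified types, not code from this commit):

```python
from typing import List, Optional


def normalize(topic_partitions: Optional[List[str]]) -> List[str]:
    # `or []` maps both None and an empty list to an empty list, and it
    # avoids the classic mutable-default-argument pitfall.
    return topic_partitions or []


assert normalize(None) == []
assert normalize(["demo-topic"]) == ["demo-topic"]
```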

src/confluent_kafka/admin/_resource.py

Lines changed: 2 additions & 3 deletions
```diff
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Any
 from enum import Enum
 from .. import cimpl as _cimpl
 
@@ -28,7 +27,7 @@ class ResourceType(Enum):
     BROKER = _cimpl.RESOURCE_BROKER  #: Broker resource. Resource name is broker id.
     TRANSACTIONAL_ID = _cimpl.RESOURCE_TRANSACTIONAL_ID  #: Transactional ID resource.
 
-    def __lt__(self, other: object) -> Any:
+    def __lt__(self, other: object) -> bool:
         if not isinstance(other, ResourceType):
             return NotImplemented
         return self.value < other.value
@@ -44,7 +43,7 @@ class ResourcePatternType(Enum):
     LITERAL = _cimpl.RESOURCE_PATTERN_LITERAL  #: Literal: A literal resource name
     PREFIXED = _cimpl.RESOURCE_PATTERN_PREFIXED  #: Prefixed: A prefixed resource name
 
-    def __lt__(self, other: object) -> Any:
+    def __lt__(self, other: object) -> bool:
         if not isinstance(other, ResourcePatternType):
             return NotImplemented
         return self.value < other.value
```
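`-> bool` is the more precise return type here, and it still admits the `NotImplemented` escape hatch because typeshed types `NotImplemented` as compatible with any return annotation. A self-contained sketch of the same pattern:

```python
from enum import Enum


class Color(Enum):
    RED = 1
    BLUE = 2

    def __lt__(self, other: object) -> bool:
        if not isinstance(other, Color):
            # Accepted by mypy despite the bool annotation; lets Python
            # fall back to the other operand's comparison methods.
            return NotImplemented
        return self.value < other.value


assert Color.RED < Color.BLUE
assert sorted([Color.BLUE, Color.RED])[0] is Color.RED
```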

src/confluent_kafka/avro/cached_schema_registry_client.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -32,7 +32,7 @@
 
 # Python 2 considers int an instance of str
 try:
-    string_type = basestring  # noqa
+    string_type = basestring  # type: ignore[name-defined] # noqa
 except NameError:
     string_type = str
 
```
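`name-defined` fires because mypy analyzes the file as Python 3, where `basestring` does not exist, even though the `except NameError` branch handles that at runtime. A minimal sketch of the same compatibility-shim pattern (illustrative, not from this commit):

```python
# mypy checks against Python 3 builtins, so a Python 2-only name needs a
# targeted suppression even though the fallback makes the code safe.
try:
    text_type = unicode  # type: ignore[name-defined]  # Python 2 only
except NameError:
    text_type = str  # Python 3 path

assert text_type is str  # holds on Python 3
```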

src/confluent_kafka/avro/load.py

Lines changed: 5 additions & 5 deletions
```diff
@@ -47,11 +47,11 @@ def _hash_func(self):
         from avro.errors import SchemaParseException
     except ImportError:
         # avro < 1.11.0
-        from avro.schema import SchemaParseException
+        from avro.schema import SchemaParseException  # type: ignore[attr-defined,no-redef]
 
-    schema.RecordSchema.__hash__ = _hash_func
-    schema.PrimitiveSchema.__hash__ = _hash_func
-    schema.UnionSchema.__hash__ = _hash_func
+    schema.RecordSchema.__hash__ = _hash_func  # type: ignore[method-assign]
+    schema.PrimitiveSchema.__hash__ = _hash_func  # type: ignore[method-assign]
+    schema.UnionSchema.__hash__ = _hash_func  # type: ignore[method-assign]
 
 except ImportError:
-    schema = None
+    schema = None  # type: ignore[assignment]
```
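Monkey-patching a method on a class is what `method-assign` flags; the narrow per-line ignores keep the rest of the module checked. A self-contained sketch of the pattern (not code from this commit):

```python
class Record:
    def __hash__(self) -> int:
        return 0


def _str_hash(self: "Record") -> int:
    return hash(str(self))


# Assigning over an existing method triggers [method-assign] in mypy.
Record.__hash__ = _str_hash  # type: ignore[method-assign]

r = Record()
assert hash(r) == hash(str(r))
```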

src/confluent_kafka/cimpl.pyi

Lines changed: 3 additions & 4 deletions
```diff
@@ -34,14 +34,13 @@ TODO: Consider migrating to Cython in the future to eliminate this dual
 maintenance burden and get type hints directly from the implementation.
 """
 
-from typing import Any, Optional, Callable, List, Tuple, Dict, Union, overload, TYPE_CHECKING
+from typing import Any, Optional, Callable, List, Tuple, Dict, Union, overload
 from typing_extensions import Self, Literal
 import builtins
 
-from ._types import HeadersType
+from confluent_kafka.admin._metadata import ClusterMetadata, GroupMetadata
 
-if TYPE_CHECKING:
-    from confluent_kafka.admin._metadata import ClusterMetadata, GroupMetadata
+from ._types import HeadersType
 
 # Callback types with proper class references (defined locally to avoid circular imports)
 DeliveryCallback = Callable[[Optional['KafkaError'], 'Message'], None]
```
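Stub files are never executed, so importing `ClusterMetadata` directly cannot create a runtime import cycle; the `TYPE_CHECKING` guard was only adding noise. A hypothetical stub fragment illustrating the same pattern (signature invented for illustration):

```python
# example.pyi (hypothetical) -- .pyi files are consumed only by type
# checkers, so runtime-circular imports are not a concern here.
from typing import Optional

from confluent_kafka.admin._metadata import ClusterMetadata

def list_topics(topic: Optional[str] = ..., timeout: float = ...) -> ClusterMetadata: ...
```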

src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -110,22 +110,22 @@ def _produce_batch_and_poll() -> int:
 
     async def flush_librdkafka_queue(self, timeout=-1):
         """Flush the librdkafka queue and wait for all messages to be delivered
-
         This method awaits until all outstanding produce requests are completed
         or the timeout is reached, unless the timeout is set to 0 (non-blocking).
-
         Args:
             timeout: Maximum time to wait in seconds:
                 - -1 = wait indefinitely (default)
                 - 0 = non-blocking, return immediately
                 - >0 = wait up to timeout seconds
-
         Returns:
             Number of messages still in queue after flush attempt
         """
         return await _common.async_call(self._executor, self._producer.flush, timeout)
 
-    def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None:
+    def _handle_partial_failures(
+        self,
+        batch_messages: List[Dict[str, Any]]
+    ) -> None:
         """Handle messages that failed during produce_batch
 
         When produce_batch encounters messages that fail immediately (e.g.,
```
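The flush semantics mirror the underlying synchronous `Producer.flush`, which returns the number of messages still queued after the wait; a small usage sketch (broker address and topic assumed):

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})  # assumed broker
producer.produce("demo-topic", b"payload")

# Wait up to 10 seconds; flush returns how many messages remain undelivered.
remaining = producer.flush(10.0)
if remaining:
    print(f"{remaining} message(s) still queued after timeout")
```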
