9 changes: 4 additions & 5 deletions CHANGELOG.md
@@ -134,7 +134,7 @@ confluent-kafka-python v2.8.2 is based on librdkafka v2.8.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.8.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.

Note: Versioning is skipped due to breaking change in v2.8.1.
Do not run software with v2.8.1 installed.


@@ -166,7 +166,7 @@ We apologize for the inconvenience and appreciate the feedback that we have gotten

v2.6.2 is a feature release with the following features, fixes and enhancements:

Note: This release modifies the dependencies of the Schema Registry client.
If you are using the Schema Registry client, please ensure that you install the
extra dependencies using the following syntax:

@@ -246,15 +246,15 @@ for a complete list of changes, enhancements, fixes and upgrade considerations.
## v2.5.0 - 2024-07-10

> [!WARNING]
This version has introduced a regression in which an assert is triggered during the **PushTelemetry** call. This happens when no metric on the client side matches those requested by the broker subscription.
>
> You won't face any problem if:
> * Broker doesn't support [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability).
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the broker side.
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the client side. It is enabled by default; set configuration `enable.metrics.push` to `false` (see the sketch after this list).
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side and there is no subscription configured there.
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side with subscriptions that match the [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) metrics defined on the client.
>
> Having said this, we strongly recommend using `v2.5.3` or above to avoid this regression entirely.
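
For reference, a minimal sketch of the client-side workaround above (disabling KIP-714 telemetry with `enable.metrics.push`); the broker address and topic are placeholders, not part of the changelog:

```python
from confluent_kafka import Producer

# Sketch only: turn off KIP-714 client metrics so the affected
# PushTelemetry path is never exercised. Upgrading to v2.5.3 or
# above remains the recommended fix.
producer = Producer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "enable.metrics.push": False,           # KIP-714 telemetry off
})

producer.produce("demo-topic", value=b"payload")  # placeholder topic
producer.flush()
```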

v2.5.0 is a feature release with the following features, fixes and enhancements:
@@ -628,4 +628,3 @@ v1.5.0 is a maintenance release with the following fixes and enhancements:
confluent-kafka-python is based on librdkafka v1.5.0, see the
[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.5.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.

2 changes: 1 addition & 1 deletion README.md
@@ -276,7 +276,7 @@ pip install confluent-kafka

# With Schema Registry support
pip install "confluent-kafka[avro,schemaregistry]" # Avro
pip install "confluent-kafka[json,schemaregistry]" # JSON Schema
pip install "confluent-kafka[json,schemaregistry]" # JSON Schema
pip install "confluent-kafka[protobuf,schemaregistry]" # Protobuf

# With Data Contract rules (includes CSFLE support)
15 changes: 8 additions & 7 deletions src/confluent_kafka/__init__.py
@@ -98,19 +98,20 @@
:ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
"""

-def __init__(self, broker_name, broker_id, throttle_time):
+def __init__(self, broker_name: str,
+             broker_id: int,
+             throttle_time: float) -> None:
self.broker_name = broker_name
self.broker_id = broker_id
self.throttle_time = throttle_time

-def __str__(self):
-    return "{}/{} throttled for {} ms".format(
-        self.broker_name, self.broker_id, int(self.throttle_time * 1000)
-    )
+def __str__(self) -> str:
+    return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id,
+                                              int(self.throttle_time * 1000))
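
As context for ThrottleEvent, a minimal sketch of wiring a throttle callback; `throttle_cb` is the existing client configuration hook, while the broker address and topic are placeholders:

```python
from confluent_kafka import Producer, ThrottleEvent

def on_throttle(event: ThrottleEvent) -> None:
    # Invoked from poll()/flush() when a broker throttles a request;
    # __str__ above renders e.g. "broker1/1 throttled for 100 ms".
    print(str(event))

producer = Producer({
    "bootstrap.servers": "localhost:9092",  # placeholder address
    "throttle_cb": on_throttle,
})

producer.produce("demo-topic", value=b"payload")  # placeholder topic
producer.flush()
```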


-def _resolve_plugins(plugins):
-    """Resolve embedded plugins from the wheel's library directory.
+def _resolve_plugins(plugins: str) -> str:
+    """ Resolve embedded plugins from the wheel's library directory.

Check failure on line 113 in src/confluent_kafka/__init__.py (SonarQube-Confluent / confluent-kafka-python Sonarqube Results): Refactor this function to reduce its Cognitive Complexity from 17 to the 15 allowed.

For internal module use only.

28 changes: 15 additions & 13 deletions src/confluent_kafka/_model/__init__.py
@@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

+from typing import List, Optional
from enum import Enum
from .. import cimpl
+from ..cimpl import TopicPartition


class Node:
@@ -35,14 +37,14 @@ class Node:
The rack for this node.
"""

-def __init__(self, id, host, port, rack=None):
+def __init__(self, id: int, host: str, port: int, rack: Optional[str] = None) -> None:
self.id = id
self.id_string = str(id)
self.host = host
self.port = port
self.rack = rack

-def __str__(self):
+def __str__(self) -> str:
return f"({self.id}) {self.host}:{self.port} {f'(Rack - {self.rack})' if self.rack else ''}"


@@ -60,7 +62,7 @@ class ConsumerGroupTopicPartitions:
List of topic partitions information.
"""

-def __init__(self, group_id, topic_partitions=None):
+def __init__(self, group_id: str, topic_partitions: Optional[List[TopicPartition]] = None) -> None:
self.group_id = group_id
self.topic_partitions = topic_partitions

@@ -89,8 +91,8 @@ class ConsumerGroupState(Enum):
#: Consumer Group is empty.
EMPTY = cimpl.CONSUMER_GROUP_STATE_EMPTY

-def __lt__(self, other):
-    if self.__class__ != other.__class__:
+def __lt__(self, other: object) -> bool:
+    if not isinstance(other, ConsumerGroupState):
return NotImplemented
return self.value < other.value
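
As an aside on this recurring `__lt__` pattern: replacing the strict `__class__` comparison with `isinstance` keeps the `NotImplemented` fallback while also accepting subclass instances and typing cleanly against `object`. A self-contained sketch with a hypothetical enum (not part of the library):

```python
from enum import Enum

class Color(Enum):
    RED = 1
    BLUE = 2

    def __lt__(self, other: object) -> bool:
        # Returning NotImplemented lets Python try the reflected
        # comparison, then raise TypeError for unrelated types.
        if not isinstance(other, Color):
            return NotImplemented
        return self.value < other.value

assert Color.RED < Color.BLUE
# Color.RED < 3 raises TypeError instead of comparing silently.
```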

@@ -109,8 +111,8 @@ class ConsumerGroupType(Enum):
#: Classic Type
CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC

-def __lt__(self, other):
-    if self.__class__ != other.__class__:
+def __lt__(self, other: object) -> bool:
+    if not isinstance(other, ConsumerGroupType):
return NotImplemented
return self.value < other.value

@@ -126,7 +128,7 @@ class TopicCollection:
List of topic names.
"""

-def __init__(self, topic_names):
+def __init__(self, topic_names: List[str]) -> None:
self.topic_names = topic_names


@@ -147,7 +149,7 @@ class TopicPartitionInfo:
In-Sync-Replica brokers for the partition.
"""

-def __init__(self, id, leader, replicas, isr):
+def __init__(self, id: int, leader: Node, replicas: List[Node], isr: List[Node]) -> None:
self.id = id
self.leader = leader
self.replicas = replicas
@@ -165,8 +167,8 @@ class IsolationLevel(Enum):
READ_UNCOMMITTED = cimpl.ISOLATION_LEVEL_READ_UNCOMMITTED #: Receive all the offsets.
READ_COMMITTED = cimpl.ISOLATION_LEVEL_READ_COMMITTED #: Skip offsets belonging to an aborted transaction.

-def __lt__(self, other):
-    if self.__class__ != other.__class__:
+def __lt__(self, other: object) -> bool:
+    if not isinstance(other, IsolationLevel):
return NotImplemented
return self.value < other.value

@@ -184,7 +186,7 @@ class ElectionType(Enum):
#: Unclean election
UNCLEAN = cimpl.ELECTION_TYPE_UNCLEAN

-def __lt__(self, other):
-    if self.__class__ != other.__class__:
+def __lt__(self, other: object) -> bool:
+    if not isinstance(other, ElectionType):
return NotImplemented
return self.value < other.value
41 changes: 41 additions & 0 deletions src/confluent_kafka/_types.py
@@ -0,0 +1,41 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2025 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Common type definitions for confluent_kafka package.

This module provides centralized type aliases to maintain DRY principle
and ensure consistency across the package.
"""

from typing import Any, Optional, Dict, Union, Callable, List, Tuple

# Headers can be either dict format or list of tuples format
HeadersType = Union[
Dict[str, Union[str, bytes, None]],
List[Tuple[str, Union[str, bytes, None]]]
]

# Serializer/Deserializer callback types (will need SerializationContext import where used)
Serializer = Callable[[Any, Any], bytes] # (obj, SerializationContext) -> bytes
Deserializer = Callable[[Optional[bytes], Any], Any] # (Optional[bytes], SerializationContext) -> obj

# Forward declarations for callback types that reference classes from cimpl
# These are defined here to avoid circular imports
DeliveryCallback = Callable[[Optional[Any], Any], None] # (KafkaError, Message) -> None
RebalanceCallback = Callable[[Any, List[Any]], None] # (Consumer, List[TopicPartition]) -> None
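
A brief sketch of how these aliases line up with existing client calls; the callback signature follows the usual delivery-report convention, and the broker address and topic are placeholders:

```python
from typing import Any, Optional

from confluent_kafka import Producer

# Headers in both shapes accepted by HeadersType.
headers_as_dict = {"trace-id": b"abc123", "source": "billing"}
headers_as_list = [("trace-id", b"abc123"), ("retry", None)]

def delivery_report(err: Optional[Any], msg: Any) -> None:
    # Matches DeliveryCallback: err is a KafkaError on failure, else None.
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}]")

producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder
producer.produce("events", value=b"{}", headers=headers_as_list,
                 on_delivery=delivery_report)
producer.flush()
```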
3 changes: 2 additions & 1 deletion src/confluent_kafka/_util/conversion_util.py
@@ -12,12 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

+from typing import Union, Type
from enum import Enum


class ConversionUtil:
@staticmethod
-def convert_to_enum(val, enum_clazz):
+def convert_to_enum(val: Union[str, int, Enum], enum_clazz: Type[Enum]) -> Enum:
if type(enum_clazz) is not type(Enum):
raise TypeError("'enum_clazz' must be of type Enum")

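For context, a hedged sketch of how this helper is typically called; since the rest of the function body is elided in this diff, it assumes string inputs resolve to enum members by name:

```python
from confluent_kafka import IsolationLevel
from confluent_kafka._util.conversion_util import ConversionUtil

# Assumption (body elided in this diff): strings resolve by member name.
level = ConversionUtil.convert_to_enum("READ_COMMITTED", IsolationLevel)
assert level is IsolationLevel.READ_COMMITTED

# A non-Enum class trips the type check shown above.
try:
    ConversionUtil.convert_to_enum("x", str)
except TypeError as exc:
    print(exc)  # 'enum_clazz' must be of type Enum
```
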
15 changes: 8 additions & 7 deletions src/confluent_kafka/_util/validation_util.py
@@ -12,45 +12,46 @@
# See the License for the specific language governing permissions and
# limitations under the License.

+from typing import Any, List
from ..cimpl import KafkaError

try:
-string_type = basestring
+string_type = basestring  # type: ignore[name-defined]
except NameError:
string_type = str


class ValidationUtil:
@staticmethod
-def check_multiple_not_none(obj, vars_to_check):
+def check_multiple_not_none(obj: Any, vars_to_check: List[str]) -> None:
for param in vars_to_check:
ValidationUtil.check_not_none(obj, param)

@staticmethod
-def check_not_none(obj, param):
+def check_not_none(obj: Any, param: str) -> None:
if getattr(obj, param) is None:
raise ValueError("Expected %s to be not None" % (param,))

@staticmethod
-def check_multiple_is_string(obj, vars_to_check):
+def check_multiple_is_string(obj: Any, vars_to_check: List[str]) -> None:
for param in vars_to_check:
ValidationUtil.check_is_string(obj, param)

@staticmethod
-def check_is_string(obj, param):
+def check_is_string(obj: Any, param: str) -> None:
param_value = getattr(obj, param)
if param_value is not None and not isinstance(param_value, string_type):
raise TypeError("Expected %s to be a string" % (param,))

@staticmethod
-def check_kafka_errors(errors):
+def check_kafka_errors(errors: List[KafkaError]) -> None:
if not isinstance(errors, list):
raise TypeError("errors should be None or a list")
for error in errors:
if not isinstance(error, KafkaError):
raise TypeError("Expected list of KafkaError")

@staticmethod
-def check_kafka_error(error):
+def check_kafka_error(error: KafkaError) -> None:
if not isinstance(error, KafkaError):
raise TypeError("Expected error to be a KafkaError")