From b235ae26e72f8734b947e3d0dce4c1b1840241f8 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Wed, 5 Nov 2025 21:37:10 +0530 Subject: [PATCH 1/3] Add null checks to fix SEGV --- src/confluent_kafka/src/Admin.c | 110 +++++++++++++++++++++- src/confluent_kafka/src/Consumer.c | 70 +++++++------- src/confluent_kafka/src/Metadata.c | 4 +- src/confluent_kafka/src/Producer.c | 44 ++++++++- src/confluent_kafka/src/confluent_kafka.h | 9 ++ tests/test_Admin.py | 93 ++++++++++++++++++ tests/test_Consumer.py | 62 ++++++++++++ tests/test_Producer.py | 54 ++++++++++- 8 files changed, 403 insertions(+), 43 deletions(-) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index a97460027..bdd343c44 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -138,7 +138,7 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api, char errstr[512]; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, "AdminClient has been closed"); + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); return NULL; } @@ -578,6 +578,11 @@ static PyObject *Admin_create_topics (Handle *self, PyObject *args, &options.validate_only)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + return NULL; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_CREATETOPICS, &options, future); if (!c_options) @@ -721,6 +726,11 @@ static PyObject *Admin_delete_topics (Handle *self, PyObject *args, return NULL; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + return NULL; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DELETETOPICS, &options, future); if (!c_options) @@ -831,6 +841,11 @@ static PyObject *Admin_create_partitions (Handle *self, PyObject *args, &options.validate_only)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + return NULL; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, &options, future); if (!c_options) @@ -952,6 +967,11 @@ static PyObject *Admin_describe_configs (Handle *self, PyObject *args, return NULL; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + return NULL; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, &options, future); if (!c_options) @@ -1085,6 +1105,12 @@ static PyObject *Admin_incremental_alter_configs(Handle *self,PyObject *args,PyO !cfl_PyBool_get(validate_only_obj, "validate_only", &options.validate_only)) return NULL; + + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + return NULL; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS, &options, future); if (!c_options) @@ -1252,6 +1278,11 @@ static PyObject *Admin_alter_configs (Handle *self, PyObject *args, &options.validate_only)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + return NULL; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_ALTERCONFIGS, &options, future); if (!c_options) @@ -1409,6 +1440,11 @@ static PyObject *Admin_create_acls (Handle *self, PyObject *args, PyObject *kwar goto err; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_CREATEACLS, &options, future); if (!c_options) @@ -1528,6 +1564,11 @@ static PyObject *Admin_describe_acls (Handle *self, PyObject *args, PyObject *kw goto err; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_CREATEACLS, &options, future); if (!c_options) @@ -1640,6 +1681,11 @@ static PyObject *Admin_delete_acls (Handle *self, PyObject *args, PyObject *kwar goto err; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DELETEACLS, &options, future); if (!c_options) @@ -1804,6 +1850,11 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw } } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS, &options, future); if (!c_options) { @@ -1890,6 +1941,11 @@ static PyObject *Admin_describe_user_scram_credentials(Handle *self, PyObject *a return NULL; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + return NULL; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS, &options, future); if (!c_options) @@ -2054,6 +2110,11 @@ static PyObject *Admin_alter_user_scram_credentials(Handle *self, PyObject *args goto err; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS, &options, future); if (!c_options) @@ -2303,6 +2364,11 @@ PyObject *Admin_describe_consumer_groups (Handle *self, PyObject *args, PyObject Py_XDECREF(uogroup); } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS, &options, future); if (!c_options) { @@ -2426,6 +2492,11 @@ PyObject *Admin_describe_topics (Handle *self, PyObject *args, PyObject *kwargs) } } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_topic_collection = rd_kafka_TopicCollection_of_topic_names(c_topics, topics_cnt); c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DESCRIBETOPICS, &options, future); @@ -2514,6 +2585,11 @@ PyObject *Admin_describe_cluster (Handle *self, PyObject *args, PyObject *kwargs &options.include_authorized_operations)) goto err; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, &options, future); if (!c_options) { @@ -2586,6 +2662,11 @@ PyObject *Admin_delete_consumer_groups (Handle *self, PyObject *args, PyObject * goto err; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DELETEGROUPS, &options, future); if (!c_options) { @@ -2702,6 +2783,11 @@ PyObject *Admin_list_consumer_group_offsets (Handle *self, PyObject *args, PyObj &options.require_stable_offsets)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS, &options, future); if (!c_options) { @@ -2841,6 +2927,11 @@ PyObject *Admin_alter_consumer_group_offsets (Handle *self, PyObject *args, PyOb goto err; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS, &options, future); if (!c_options) { @@ -2972,6 +3063,11 @@ PyObject *Admin_list_offsets (Handle *self,PyObject *args, PyObject *kwargs) { goto err; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_LISTOFFSETS, &options, future); if (!c_options) { @@ -3054,6 +3150,11 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){ goto err; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DELETERECORDS, &options, future); if (!c_options) { @@ -3148,6 +3249,11 @@ PyObject *Admin_elect_leaders(Handle *self, PyObject *args, PyObject *kwargs) { goto err; } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); + goto err; + } + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_ELECTLEADERS, &options, future); if (!c_options) { @@ -3251,7 +3357,7 @@ static PyObject *Admin_poll (Handle *self, PyObject *args, return NULL; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, "AdminClient has been closed"); + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_ADMIN_CLIENT_CLOSED); return NULL; } diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index 2b5fbfd09..9a747862d 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -120,8 +120,8 @@ static PyObject *Consumer_subscribe (Handle *self, PyObject *args, rd_kafka_resp_err_t err; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -221,8 +221,8 @@ static PyObject *Consumer_unsubscribe (Handle *self, rd_kafka_resp_err_t err; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -243,8 +243,8 @@ static PyObject *Consumer_incremental_assign (Handle *self, PyObject *tlist) { rd_kafka_error_t *error; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -272,8 +272,8 @@ static PyObject *Consumer_assign (Handle *self, PyObject *tlist) { rd_kafka_resp_err_t err; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -302,8 +302,8 @@ static PyObject *Consumer_unassign (Handle *self, PyObject *ignore) { rd_kafka_resp_err_t err; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -327,8 +327,8 @@ static PyObject *Consumer_incremental_unassign (Handle *self, PyObject *tlist) { rd_kafka_error_t *error; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -358,8 +358,8 @@ static PyObject *Consumer_assignment (Handle *self, PyObject *args, rd_kafka_resp_err_t err; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -472,8 +472,8 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args, PyThreadState *thread_state; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -610,8 +610,8 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args, static char *kws[] = { "message", "offsets", NULL }; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -697,8 +697,8 @@ static PyObject *Consumer_committed (Handle *self, PyObject *args, static char *kws[] = { "partitions", "timeout", NULL }; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -739,8 +739,8 @@ static PyObject *Consumer_position (Handle *self, PyObject *args, static char *kws[] = { "partitions", NULL }; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -830,7 +830,7 @@ static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs) rd_kafka_error_t *error; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -889,8 +889,8 @@ static PyObject *Consumer_get_watermark_offsets (Handle *self, PyObject *args, PyObject *rtup; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -952,8 +952,8 @@ static PyObject *Consumer_offsets_for_times (Handle *self, PyObject *args, static char *kws[] = { "partitions", "timeout", NULL }; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -994,8 +994,8 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args, CallState cs; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -1032,8 +1032,8 @@ static PyObject *Consumer_memberid (Handle *self, PyObject *args, char *memberid; PyObject *memberidobj; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -1066,8 +1066,8 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args, Py_ssize_t i, n; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } @@ -1176,8 +1176,8 @@ Consumer_consumer_group_metadata (Handle *self, PyObject *ignore) { PyObject *obj; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, - "Consumer closed"); + PyErr_SetString(PyExc_RuntimeError, + ERR_MSG_CONSUMER_CLOSED); return NULL; } diff --git a/src/confluent_kafka/src/Metadata.c b/src/confluent_kafka/src/Metadata.c index bcc49c002..d8af708e7 100644 --- a/src/confluent_kafka/src/Metadata.c +++ b/src/confluent_kafka/src/Metadata.c @@ -371,7 +371,7 @@ list_topics (Handle *self, PyObject *args, PyObject *kwargs) { return NULL; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, "Handle has been closed"); + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_HANDLE_CLOSED); return NULL; } @@ -611,7 +611,7 @@ list_groups (Handle *self, PyObject *args, PyObject *kwargs) { return NULL; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, "Handle has been closed"); + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_HANDLE_CLOSED); return NULL; } diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index c4015b8d8..efc6cdc7b 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -306,7 +306,7 @@ static PyObject *Producer_produce (Handle *self, PyObject *args, dr_cb = self->u.Producer.default_dr_cb; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, "Producer has been closed"); + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); return NULL; } @@ -381,7 +381,7 @@ static PyObject *Producer_poll (Handle *self, PyObject *args, return NULL; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, "Producer has been closed"); + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); return NULL; } @@ -405,7 +405,7 @@ static PyObject *Producer_flush (Handle *self, PyObject *args, return NULL; if (!self->rk) { - PyErr_SetString(PyExc_RuntimeError, "Producer has been closed"); + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); return NULL; } @@ -652,6 +652,11 @@ static PyObject *Producer_produce_batch (Handle *self, PyObject *args, return cfl_PyInt_FromInt(0); } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); + return NULL; + } + /* Allocate arrays for librdkafka messages and msgstates */ rkmessages = calloc(message_cnt, sizeof(*rkmessages)); msgstates = calloc(message_cnt, sizeof(*msgstates)); @@ -699,6 +704,11 @@ static PyObject *Producer_init_transactions (Handle *self, PyObject *args) { if (!PyArg_ParseTuple(args, "|d", &tmout)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); + return NULL; + } + CallState_begin(self, &cs); error = rd_kafka_init_transactions(self->rk, cfl_timeout_ms(tmout)); @@ -720,6 +730,11 @@ static PyObject *Producer_init_transactions (Handle *self, PyObject *args) { static PyObject *Producer_begin_transaction (Handle *self) { rd_kafka_error_t *error; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); + return NULL; + } + error = rd_kafka_begin_transaction(self->rk); if (error) { @@ -750,6 +765,13 @@ static PyObject *Producer_send_offsets_to_transaction(Handle *self, return NULL; } + if (!self->rk) { + rd_kafka_consumer_group_metadata_destroy(cgmd); + rd_kafka_topic_partition_list_destroy(c_offsets); + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); + return NULL; + } + CallState_begin(self, &cs); error = rd_kafka_send_offsets_to_transaction(self->rk, c_offsets, @@ -781,6 +803,11 @@ static PyObject *Producer_commit_transaction(Handle *self, PyObject *args) { if (!PyArg_ParseTuple(args, "|d", &tmout)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); + return NULL; + } + CallState_begin(self, &cs); error = rd_kafka_commit_transaction(self->rk, cfl_timeout_ms(tmout)); @@ -807,6 +834,11 @@ static PyObject *Producer_abort_transaction(Handle *self, PyObject *args) { if (!PyArg_ParseTuple(args, "|d", &tmout)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); + return NULL; + } + CallState_begin(self, &cs); error = rd_kafka_abort_transaction(self->rk, cfl_timeout_ms(tmout)); @@ -838,6 +870,12 @@ static void *Producer_purge (Handle *self, PyObject *args, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|bbb", kws, &in_queue, &in_flight, &blocking)) return NULL; + + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); + return NULL; + } + if (in_queue) purge_strategy = RD_KAFKA_PURGE_F_QUEUE; if (in_flight) diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index 1b7b8880a..b69f75dbd 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -415,6 +415,15 @@ extern PyTypeObject TopicPartitionType; * * ****************************************************************************/ + +/** + * Error messages for uninitialized/closed Handle objects + */ +#define ERR_MSG_PRODUCER_CLOSED "Producer has been closed" +#define ERR_MSG_ADMIN_CLIENT_CLOSED "AdminClient has been closed" +#define ERR_MSG_CONSUMER_CLOSED "Consumer closed" +#define ERR_MSG_HANDLE_CLOSED "Handle has been closed" + #define PY_RD_KAFKA_ADMIN 100 /* There is no Admin client type in librdkafka, * so we use the producer type for now, * but we need to differentiate between a diff --git a/tests/test_Admin.py b/tests/test_Admin.py index a3fe6bf67..660a0d207 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1520,3 +1520,96 @@ def test_admin_context_manager_with_admin_apis(): admin.create_acls([acl_binding]) with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.describe_consumer_groups(["test-group-2"]) + + +def test_uninitialized_admin_client_methods(): + """Test that all AdminClient methods raise RuntimeError when called on uninitialized instance. + """ + + class UninitializedAdmin(AdminClient): + def __init__(self, config): + # Don't call super().__init__() - leaves self->rk as NULL + pass + + admin = UninitializedAdmin({}) + + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.create_topics([NewTopic("test", 1, 1)]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.delete_topics(["test"]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.create_partitions([NewPartitions("test", 2)]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.describe_configs([ConfigResource(ResourceType.TOPIC, "test")]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.incremental_alter_configs([ConfigResource(ResourceType.TOPIC, "test")]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.alter_configs([ConfigResource(ResourceType.TOPIC, "test")]) + + acl_binding = AclBinding( + ResourceType.TOPIC, "topic1", ResourcePatternType.LITERAL, + "User:u1", "*", AclOperation.WRITE, AclPermissionType.ALLOW + ) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.create_acls([acl_binding]) + + acl_filter = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.ANY, + None, None, AclOperation.ANY, AclPermissionType.ANY) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.delete_acls([acl_filter]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.describe_acls(acl_filter) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.list_consumer_groups() + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.describe_user_scram_credentials(["user"]) + + scram_info = ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 10000) + upsertion = UserScramCredentialUpsertion("user", scram_info, b"password") + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.alter_user_scram_credentials([upsertion]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.describe_consumer_groups(["group"]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.describe_topics(TopicCollection(["topic"])) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.describe_cluster() + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.delete_consumer_groups(["group"]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.list_consumer_group_offsets([ConsumerGroupTopicPartitions("group")]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.alter_consumer_group_offsets([ConsumerGroupTopicPartitions("group", [TopicPartition("topic", 0, 5)])]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.list_offsets({TopicPartition("topic", 0, 10): OffsetSpec.earliest()}) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.delete_records([TopicPartition("topic", 0, 10)]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.elect_leaders(ElectionType.PREFERRED, [TopicPartition("topic", 0)]) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.poll(0.001) + + # Test __len__() - should return 0 for closed admin (safe, no crash) + assert len(admin) == 0 diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index c363ebbe8..9a0b91bf1 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -615,3 +615,65 @@ def test_consumer_context_manager_multiple_instances(): consumer1.subscribe(['mytopic']) with pytest.raises(RuntimeError): consumer2.subscribe(['mytopic2']) + + +def test_uninitialized_consumer_methods(): + """Test that all Consumer methods raise RuntimeError when called on uninitialized instance. + """ + class UninitializedConsumer(Consumer): + def __init__(self, config): + # Don't call super().__init__() - leaves self->rk as NULL + pass + + consumer = UninitializedConsumer({'group.id': 'test'}) + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.subscribe(['topic']) + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.unsubscribe() + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.poll() + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.consume() + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.assign([TopicPartition('topic', 0)]) + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.unassign() + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.assignment() + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.commit() + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.committed([TopicPartition('topic', 0)]) + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.position([TopicPartition('topic', 0)]) + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.seek(TopicPartition('topic', 0, 0)) + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.get_watermark_offsets(TopicPartition('topic', 0)) + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.store_offsets([TopicPartition('topic', 0, 42)]) + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.offsets_for_times([TopicPartition('topic', 0)]) + + with pytest.raises(RuntimeError, match="Handle has been closed"): + consumer.list_topics() + + + consumer.close() # Should succeed + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.consumer_group_metadata() diff --git a/tests/test_Producer.py b/tests/test_Producer.py index f535d0e44..c78ea6ac0 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -6,7 +6,7 @@ import time from struct import pack -from confluent_kafka import Producer, KafkaError, KafkaException, \ +from confluent_kafka import Producer, Consumer, KafkaError, KafkaException, \ TopicPartition from tests.common import TestConsumer @@ -1343,3 +1343,55 @@ def on_delivery(err, msg): assert err.code() in (KafkaError._MSG_TIMED_OUT, KafkaError._TRANSPORT, KafkaError._TIMED_OUT), \ f"Expected success (err=None) or timeout/transport error, got {err.code()}" + + +def test_uninitialized_producer_methods(): + """Test that all Producer methods raise RuntimeError when called on uninitialized instance. + + This test verifies issue #1590 fix - prevents SEGV when subclassing Producer + without calling super().__init__(). + """ + class UninitializedProducer(Producer): + def __init__(self, config): + # Don't call super().__init__() - leaves self->rk as NULL + pass + + producer = UninitializedProducer({}) + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.produce('topic', value=b'test') + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.poll() + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.flush() + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.purge() + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.produce_batch('topic', [{'value': b'test'}]) + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.init_transactions() + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.begin_transaction() + + # send_offsets_to_transaction - NULL check happens after argument parsing + consumer = Consumer({'group.id': 'test', 'socket.timeout.ms': 10}) + metadata = consumer.consumer_group_metadata() + consumer.close() + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.send_offsets_to_transaction([TopicPartition('topic', 0)], metadata) + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.commit_transaction() + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.abort_transaction() + + # Test __len__() - should return 0 for closed producer (safe, no crash) + assert len(producer) == 0 From 14fc4a3e9cb2b0452636dedf77857ab3fb69ede1 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Thu, 6 Nov 2025 19:59:53 +0530 Subject: [PATCH 2/3] Fix flake8 --- tests/test_Admin.py | 57 +++++++++++++++++++++--------------------- tests/test_Consumer.py | 35 +++++++++++++------------- tests/test_Producer.py | 28 ++++++++++----------- 3 files changed, 59 insertions(+), 61 deletions(-) diff --git a/tests/test_Admin.py b/tests/test_Admin.py index 660a0d207..44fcd4683 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1530,86 +1530,85 @@ class UninitializedAdmin(AdminClient): def __init__(self, config): # Don't call super().__init__() - leaves self->rk as NULL pass - + admin = UninitializedAdmin({}) - - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.create_topics([NewTopic("test", 1, 1)]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.delete_topics(["test"]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.create_partitions([NewPartitions("test", 2)]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.describe_configs([ConfigResource(ResourceType.TOPIC, "test")]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.incremental_alter_configs([ConfigResource(ResourceType.TOPIC, "test")]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.alter_configs([ConfigResource(ResourceType.TOPIC, "test")]) - + acl_binding = AclBinding( ResourceType.TOPIC, "topic1", ResourcePatternType.LITERAL, "User:u1", "*", AclOperation.WRITE, AclPermissionType.ALLOW ) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.create_acls([acl_binding]) - + acl_filter = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.ANY, - None, None, AclOperation.ANY, AclPermissionType.ANY) - + None, None, AclOperation.ANY, AclPermissionType.ANY) + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.delete_acls([acl_filter]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.describe_acls(acl_filter) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.list_consumer_groups() - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.describe_user_scram_credentials(["user"]) - + scram_info = ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 10000) upsertion = UserScramCredentialUpsertion("user", scram_info, b"password") - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.alter_user_scram_credentials([upsertion]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.describe_consumer_groups(["group"]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.describe_topics(TopicCollection(["topic"])) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.describe_cluster() - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.delete_consumer_groups(["group"]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.list_consumer_group_offsets([ConsumerGroupTopicPartitions("group")]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.alter_consumer_group_offsets([ConsumerGroupTopicPartitions("group", [TopicPartition("topic", 0, 5)])]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.list_offsets({TopicPartition("topic", 0, 10): OffsetSpec.earliest()}) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.delete_records([TopicPartition("topic", 0, 10)]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.elect_leaders(ElectionType.PREFERRED, [TopicPartition("topic", 0)]) - + with pytest.raises(RuntimeError, match="AdminClient has been closed"): admin.poll(0.001) - + # Test __len__() - should return 0 for closed admin (safe, no crash) assert len(admin) == 0 diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 9a0b91bf1..0b212d497 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -624,56 +624,55 @@ class UninitializedConsumer(Consumer): def __init__(self, config): # Don't call super().__init__() - leaves self->rk as NULL pass - + consumer = UninitializedConsumer({'group.id': 'test'}) - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.subscribe(['topic']) - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.unsubscribe() - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.poll() - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.consume() - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.assign([TopicPartition('topic', 0)]) - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.unassign() - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.assignment() - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.commit() - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.committed([TopicPartition('topic', 0)]) - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.position([TopicPartition('topic', 0)]) - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.seek(TopicPartition('topic', 0, 0)) - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.get_watermark_offsets(TopicPartition('topic', 0)) - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.store_offsets([TopicPartition('topic', 0, 42)]) - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.offsets_for_times([TopicPartition('topic', 0)]) - + with pytest.raises(RuntimeError, match="Handle has been closed"): consumer.list_topics() - consumer.close() # Should succeed - + with pytest.raises(RuntimeError, match="Consumer closed"): consumer.consumer_group_metadata() diff --git a/tests/test_Producer.py b/tests/test_Producer.py index c78ea6ac0..f94e4903f 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -1347,7 +1347,7 @@ def on_delivery(err, msg): def test_uninitialized_producer_methods(): """Test that all Producer methods raise RuntimeError when called on uninitialized instance. - + This test verifies issue #1590 fix - prevents SEGV when subclassing Producer without calling super().__init__(). """ @@ -1355,43 +1355,43 @@ class UninitializedProducer(Producer): def __init__(self, config): # Don't call super().__init__() - leaves self->rk as NULL pass - + producer = UninitializedProducer({}) - + with pytest.raises(RuntimeError, match="Producer has been closed"): producer.produce('topic', value=b'test') - + with pytest.raises(RuntimeError, match="Producer has been closed"): producer.poll() - + with pytest.raises(RuntimeError, match="Producer has been closed"): producer.flush() - + with pytest.raises(RuntimeError, match="Producer has been closed"): producer.purge() - + with pytest.raises(RuntimeError, match="Producer has been closed"): producer.produce_batch('topic', [{'value': b'test'}]) - + with pytest.raises(RuntimeError, match="Producer has been closed"): producer.init_transactions() - + with pytest.raises(RuntimeError, match="Producer has been closed"): producer.begin_transaction() - + # send_offsets_to_transaction - NULL check happens after argument parsing consumer = Consumer({'group.id': 'test', 'socket.timeout.ms': 10}) metadata = consumer.consumer_group_metadata() consumer.close() - + with pytest.raises(RuntimeError, match="Producer has been closed"): producer.send_offsets_to_transaction([TopicPartition('topic', 0)], metadata) - + with pytest.raises(RuntimeError, match="Producer has been closed"): producer.commit_transaction() - + with pytest.raises(RuntimeError, match="Producer has been closed"): producer.abort_transaction() - + # Test __len__() - should return 0 for closed producer (safe, no crash) assert len(producer) == 0 From 04ff523a8db70ac68bc01a43cc02c3717fbc363a Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Thu, 6 Nov 2025 20:05:28 +0530 Subject: [PATCH 3/3] Address copilot comments --- src/confluent_kafka/src/Producer.c | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index efc6cdc7b..0e3d4a307 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -757,6 +757,11 @@ static PyObject *Producer_send_offsets_to_transaction(Handle *self, if (!PyArg_ParseTuple(args, "OO|d", &offsets, &metadata, &tmout)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); + return NULL; + } + if (!(c_offsets = py_to_c_parts(offsets))) return NULL; @@ -765,13 +770,6 @@ static PyObject *Producer_send_offsets_to_transaction(Handle *self, return NULL; } - if (!self->rk) { - rd_kafka_consumer_group_metadata_destroy(cgmd); - rd_kafka_topic_partition_list_destroy(c_offsets); - PyErr_SetString(PyExc_RuntimeError, ERR_MSG_PRODUCER_CLOSED); - return NULL; - } - CallState_begin(self, &cs); error = rd_kafka_send_offsets_to_transaction(self->rk, c_offsets,