Skip to content

Commit 817bac5

Browse files
k/s: Add support for describe_registry_acls tagged field
If this new tagged field is true, then return Schema Registry resources in the DescribeAcls response. Will also accept SR resource types if the new tagged field is true. Signed-off-by: Michael Boquard <[email protected]>
1 parent f31b881 commit 817bac5

File tree

4 files changed

+229
-6
lines changed

4 files changed

+229
-6
lines changed

src/v/kafka/server/handlers/describe_acls.cc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ namespace kafka {
2828
static void fill_response(
2929
request_context& ctx,
3030
security::acl_binding_filter& filter,
31-
describe_acls_response_data& response) {
31+
describe_acls_response_data& response,
32+
bool describing_registry_resource) {
3233
/*
3334
* collapse common acls by pattern
3435
*/
@@ -47,10 +48,14 @@ static void fill_response(
4748
describe_acls_resource resource;
4849

4950
// resource pattern
50-
resource.type = details::to_kafka_resource_type(entry.first.resource());
51+
resource.type
52+
= describing_registry_resource
53+
? details::to_kafka_registry_resource_type(entry.first.resource())
54+
: details::to_kafka_resource_type(entry.first.resource());
5155
resource.name = entry.first.name();
5256
resource.pattern_type = details::to_kafka_pattern_type(
5357
entry.first.pattern());
58+
resource.registry_resource = describing_registry_resource;
5459

5560
// acl entries
5661
for (auto& acl : entry.second) {
@@ -113,7 +118,7 @@ ss::future<response_ptr> describe_acls_handler::handle(
113118

114119
try {
115120
auto filter = details::to_acl_binding_filter(request.data);
116-
fill_response(ctx, filter, data);
121+
fill_response(ctx, filter, data, request.data.describe_registry_acls);
117122
} catch (const details::acl_conversion_error& e) {
118123
vlog(klog.debug, "Error describing ACLs: {}", e.what());
119124
data.error_code = error_code::invalid_request;

src/v/kafka/server/handlers/details/security.h

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
#include "security/scram_credential.h"
1919
namespace kafka::details {
2020

21+
constexpr auto sr_subject_wire_value = 100;
22+
constexpr auto sr_registry_wire_value = 101;
23+
2124
using acl_conversion_error = security::acl_conversion_error;
2225

2326
inline security::acl_principal to_acl_principal(const ss::sstring& principal) {
@@ -52,6 +55,18 @@ inline security::resource_type to_resource_type(int8_t type) {
5255
}
5356
}
5457

58+
inline security::resource_type to_registry_resource_type(int8_t type) {
59+
switch (type) {
60+
case sr_subject_wire_value:
61+
return security::resource_type::sr_subject;
62+
case sr_registry_wire_value:
63+
return security::resource_type::sr_registry;
64+
default:
65+
throw acl_conversion_error(
66+
fmt::format("Invalid registry resource type: {}", type));
67+
}
68+
}
69+
5570
inline security::pattern_type to_pattern_type(int8_t type) {
5671
switch (type) {
5772
case 3:
@@ -156,7 +171,9 @@ to_resource_pattern_filter(const describe_acls_request_data& request) {
156171
// wildcard
157172
break;
158173
default:
159-
resource_type = to_resource_type(request.resource_type);
174+
resource_type = request.describe_registry_acls
175+
? to_registry_resource_type(request.resource_type)
176+
: to_resource_type(request.resource_type);
160177
}
161178

162179
std::optional<security::resource_pattern_filter::pattern_filter_type>
@@ -174,7 +191,12 @@ to_resource_pattern_filter(const describe_acls_request_data& request) {
174191
}
175192

176193
return security::resource_pattern_filter(
177-
resource_type, request.resource_name_filter, pattern_filter);
194+
resource_type,
195+
request.resource_name_filter,
196+
pattern_filter,
197+
request.describe_registry_acls
198+
? security::resource_pattern_filter::resource_subsystem::schema_registry
199+
: security::resource_pattern_filter::resource_subsystem::kafka);
178200
}
179201

180202
/*
@@ -238,7 +260,17 @@ inline int8_t to_kafka_resource_type(security::resource_type type) {
238260
vassert(
239261
false, "Schema Registry resources are not supported in kafka ACLs");
240262
}
241-
__builtin_unreachable();
263+
}
264+
265+
inline int8_t to_kafka_registry_resource_type(security::resource_type type) {
266+
switch (type) {
267+
case security::resource_type::sr_subject:
268+
return sr_subject_wire_value;
269+
case security::resource_type::sr_registry:
270+
return sr_registry_wire_value;
271+
default:
272+
vassert(false, "Request only for Schema Registry resources");
273+
}
242274
}
243275

244276
inline int8_t to_kafka_pattern_type(security::pattern_type type) {

src/v/kafka/server/tests/BUILD

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,3 +934,25 @@ redpanda_cc_btest(
934934
"@seastar//:testing",
935935
],
936936
)
937+
938+
redpanda_cc_btest(
939+
name = "describe_acls_test",
940+
timeout = "short",
941+
srcs = [
942+
"describe_acls_test.cc",
943+
],
944+
env = {"RP_FIXTURE_ENV": "1"},
945+
tags = ["exclusive"],
946+
deps = [
947+
"//src/v/cluster",
948+
"//src/v/kafka/protocol",
949+
"//src/v/kafka/protocol:describe_acls",
950+
"//src/v/kafka/server",
951+
"//src/v/model",
952+
"//src/v/redpanda/tests:fixture",
953+
"//src/v/security",
954+
"//src/v/test_utils:seastar_boost",
955+
"@seastar",
956+
"@seastar//:testing",
957+
],
958+
)
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.md
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0
9+
10+
#include "cluster/security_frontend.h"
11+
#include "kafka/protocol/describe_acls.h"
12+
#include "redpanda/tests/fixture.h"
13+
#include "security/acl.h"
14+
15+
using namespace std::chrono_literals;
16+
17+
class describe_acls_fixture : public redpanda_thread_fixture {
18+
protected:
19+
void create_acl(security::acl_binding binding) {
20+
app.controller->get_security_frontend()
21+
.local()
22+
.create_acls({std::move(binding)}, 5s)
23+
.get();
24+
}
25+
};
26+
27+
FIXTURE_TEST(describe_acls_any_kafka, describe_acls_fixture) {
28+
// This test verifies that when requesting any ACL and
29+
// "describe_registry_acls" is false, only receive back Kafka resource ACLs
30+
wait_for_controller_leadership().get();
31+
32+
create_acl(security::acl_binding{
33+
security::resource_pattern{
34+
security::resource_type::topic,
35+
"test-topic",
36+
security::pattern_type::literal},
37+
security::acl_entry{
38+
security::acl_principal{
39+
security::principal_type::user, "User:test-user"},
40+
security::acl_host::wildcard_host(),
41+
security::acl_operation::read,
42+
security::acl_permission::allow}});
43+
44+
create_acl(security::acl_binding{
45+
security::resource_pattern{
46+
security::resource_type::sr_subject,
47+
"test-topic-value",
48+
security::pattern_type::literal},
49+
security::acl_entry{
50+
security::acl_principal{
51+
security::principal_type::user, "User:test-user"},
52+
security::acl_host::wildcard_host(),
53+
security::acl_operation::read,
54+
security::acl_permission::allow}});
55+
56+
auto client = make_kafka_client().get();
57+
auto deferred_close = ss::defer([&client] { client.stop().get(); });
58+
client.connect().get();
59+
60+
kafka::describe_acls_request req;
61+
req.data.describe_registry_acls = false;
62+
req.data.resource_type = 1;
63+
req.data.resource_pattern_type = 1;
64+
req.data.operation = 1;
65+
req.data.permission_type = 1;
66+
67+
auto resp = client.dispatch(std::move(req), kafka::api_version(2)).get();
68+
BOOST_REQUIRE(!resp.data.errored());
69+
BOOST_REQUIRE_EQUAL(resp.data.error_code, kafka::error_code::none);
70+
BOOST_REQUIRE_EQUAL(resp.data.resources.size(), 1);
71+
BOOST_REQUIRE_EQUAL(resp.data.resources[0].type, 2);
72+
BOOST_REQUIRE_EQUAL(resp.data.resources[0].name, "test-topic");
73+
BOOST_REQUIRE(!resp.data.resources[0].registry_resource);
74+
}
75+
76+
FIXTURE_TEST(describe_acls_any_sr, describe_acls_fixture) {
77+
// This test verifies that when requesting any ACL and
78+
// "describe_registry_acls" is true, only receive back Schema Registry
79+
// resource ACLs
80+
wait_for_controller_leadership().get();
81+
82+
create_acl(security::acl_binding{
83+
security::resource_pattern{
84+
security::resource_type::topic,
85+
"test-topic",
86+
security::pattern_type::literal},
87+
security::acl_entry{
88+
security::acl_principal{
89+
security::principal_type::user, "User:test-user"},
90+
security::acl_host::wildcard_host(),
91+
security::acl_operation::read,
92+
security::acl_permission::allow}});
93+
94+
create_acl(security::acl_binding{
95+
security::resource_pattern{
96+
security::resource_type::sr_subject,
97+
"test-topic-value",
98+
security::pattern_type::literal},
99+
security::acl_entry{
100+
security::acl_principal{
101+
security::principal_type::user, "User:test-user"},
102+
security::acl_host::wildcard_host(),
103+
security::acl_operation::read,
104+
security::acl_permission::allow}});
105+
106+
auto client = make_kafka_client().get();
107+
auto deferred_close = ss::defer([&client] { client.stop().get(); });
108+
client.connect().get();
109+
110+
kafka::describe_acls_request req;
111+
req.data.describe_registry_acls = true;
112+
req.data.resource_type = 1;
113+
req.data.resource_pattern_type = 1;
114+
req.data.operation = 1;
115+
req.data.permission_type = 1;
116+
117+
auto resp = client.dispatch(std::move(req), kafka::api_version(2)).get();
118+
BOOST_REQUIRE(!resp.data.errored());
119+
BOOST_REQUIRE_EQUAL(resp.data.error_code, kafka::error_code::none);
120+
BOOST_REQUIRE_EQUAL(resp.data.resources.size(), 1);
121+
BOOST_REQUIRE_EQUAL(resp.data.resources[0].type, 100); // 100 is SR_SUBJECT
122+
BOOST_REQUIRE_EQUAL(resp.data.resources[0].name, "test-topic-value");
123+
BOOST_REQUIRE(resp.data.resources[0].registry_resource);
124+
}
125+
126+
FIXTURE_TEST(describe_acls_invalid_for_kafka, describe_acls_fixture) {
127+
auto client = make_kafka_client().get();
128+
auto deferred_close = ss::defer([&client] { client.stop().get(); });
129+
client.connect().get();
130+
131+
kafka::describe_acls_request req;
132+
req.data.describe_registry_acls = false;
133+
// Request SR_SUBJECT resource type, which is invalid if
134+
// describe_registry_acls is false
135+
req.data.resource_type = 100;
136+
req.data.resource_pattern_type = 1;
137+
req.data.operation = 1;
138+
req.data.permission_type = 1;
139+
140+
auto resp = client.dispatch(std::move(req), kafka::api_version(2)).get();
141+
BOOST_REQUIRE(resp.data.errored());
142+
BOOST_REQUIRE_EQUAL(
143+
resp.data.error_code, kafka::error_code::invalid_request);
144+
}
145+
146+
FIXTURE_TEST(describe_acls_invalid_for_sr, describe_acls_fixture) {
147+
auto client = make_kafka_client().get();
148+
auto deferred_close = ss::defer([&client] { client.stop().get(); });
149+
client.connect().get();
150+
151+
kafka::describe_acls_request req;
152+
req.data.describe_registry_acls = true;
153+
// Request TOPIC resource type, which is invalid if describe_registry_acls
154+
// is true
155+
req.data.resource_type = 2;
156+
req.data.resource_pattern_type = 1;
157+
req.data.operation = 1;
158+
req.data.permission_type = 1;
159+
160+
auto resp = client.dispatch(std::move(req), kafka::api_version(2)).get();
161+
BOOST_REQUIRE(resp.data.errored());
162+
BOOST_REQUIRE_EQUAL(
163+
resp.data.error_code, kafka::error_code::invalid_request);
164+
}

0 commit comments

Comments
 (0)