Skip to content

Commit 0e87374

Browse files
is_direct_results -> has_more_rows
Signed-off-by: varun-edachali-dbx <[email protected]>
1 parent 971969a commit 0e87374

File tree

6 files changed

+38
-46
lines changed

6 files changed

+38
-46
lines changed

src/databricks/sql/backend/thrift_backend.py

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,7 @@ def _results_message_to_execute_response(self, resp, operation_state):
790790
direct_results = resp.directResults
791791
has_been_closed_server_side = direct_results and direct_results.closeOperation
792792

793-
is_direct_results = (
793+
has_more_rows = (
794794
(not direct_results)
795795
or (not direct_results.resultSet)
796796
or direct_results.resultSet.hasMoreRows
@@ -831,7 +831,7 @@ def _results_message_to_execute_response(self, resp, operation_state):
831831
result_format=t_result_set_metadata_resp.resultFormat,
832832
)
833833

834-
return execute_response, is_direct_results
834+
return execute_response, has_more_rows
835835

836836
def get_execution_result(
837837
self, command_id: CommandId, cursor: Cursor
@@ -876,7 +876,7 @@ def get_execution_result(
876876

877877
lz4_compressed = t_result_set_metadata_resp.lz4Compressed
878878
is_staging_operation = t_result_set_metadata_resp.isStagingOperation
879-
is_direct_results = resp.hasMoreRows
879+
has_more_rows = resp.hasMoreRows
880880

881881
status = CommandState.from_thrift_state(resp.status) or CommandState.RUNNING
882882

@@ -902,7 +902,7 @@ def get_execution_result(
902902
t_row_set=resp.results,
903903
max_download_threads=self.max_download_threads,
904904
ssl_options=self._ssl_options,
905-
is_direct_results=is_direct_results,
905+
has_more_rows=has_more_rows,
906906
)
907907

908908
def _wait_until_command_done(self, op_handle, initial_operation_status_resp):
@@ -1021,7 +1021,7 @@ def execute_command(
10211021
self._handle_execute_response_async(resp, cursor)
10221022
return None
10231023
else:
1024-
execute_response, is_direct_results = self._handle_execute_response(
1024+
execute_response, has_more_rows = self._handle_execute_response(
10251025
resp, cursor
10261026
)
10271027

@@ -1039,7 +1039,7 @@ def execute_command(
10391039
t_row_set=t_row_set,
10401040
max_download_threads=self.max_download_threads,
10411041
ssl_options=self._ssl_options,
1042-
is_direct_results=is_direct_results,
1042+
has_more_rows=has_more_rows,
10431043
session_id_hex=self._session_id_hex,
10441044
)
10451045

@@ -1062,9 +1062,7 @@ def get_catalogs(
10621062
)
10631063
resp = self.make_request(self._client.GetCatalogs, req)
10641064

1065-
execute_response, is_direct_results = self._handle_execute_response(
1066-
resp, cursor
1067-
)
1065+
execute_response, has_more_rows = self._handle_execute_response(resp, cursor)
10681066

10691067
t_row_set = None
10701068
if resp.directResults and resp.directResults.resultSet:
@@ -1080,7 +1078,7 @@ def get_catalogs(
10801078
t_row_set=t_row_set,
10811079
max_download_threads=self.max_download_threads,
10821080
ssl_options=self._ssl_options,
1083-
is_direct_results=is_direct_results,
1081+
has_more_rows=has_more_rows,
10841082
session_id_hex=self._session_id_hex,
10851083
)
10861084

@@ -1109,9 +1107,7 @@ def get_schemas(
11091107
)
11101108
resp = self.make_request(self._client.GetSchemas, req)
11111109

1112-
execute_response, is_direct_results = self._handle_execute_response(
1113-
resp, cursor
1114-
)
1110+
execute_response, has_more_rows = self._handle_execute_response(resp, cursor)
11151111

11161112
t_row_set = None
11171113
if resp.directResults and resp.directResults.resultSet:
@@ -1127,7 +1123,7 @@ def get_schemas(
11271123
t_row_set=t_row_set,
11281124
max_download_threads=self.max_download_threads,
11291125
ssl_options=self._ssl_options,
1130-
is_direct_results=is_direct_results,
1126+
has_more_rows=has_more_rows,
11311127
session_id_hex=self._session_id_hex,
11321128
)
11331129

@@ -1160,9 +1156,7 @@ def get_tables(
11601156
)
11611157
resp = self.make_request(self._client.GetTables, req)
11621158

1163-
execute_response, is_direct_results = self._handle_execute_response(
1164-
resp, cursor
1165-
)
1159+
execute_response, has_more_rows = self._handle_execute_response(resp, cursor)
11661160

11671161
t_row_set = None
11681162
if resp.directResults and resp.directResults.resultSet:
@@ -1178,7 +1172,7 @@ def get_tables(
11781172
t_row_set=t_row_set,
11791173
max_download_threads=self.max_download_threads,
11801174
ssl_options=self._ssl_options,
1181-
is_direct_results=is_direct_results,
1175+
has_more_rows=has_more_rows,
11821176
session_id_hex=self._session_id_hex,
11831177
)
11841178

@@ -1211,9 +1205,7 @@ def get_columns(
12111205
)
12121206
resp = self.make_request(self._client.GetColumns, req)
12131207

1214-
execute_response, is_direct_results = self._handle_execute_response(
1215-
resp, cursor
1216-
)
1208+
execute_response, has_more_rows = self._handle_execute_response(resp, cursor)
12171209

12181210
t_row_set = None
12191211
if resp.directResults and resp.directResults.resultSet:
@@ -1229,7 +1221,7 @@ def get_columns(
12291221
t_row_set=t_row_set,
12301222
max_download_threads=self.max_download_threads,
12311223
ssl_options=self._ssl_options,
1232-
is_direct_results=is_direct_results,
1224+
has_more_rows=has_more_rows,
12331225
session_id_hex=self._session_id_hex,
12341226
)
12351227

src/databricks/sql/result_set.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def __init__(
4343
command_id: CommandId,
4444
status: CommandState,
4545
has_been_closed_server_side: bool = False,
46-
is_direct_results: bool = False,
46+
has_more_rows: bool = False,
4747
results_queue=None,
4848
description: List[Tuple] = [],
4949
is_staging_operation: bool = False,
@@ -61,7 +61,7 @@ def __init__(
6161
:param command_id: The command ID
6262
:param status: The command status
6363
:param has_been_closed_server_side: Whether the command has been closed on the server
64-
:param is_direct_results: Whether the command has more rows
64+
:param has_more_rows: Whether the command has more rows
6565
:param results_queue: The results queue
6666
:param description: column description of the results
6767
:param is_staging_operation: Whether the command is a staging operation
@@ -76,7 +76,7 @@ def __init__(
7676
self.command_id = command_id
7777
self.status = status
7878
self.has_been_closed_server_side = has_been_closed_server_side
79-
self.is_direct_results = is_direct_results
79+
self.has_more_rows = has_more_rows
8080
self.results = results_queue
8181
self._is_staging_operation = is_staging_operation
8282
self.lz4_compressed = lz4_compressed
@@ -200,7 +200,7 @@ def __init__(
200200
t_row_set=None,
201201
max_download_threads: int = 10,
202202
ssl_options=None,
203-
is_direct_results: bool = True,
203+
has_more_rows: bool = True,
204204
):
205205
"""
206206
Initialize a ThriftResultSet with direct access to the ThriftDatabricksClient.
@@ -215,13 +215,13 @@ def __init__(
215215
:param t_row_set: The TRowSet containing result data (if available)
216216
:param max_download_threads: Maximum number of download threads for cloud fetch
217217
:param ssl_options: SSL options for cloud fetch
218-
:param is_direct_results: Whether there are more rows to fetch
218+
:param has_more_rows: Whether there are more rows to fetch
219219
"""
220220
self.num_downloaded_chunks = 0
221221

222222
# Initialize ThriftResultSet-specific attributes
223223
self._use_cloud_fetch = use_cloud_fetch
224-
self.is_direct_results = is_direct_results
224+
self.has_more_rows = has_more_rows
225225

226226
# Build the results queue if t_row_set is provided
227227
results_queue = None
@@ -253,7 +253,7 @@ def __init__(
253253
command_id=execute_response.command_id,
254254
status=execute_response.status,
255255
has_been_closed_server_side=execute_response.has_been_closed_server_side,
256-
is_direct_results=is_direct_results,
256+
has_more_rows=has_more_rows,
257257
results_queue=results_queue,
258258
description=execute_response.description,
259259
is_staging_operation=execute_response.is_staging_operation,
@@ -266,7 +266,7 @@ def __init__(
266266
self._fill_results_buffer()
267267

268268
def _fill_results_buffer(self):
269-
results, is_direct_results, result_links_count = self.backend.fetch_results(
269+
results, has_more_rows, result_links_count = self.backend.fetch_results(
270270
command_id=self.command_id,
271271
max_rows=self.arraysize,
272272
max_bytes=self.buffer_size_bytes,
@@ -278,7 +278,7 @@ def _fill_results_buffer(self):
278278
chunk_id=self.num_downloaded_chunks,
279279
)
280280
self.results = results
281-
self.is_direct_results = is_direct_results
281+
self.has_more_rows = has_more_rows
282282
self.num_downloaded_chunks += result_links_count
283283

284284
def _convert_columnar_table(self, table):
@@ -326,7 +326,7 @@ def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
326326
while (
327327
n_remaining_rows > 0
328328
and not self.has_been_closed_server_side
329-
and self.is_direct_results
329+
and self.has_more_rows
330330
):
331331
self._fill_results_buffer()
332332
partial_results = self.results.next_n_rows(n_remaining_rows)
@@ -351,7 +351,7 @@ def fetchmany_columnar(self, size: int):
351351
while (
352352
n_remaining_rows > 0
353353
and not self.has_been_closed_server_side
354-
and self.is_direct_results
354+
and self.has_more_rows
355355
):
356356
self._fill_results_buffer()
357357
partial_results = self.results.next_n_rows(n_remaining_rows)
@@ -366,7 +366,7 @@ def fetchall_arrow(self) -> "pyarrow.Table":
366366
results = self.results.remaining_rows()
367367
self._next_row_index += results.num_rows
368368
partial_result_chunks = [results]
369-
while not self.has_been_closed_server_side and self.is_direct_results:
369+
while not self.has_been_closed_server_side and self.has_more_rows:
370370
self._fill_results_buffer()
371371
partial_results = self.results.remaining_rows()
372372
if isinstance(results, ColumnTable) and isinstance(
@@ -392,7 +392,7 @@ def fetchall_columnar(self):
392392
results = self.results.remaining_rows()
393393
self._next_row_index += results.num_rows
394394

395-
while not self.has_been_closed_server_side and self.is_direct_results:
395+
while not self.has_been_closed_server_side and self.has_more_rows:
396396
self._fill_results_buffer()
397397
partial_results = self.results.remaining_rows()
398398
results = self.merge_columnar(results, partial_results)

tests/unit/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def new(cls):
4646
is_staging_operation=False,
4747
command_id=None,
4848
has_been_closed_server_side=True,
49-
is_direct_results=True,
49+
has_more_rows=True,
5050
lz4_compressed=True,
5151
arrow_schema_bytes=b"schema",
5252
)

tests/unit/test_fetches_bench.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def make_dummy_result_set_from_initial_results(arrow_table):
3636
execute_response=ExecuteResponse(
3737
status=None,
3838
has_been_closed_server_side=True,
39-
is_direct_results=False,
39+
has_more_rows=False,
4040
description=Mock(),
4141
command_id=None,
4242
arrow_schema_bytes=arrow_table.schema,

tests/unit/test_sea_result_set.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def execute_response(self):
4646
mock_response.command_id = CommandId.from_sea_statement_id("test-statement-123")
4747
mock_response.status = CommandState.SUCCEEDED
4848
mock_response.has_been_closed_server_side = False
49-
mock_response.is_direct_results = False
49+
mock_response.has_more_rows = False
5050
mock_response.results_queue = None
5151
mock_response.description = [
5252
("col1", "string", None, None, None, None, None),

tests/unit/test_thrift_backend.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,10 +1016,10 @@ def test_fall_back_to_hive_schema_if_no_arrow_schema(self, tcli_service_class):
10161016
def test_handle_execute_response_reads_has_more_rows_in_direct_results(
10171017
self, tcli_service_class, build_queue
10181018
):
1019-
for is_direct_results, resp_type in itertools.product(
1019+
for has_more_rows, resp_type in itertools.product(
10201020
[True, False], self.execute_response_types
10211021
):
1022-
with self.subTest(is_direct_results=is_direct_results, resp_type=resp_type):
1022+
with self.subTest(has_more_rows=has_more_rows, resp_type=resp_type):
10231023
tcli_service_instance = tcli_service_class.return_value
10241024
results_mock = Mock()
10251025
results_mock.startRowOffset = 0
@@ -1031,7 +1031,7 @@ def test_handle_execute_response_reads_has_more_rows_in_direct_results(
10311031
resultSetMetadata=self.metadata_resp,
10321032
resultSet=ttypes.TFetchResultsResp(
10331033
status=self.okay_status,
1034-
hasMoreRows=is_direct_results,
1034+
hasMoreRows=has_more_rows,
10351035
results=results_mock,
10361036
),
10371037
closeOperation=Mock(),
@@ -1052,7 +1052,7 @@ def test_handle_execute_response_reads_has_more_rows_in_direct_results(
10521052
has_more_rows_result,
10531053
) = thrift_backend._handle_execute_response(execute_resp, Mock())
10541054

1055-
self.assertEqual(is_direct_results, has_more_rows_result)
1055+
self.assertEqual(has_more_rows, has_more_rows_result)
10561056

10571057
@patch(
10581058
"databricks.sql.utils.ThriftResultSetQueueFactory.build_queue",
@@ -1062,10 +1062,10 @@ def test_handle_execute_response_reads_has_more_rows_in_direct_results(
10621062
def test_handle_execute_response_reads_has_more_rows_in_result_response(
10631063
self, tcli_service_class, build_queue
10641064
):
1065-
for is_direct_results, resp_type in itertools.product(
1065+
for has_more_rows, resp_type in itertools.product(
10661066
[True, False], self.execute_response_types
10671067
):
1068-
with self.subTest(is_direct_results=is_direct_results, resp_type=resp_type):
1068+
with self.subTest(has_more_rows=has_more_rows, resp_type=resp_type):
10691069
tcli_service_instance = tcli_service_class.return_value
10701070
results_mock = MagicMock()
10711071
results_mock.startRowOffset = 0
@@ -1078,7 +1078,7 @@ def test_handle_execute_response_reads_has_more_rows_in_result_response(
10781078

10791079
fetch_results_resp = ttypes.TFetchResultsResp(
10801080
status=self.okay_status,
1081-
hasMoreRows=is_direct_results,
1081+
hasMoreRows=has_more_rows,
10821082
results=results_mock,
10831083
resultSetMetadata=ttypes.TGetResultSetMetadataResp(
10841084
resultFormat=ttypes.TSparkRowSetType.ARROW_BASED_SET
@@ -1112,7 +1112,7 @@ def test_handle_execute_response_reads_has_more_rows_in_result_response(
11121112
chunk_id=0,
11131113
)
11141114

1115-
self.assertEqual(is_direct_results, has_more_rows_resp)
1115+
self.assertEqual(has_more_rows, has_more_rows_resp)
11161116

11171117
@patch("databricks.sql.backend.thrift_backend.TCLIService.Client", autospec=True)
11181118
def test_arrow_batches_row_count_are_respected(self, tcli_service_class):

0 commit comments

Comments
 (0)