Skip to content

Commit 77c0343

Browse files
Revert "Chunk download latency (#634)"
This reverts commit b57c3f3.
1 parent ad6b356 commit 77c0343

File tree

17 files changed

+89
-218
lines changed

17 files changed

+89
-218
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from typing import Dict, List, Optional, Tuple, Union, TYPE_CHECKING
66

77
from databricks.sql.cloudfetch.download_manager import ResultFileDownloadManager
8-
from databricks.sql.telemetry.models.enums import StatementType
98

109
from databricks.sql.cloudfetch.downloader import ResultSetDownloadHandler
1110

@@ -328,13 +327,9 @@ def __init__(
328327
super().__init__(
329328
max_download_threads=max_download_threads,
330329
ssl_options=ssl_options,
331-
statement_id=statement_id,
332330
schema_bytes=None,
333331
lz4_compressed=lz4_compressed,
334332
description=description,
335-
# TODO: fix these arguments when telemetry is implemented in SEA
336-
session_id_hex=None,
337-
chunk_id=0,
338333
)
339334

340335
logger.debug(

src/databricks/sql/backend/thrift_backend.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@
66
import time
77
import threading
88
from typing import List, Optional, Union, Any, TYPE_CHECKING
9-
from uuid import UUID
109

1110
from databricks.sql.result_set import ThriftResultSet
12-
from databricks.sql.telemetry.models.event import StatementType
11+
1312

1413
if TYPE_CHECKING:
1514
from databricks.sql.client import Cursor
@@ -901,7 +900,6 @@ def get_execution_result(
901900
max_download_threads=self.max_download_threads,
902901
ssl_options=self._ssl_options,
903902
is_direct_results=is_direct_results,
904-
session_id_hex=self._session_id_hex,
905903
)
906904

907905
def _wait_until_command_done(self, op_handle, initial_operation_status_resp):
@@ -1039,7 +1037,6 @@ def execute_command(
10391037
max_download_threads=self.max_download_threads,
10401038
ssl_options=self._ssl_options,
10411039
is_direct_results=is_direct_results,
1042-
session_id_hex=self._session_id_hex,
10431040
)
10441041

10451042
def get_catalogs(
@@ -1080,7 +1077,6 @@ def get_catalogs(
10801077
max_download_threads=self.max_download_threads,
10811078
ssl_options=self._ssl_options,
10821079
is_direct_results=is_direct_results,
1083-
session_id_hex=self._session_id_hex,
10841080
)
10851081

10861082
def get_schemas(
@@ -1127,7 +1123,6 @@ def get_schemas(
11271123
max_download_threads=self.max_download_threads,
11281124
ssl_options=self._ssl_options,
11291125
is_direct_results=is_direct_results,
1130-
session_id_hex=self._session_id_hex,
11311126
)
11321127

11331128
def get_tables(
@@ -1178,7 +1173,6 @@ def get_tables(
11781173
max_download_threads=self.max_download_threads,
11791174
ssl_options=self._ssl_options,
11801175
is_direct_results=is_direct_results,
1181-
session_id_hex=self._session_id_hex,
11821176
)
11831177

11841178
def get_columns(
@@ -1229,7 +1223,6 @@ def get_columns(
12291223
max_download_threads=self.max_download_threads,
12301224
ssl_options=self._ssl_options,
12311225
is_direct_results=is_direct_results,
1232-
session_id_hex=self._session_id_hex,
12331226
)
12341227

12351228
def _handle_execute_response(self, resp, cursor):
@@ -1264,7 +1257,6 @@ def fetch_results(
12641257
lz4_compressed: bool,
12651258
arrow_schema_bytes,
12661259
description,
1267-
chunk_id: int,
12681260
use_cloud_fetch=True,
12691261
):
12701262
thrift_handle = command_id.to_thrift_handle()
@@ -1302,16 +1294,9 @@ def fetch_results(
13021294
lz4_compressed=lz4_compressed,
13031295
description=description,
13041296
ssl_options=self._ssl_options,
1305-
session_id_hex=self._session_id_hex,
1306-
statement_id=command_id.to_hex_guid(),
1307-
chunk_id=chunk_id,
13081297
)
13091298

1310-
return (
1311-
queue,
1312-
resp.hasMoreRows,
1313-
len(resp.results.resultLinks) if resp.results.resultLinks else 0,
1314-
)
1299+
return queue, resp.hasMoreRows
13151300

13161301
def cancel_command(self, command_id: CommandId) -> None:
13171302
thrift_handle = command_id.to_thrift_handle()

src/databricks/sql/backend/types.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import logging
55

66
from databricks.sql.backend.utils.guid_utils import guid_to_hex_id
7-
from databricks.sql.telemetry.models.enums import StatementType
87
from databricks.sql.thrift_api.TCLIService import ttypes
98

109
logger = logging.getLogger(__name__)

src/databricks/sql/client.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -284,9 +284,7 @@ def read(self) -> Optional[OAuthToken]:
284284

285285
driver_connection_params = DriverConnectionParameters(
286286
http_path=http_path,
287-
mode=DatabricksClientType.SEA
288-
if self.session.use_sea
289-
else DatabricksClientType.THRIFT,
287+
mode=DatabricksClientType.THRIFT,
290288
host_info=HostDetails(host_url=server_hostname, port=self.session.port),
291289
auth_mech=TelemetryHelper.get_auth_mechanism(self.session.auth_provider),
292290
auth_flow=TelemetryHelper.get_auth_flow(self.session.auth_provider),

src/databricks/sql/cloudfetch/download_manager.py

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
import logging
22

33
from concurrent.futures import ThreadPoolExecutor, Future
4-
from typing import List, Union, Tuple, Optional
4+
from typing import List, Union
55

66
from databricks.sql.cloudfetch.downloader import (
77
ResultSetDownloadHandler,
88
DownloadableResultSettings,
99
DownloadedFile,
1010
)
1111
from databricks.sql.types import SSLOptions
12-
from databricks.sql.telemetry.models.event import StatementType
12+
1313
from databricks.sql.thrift_api.TCLIService.ttypes import TSparkArrowResultLink
1414

1515
logger = logging.getLogger(__name__)
@@ -22,31 +22,24 @@ def __init__(
2222
max_download_threads: int,
2323
lz4_compressed: bool,
2424
ssl_options: SSLOptions,
25-
session_id_hex: Optional[str],
26-
statement_id: str,
27-
chunk_id: int,
2825
):
29-
self._pending_links: List[Tuple[int, TSparkArrowResultLink]] = []
30-
self.chunk_id = chunk_id
31-
for i, link in enumerate(links, start=chunk_id):
26+
self._pending_links: List[TSparkArrowResultLink] = []
27+
for link in links:
3228
if link.rowCount <= 0:
3329
continue
3430
logger.debug(
35-
"ResultFileDownloadManager: adding file link, chunk id {}, start offset {}, row count: {}".format(
36-
i, link.startRowOffset, link.rowCount
31+
"ResultFileDownloadManager: adding file link, start offset {}, row count: {}".format(
32+
link.startRowOffset, link.rowCount
3733
)
3834
)
39-
self._pending_links.append((i, link))
40-
self.chunk_id += len(links)
35+
self._pending_links.append(link)
4136

4237
self._download_tasks: List[Future[DownloadedFile]] = []
4338
self._max_download_threads: int = max_download_threads
4439
self._thread_pool = ThreadPoolExecutor(max_workers=self._max_download_threads)
4540

4641
self._downloadable_result_settings = DownloadableResultSettings(lz4_compressed)
4742
self._ssl_options = ssl_options
48-
self.session_id_hex = session_id_hex
49-
self.statement_id = statement_id
5043

5144
def get_next_downloaded_file(
5245
self, next_row_offset: int
@@ -96,19 +89,14 @@ def _schedule_downloads(self):
9689
while (len(self._download_tasks) < self._max_download_threads) and (
9790
len(self._pending_links) > 0
9891
):
99-
chunk_id, link = self._pending_links.pop(0)
92+
link = self._pending_links.pop(0)
10093
logger.debug(
101-
"- chunk: {}, start: {}, row count: {}".format(
102-
chunk_id, link.startRowOffset, link.rowCount
103-
)
94+
"- start: {}, row count: {}".format(link.startRowOffset, link.rowCount)
10495
)
10596
handler = ResultSetDownloadHandler(
10697
settings=self._downloadable_result_settings,
10798
link=link,
10899
ssl_options=self._ssl_options,
109-
chunk_id=chunk_id,
110-
session_id_hex=self.session_id_hex,
111-
statement_id=self.statement_id,
112100
)
113101
task = self._thread_pool.submit(handler.run)
114102
self._download_tasks.append(task)
@@ -129,8 +117,7 @@ def add_link(self, link: TSparkArrowResultLink):
129117
link.startRowOffset, link.rowCount
130118
)
131119
)
132-
self._pending_links.append((self.chunk_id, link))
133-
self.chunk_id += 1
120+
self._pending_links.append(link)
134121

135122
def _shutdown_manager(self):
136123
# Clear download handlers and shutdown the thread pool

src/databricks/sql/cloudfetch/downloader.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import logging
22
from dataclasses import dataclass
3-
from typing import Optional
43

54
import requests
65
from requests.adapters import HTTPAdapter, Retry
@@ -10,8 +9,6 @@
109
from databricks.sql.thrift_api.TCLIService.ttypes import TSparkArrowResultLink
1110
from databricks.sql.exc import Error
1211
from databricks.sql.types import SSLOptions
13-
from databricks.sql.telemetry.latency_logger import log_latency
14-
from databricks.sql.telemetry.models.event import StatementType
1512

1613
logger = logging.getLogger(__name__)
1714

@@ -69,18 +66,11 @@ def __init__(
6966
settings: DownloadableResultSettings,
7067
link: TSparkArrowResultLink,
7168
ssl_options: SSLOptions,
72-
chunk_id: int,
73-
session_id_hex: Optional[str],
74-
statement_id: str,
7569
):
7670
self.settings = settings
7771
self.link = link
7872
self._ssl_options = ssl_options
79-
self.chunk_id = chunk_id
80-
self.session_id_hex = session_id_hex
81-
self.statement_id = statement_id
8273

83-
@log_latency(StatementType.QUERY)
8474
def run(self) -> DownloadedFile:
8575
"""
8676
Download the file described in the cloud fetch link.
@@ -90,8 +80,8 @@ def run(self) -> DownloadedFile:
9080
"""
9181

9282
logger.debug(
93-
"ResultSetDownloadHandler: starting file download, chunk id {}, offset {}, row count {}".format(
94-
self.chunk_id, self.link.startRowOffset, self.link.rowCount
83+
"ResultSetDownloadHandler: starting file download, offset {}, row count {}".format(
84+
self.link.startRowOffset, self.link.rowCount
9585
)
9686
)
9787

src/databricks/sql/result_set.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
ColumnQueue,
2323
)
2424
from databricks.sql.backend.types import CommandId, CommandState, ExecuteResponse
25-
from databricks.sql.telemetry.models.event import StatementType
2625

2726
logger = logging.getLogger(__name__)
2827

@@ -193,7 +192,6 @@ def __init__(
193192
connection: "Connection",
194193
execute_response: "ExecuteResponse",
195194
thrift_client: "ThriftDatabricksClient",
196-
session_id_hex: Optional[str],
197195
buffer_size_bytes: int = 104857600,
198196
arraysize: int = 10000,
199197
use_cloud_fetch: bool = True,
@@ -217,7 +215,6 @@ def __init__(
217215
:param ssl_options: SSL options for cloud fetch
218216
:param is_direct_results: Whether there are more rows to fetch
219217
"""
220-
self.num_downloaded_chunks = 0
221218

222219
# Initialize ThriftResultSet-specific attributes
223220
self._use_cloud_fetch = use_cloud_fetch
@@ -237,12 +234,7 @@ def __init__(
237234
lz4_compressed=execute_response.lz4_compressed,
238235
description=execute_response.description,
239236
ssl_options=ssl_options,
240-
session_id_hex=session_id_hex,
241-
statement_id=execute_response.command_id.to_hex_guid(),
242-
chunk_id=self.num_downloaded_chunks,
243237
)
244-
if t_row_set.resultLinks:
245-
self.num_downloaded_chunks += len(t_row_set.resultLinks)
246238

247239
# Call parent constructor with common attributes
248240
super().__init__(
@@ -266,7 +258,7 @@ def __init__(
266258
self._fill_results_buffer()
267259

268260
def _fill_results_buffer(self):
269-
results, is_direct_results, result_links_count = self.backend.fetch_results(
261+
results, is_direct_results = self.backend.fetch_results(
270262
command_id=self.command_id,
271263
max_rows=self.arraysize,
272264
max_bytes=self.buffer_size_bytes,
@@ -275,11 +267,9 @@ def _fill_results_buffer(self):
275267
arrow_schema_bytes=self._arrow_schema_bytes,
276268
description=self.description,
277269
use_cloud_fetch=self._use_cloud_fetch,
278-
chunk_id=self.num_downloaded_chunks,
279270
)
280271
self.results = results
281272
self.is_direct_results = is_direct_results
282-
self.num_downloaded_chunks += result_links_count
283273

284274
def _convert_columnar_table(self, table):
285275
column_names = [c[0] for c in self.description]

src/databricks/sql/session.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ def _create_backend(
9797
kwargs: dict,
9898
) -> DatabricksClient:
9999
"""Create and return the appropriate backend client."""
100-
self.use_sea = kwargs.get("use_sea", False)
100+
use_sea = kwargs.get("use_sea", False)
101101

102102
databricks_client_class: Type[DatabricksClient]
103-
if self.use_sea:
103+
if use_sea:
104104
logger.debug("Creating SEA backend client")
105105
databricks_client_class = SeaDatabricksClient
106106
else:

0 commit comments

Comments (0)