Skip to content

Commit 9590af7

Browse files
resolve merge artifacts
Signed-off-by: varun-edachali-dbx <[email protected]>
1 parent 7257168 commit 9590af7

File tree

2 files changed

+3
-252
lines changed

2 files changed

+3
-252
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 2 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -281,51 +281,14 @@ def __init__(
281281
self.link_fetcher.start()
282282

283283
# Initialize table and position
284-
self.table = self._create_table_from_link(self._current_chunk_link)
285-
286-
def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink:
287-
"""Convert SEA external links to Thrift format for compatibility with existing download manager."""
288-
# Parse the ISO format expiration time
289-
expiry_time = int(dateutil.parser.parse(link.expiration).timestamp())
290-
return TSparkArrowResultLink(
291-
fileLink=link.external_link,
292-
expiryTime=expiry_time,
293-
rowCount=link.row_count,
294-
bytesNum=link.byte_count,
295-
startRowOffset=link.row_offset,
296-
httpHeaders=link.http_headers or {},
297-
)
284+
self.table = self._create_next_table()
298285

299286
def _get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
300287
if chunk_index not in self._chunk_index_to_link:
301288
links = self._sea_client.get_chunk_links(self._statement_id, chunk_index)
302289
self._chunk_index_to_link.update({link.chunk_index: link for link in links})
303290
return self._chunk_index_to_link.get(chunk_index, None)
304291

305-
def _progress_chunk_link(self):
306-
"""Progress to the next chunk link."""
307-
if not self._current_chunk_link:
308-
return None
309-
310-
next_chunk_index = self._current_chunk_link.next_chunk_index
311-
312-
if next_chunk_index is None:
313-
self._current_chunk_link = None
314-
return None
315-
316-
self._current_chunk_link = self._get_chunk_link(next_chunk_index)
317-
if not self._current_chunk_link:
318-
logger.error(
319-
"SeaCloudFetchQueue: unable to retrieve link for chunk {}".format(
320-
next_chunk_index
321-
)
322-
)
323-
return None
324-
325-
logger.debug(
326-
f"SeaCloudFetchQueue: Progressed to link for chunk {next_chunk_index}: {self._current_chunk_link}"
327-
)
328-
329292
def _create_next_table(self) -> Union["pyarrow.Table", None]:
330293
"""Create next table by retrieving the logical next downloaded file."""
331294
if not self._current_chunk_link:
@@ -345,8 +308,4 @@ def _create_next_table(self) -> Union["pyarrow.Table", None]:
345308

346309
self.current_chunk_index += 1
347310

348-
if not self._current_chunk_link:
349-
logger.debug("SeaCloudFetchQueue: No current chunk link, returning")
350-
return None
351-
352-
return self._create_table_from_link(self._current_chunk_link)
311+
return arrow_table

tests/unit/test_sea_queue.py

Lines changed: 1 addition & 209 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,7 @@ def test_build_queue_arrow_stream(
213213

214214
with patch(
215215
"databricks.sql.backend.sea.queue.ResultFileDownloadManager"
216-
), patch.object(
217-
SeaCloudFetchQueue, "_create_table_from_link", return_value=None
218-
):
216+
), patch.object(SeaCloudFetchQueue, "_create_next_table", return_value=None):
219217
queue = SeaResultSetQueueFactory.build_queue(
220218
result_data=result_data,
221219
manifest=arrow_manifest,
@@ -298,79 +296,6 @@ def sample_external_link_no_headers(self):
298296
http_headers=None,
299297
)
300298

301-
def test_convert_to_thrift_link(self, sample_external_link):
302-
"""Test conversion of ExternalLink to TSparkArrowResultLink."""
303-
queue = Mock(spec=SeaCloudFetchQueue)
304-
305-
# Call the method directly
306-
result = SeaCloudFetchQueue._convert_to_thrift_link(queue, sample_external_link)
307-
308-
# Verify the conversion
309-
assert result.fileLink == sample_external_link.external_link
310-
assert result.rowCount == sample_external_link.row_count
311-
assert result.bytesNum == sample_external_link.byte_count
312-
assert result.startRowOffset == sample_external_link.row_offset
313-
assert result.httpHeaders == sample_external_link.http_headers
314-
315-
def test_convert_to_thrift_link_no_headers(self, sample_external_link_no_headers):
316-
"""Test conversion of ExternalLink with no headers to TSparkArrowResultLink."""
317-
queue = Mock(spec=SeaCloudFetchQueue)
318-
319-
# Call the method directly
320-
result = SeaCloudFetchQueue._convert_to_thrift_link(
321-
queue, sample_external_link_no_headers
322-
)
323-
324-
# Verify the conversion
325-
assert result.fileLink == sample_external_link_no_headers.external_link
326-
assert result.rowCount == sample_external_link_no_headers.row_count
327-
assert result.bytesNum == sample_external_link_no_headers.byte_count
328-
assert result.startRowOffset == sample_external_link_no_headers.row_offset
329-
assert result.httpHeaders == {}
330-
331-
@patch("databricks.sql.backend.sea.queue.ResultFileDownloadManager")
332-
@patch("databricks.sql.backend.sea.queue.logger")
333-
def test_init_with_valid_initial_link(
334-
self,
335-
mock_logger,
336-
mock_download_manager_class,
337-
mock_sea_client,
338-
ssl_options,
339-
description,
340-
sample_external_link,
341-
):
342-
"""Test initialization with valid initial link."""
343-
mock_download_manager = Mock()
344-
mock_download_manager_class.return_value = mock_download_manager
345-
346-
# Create a queue with valid initial link
347-
with patch.object(SeaCloudFetchQueue, "_create_next_table", return_value=None):
348-
queue = SeaCloudFetchQueue(
349-
initial_links=[sample_external_link],
350-
max_download_threads=5,
351-
ssl_options=ssl_options,
352-
sea_client=mock_sea_client,
353-
statement_id="test-statement-123",
354-
total_chunk_count=1,
355-
lz4_compressed=False,
356-
description=description,
357-
)
358-
359-
# Verify debug message was logged
360-
mock_logger.debug.assert_called_with(
361-
"SeaCloudFetchQueue: Initialize CloudFetch loader for statement {}, total chunks: {}".format(
362-
"test-statement-123", 1
363-
)
364-
)
365-
366-
# Verify download manager was created
367-
mock_download_manager_class.assert_called_once()
368-
369-
# Verify attributes
370-
assert queue._statement_id == "test-statement-123"
371-
assert queue._current_chunk_link == sample_external_link
372-
assert queue.download_manager == mock_download_manager
373-
374299
@patch("databricks.sql.backend.sea.queue.ResultFileDownloadManager")
375300
@patch("databricks.sql.backend.sea.queue.logger")
376301
def test_init_no_initial_links(
@@ -410,136 +335,3 @@ def test_init_no_initial_links(
410335
not hasattr(queue, "_current_chunk_link")
411336
or queue._current_chunk_link is None
412337
)
413-
414-
@patch("databricks.sql.backend.sea.queue.ResultFileDownloadManager")
415-
@patch("databricks.sql.backend.sea.queue.logger")
416-
def test_init_non_zero_chunk_index(
417-
self,
418-
mock_logger,
419-
mock_download_manager_class,
420-
mock_sea_client,
421-
ssl_options,
422-
description,
423-
):
424-
"""Test initialization with non-zero chunk index initial link."""
425-
# Create a link with chunk_index != 0
426-
non_zero_link = ExternalLink(
427-
external_link="https://example.com/data/chunk1",
428-
expiration="2025-07-03T05:51:18.118009",
429-
row_count=100,
430-
byte_count=1024,
431-
row_offset=100,
432-
chunk_index=1,
433-
next_chunk_index=2,
434-
http_headers={"Authorization": "Bearer token123"},
435-
)
436-
437-
# Create a queue with non-zero chunk index
438-
queue = SeaCloudFetchQueue(
439-
initial_links=[non_zero_link],
440-
max_download_threads=5,
441-
ssl_options=ssl_options,
442-
sea_client=mock_sea_client,
443-
statement_id="test-statement-123",
444-
total_chunk_count=1,
445-
lz4_compressed=False,
446-
description=description,
447-
)
448-
449-
# Verify debug message was logged
450-
mock_logger.debug.assert_called_with(
451-
"SeaCloudFetchQueue: Initialize CloudFetch loader for statement {}, total chunks: {}".format(
452-
"test-statement-123", 1
453-
)
454-
)
455-
456-
# Verify download manager wasn't created (no chunk 0)
457-
mock_download_manager_class.assert_not_called()
458-
459-
@patch("databricks.sql.backend.sea.queue.logger")
460-
def test_progress_chunk_link_no_current_link(self, mock_logger):
461-
"""Test _progress_chunk_link with no current link."""
462-
# Create a queue instance without initializing
463-
queue = Mock(spec=SeaCloudFetchQueue)
464-
queue._current_chunk_link = None
465-
466-
# Call the method directly
467-
result = SeaCloudFetchQueue._progress_chunk_link(queue)
468-
469-
# Verify the result is None
470-
assert result is None
471-
472-
@patch("databricks.sql.backend.sea.queue.logger")
473-
def test_progress_chunk_link_no_next_chunk(self, mock_logger):
474-
"""Test _progress_chunk_link with no next chunk index."""
475-
# Create a queue instance without initializing
476-
queue = Mock(spec=SeaCloudFetchQueue)
477-
queue._current_chunk_link = ExternalLink(
478-
external_link="https://example.com/data/chunk0",
479-
expiration="2025-07-03T05:51:18.118009",
480-
row_count=100,
481-
byte_count=1024,
482-
row_offset=0,
483-
chunk_index=0,
484-
next_chunk_index=None,
485-
http_headers={"Authorization": "Bearer token123"},
486-
)
487-
488-
# Call the method directly
489-
result = SeaCloudFetchQueue._progress_chunk_link(queue)
490-
491-
# Verify the result is None
492-
assert result is None
493-
assert queue._current_chunk_link is None
494-
495-
@patch("databricks.sql.backend.sea.queue.logger")
496-
def test_create_next_table_no_current_link(self, mock_logger):
497-
"""Test _create_next_table with no current link."""
498-
# Create a queue instance without initializing
499-
queue = Mock(spec=SeaCloudFetchQueue)
500-
queue._current_chunk_link = None
501-
502-
# Call the method directly
503-
result = SeaCloudFetchQueue._create_next_table(queue)
504-
505-
# Verify debug message was logged
506-
mock_logger.debug.assert_called_with(
507-
"SeaCloudFetchQueue: No current chunk link, returning"
508-
)
509-
510-
# Verify the result is None
511-
assert result is None
512-
513-
@patch("databricks.sql.backend.sea.queue.logger")
514-
def test_create_next_table_success(self, mock_logger):
515-
"""Test _create_next_table with successful table creation."""
516-
# Create a queue instance without initializing
517-
queue = Mock(spec=SeaCloudFetchQueue)
518-
queue._current_chunk_link = ExternalLink(
519-
external_link="https://example.com/data/chunk0",
520-
expiration="2025-07-03T05:51:18.118009",
521-
row_count=100,
522-
byte_count=1024,
523-
row_offset=50,
524-
chunk_index=0,
525-
next_chunk_index=1,
526-
http_headers={"Authorization": "Bearer token123"},
527-
)
528-
queue.download_manager = Mock()
529-
530-
# Mock the dependencies
531-
mock_table = Mock()
532-
queue._create_table_at_offset = Mock(return_value=mock_table)
533-
queue._progress_chunk_link = Mock()
534-
535-
# Call the method directly
536-
result = SeaCloudFetchQueue._create_next_table(queue)
537-
538-
# Verify the table was created
539-
queue._create_table_at_offset.assert_called_once_with(50)
540-
541-
# Verify progress was called
542-
queue._progress_chunk_link.assert_called_once()
543-
544-
# Verify the result is the table
545-
assert result == mock_table

0 commit comments

Comments
 (0)