Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/databricks/sql/thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,8 +898,12 @@ def execute_command(
sessionHandle=session_handle,
statement=operation,
runAsync=True,
getDirectResults=ttypes.TSparkGetDirectResults(
maxRows=max_rows, maxBytes=max_bytes
# For async operation we don't want the direct results
getDirectResults=None
if async_op
else ttypes.TSparkGetDirectResults(
maxRows=max_rows,
maxBytes=max_bytes,
),
Comment on lines +902 to 907
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a linter for the Python driver yet? If not, could you please open an issue, either with the community or internally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jayantsing-db I can raise a new PR for that

canReadArrowResult=True if pyarrow else False,
canDecompressLZ4Result=lz4_compression,
Expand Down
66 changes: 59 additions & 7 deletions tests/e2e/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,22 @@ def test_cloud_fetch(self):
for i in range(len(cf_result)):
assert cf_result[i] == noop_result[i]

def test_execute_async(self):
def isExecuting(operation_state):
return not operation_state or operation_state in [
ttypes.TOperationState.RUNNING_STATE,
ttypes.TOperationState.PENDING_STATE,
]

class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase):
def isExecuting(self, operation_state):
    """Tell whether a query should still be polled.

    Returns True when ``operation_state`` is falsy (for example ``None``)
    or equals the RUNNING or PENDING operation state; otherwise False.
    """
    # A falsy state is treated the same as "still executing".
    if not operation_state:
        return True

    in_flight_states = (
        ttypes.TOperationState.RUNNING_STATE,
        ttypes.TOperationState.PENDING_STATE,
    )
    return operation_state in in_flight_states

def test_execute_async__long_running(self):

long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'"
with self.cursor() as cursor:
cursor.execute_async(long_running_query)

## Polling after every POLLING_INTERVAL seconds
while isExecuting(cursor.get_query_state()):
while self.isExecuting(cursor.get_query_state()):
time.sleep(self.POLLING_INTERVAL)
log.info("Polling the status in test_execute_async")

Expand All @@ -198,6 +201,55 @@ def isExecuting(operation_state):

assert result[0].asDict() == {"count(1)": 0}

def test_execute_async__small_result(self):
    """Submit ``SELECT 1`` with ``execute_async`` and verify the fetched row.

    The test sleeps before the first poll, polls the query state until
    ``isExecuting`` reports False, then loads the async result and checks
    that the first row is ``{"1": 1}``.
    """
    with self.cursor() as cursor:
        cursor.execute_async("SELECT 1")

        # Fixed 5 second pause before polling starts.
        # NOTE(review): presumably lets the tiny query finish first - confirm.
        time.sleep(5)

        # Re-check the state once per POLLING_INTERVAL seconds.
        while self.isExecuting(cursor.get_query_state()):
            time.sleep(self.POLLING_INTERVAL)
            log.info("Polling the status in test_execute_async")

        # Attach the finished execution's result to the cursor, then read it.
        cursor.get_async_execution_result()
        rows = cursor.fetchall()

        first_row = rows[0].asDict()
        assert first_row == {"1": 1}

def test_execute_async__large_result(self):
    """Submit a join of two ranges with ``execute_async`` and count the rows.

    The test sleeps before the first poll, polls the query state until
    ``isExecuting`` reports False, then loads the async result and checks
    that ``x_dimension * y_dimension`` rows were fetched.
    """
    x_dimension = 1000
    y_dimension = 1000
    expected_row_count = x_dimension * y_dimension

    # Assembled line by line; the resulting text (including the leading and
    # trailing newline) is the same as the multi-line literal it replaces.
    large_result_query = (
        "\n"
        "SELECT\n"
        "x.id AS x_id,\n"
        "y.id AS y_id,\n"
        "FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') AS date\n"
        "FROM\n"
        f"RANGE({x_dimension}) x\n"
        "JOIN\n"
        f"RANGE({y_dimension}) y\n"
    )

    with self.cursor() as cursor:
        cursor.execute_async(large_result_query)

        # Fixed 5 second pause before polling starts.
        time.sleep(5)

        # Re-check the state once per POLLING_INTERVAL seconds.
        while self.isExecuting(cursor.get_query_state()):
            time.sleep(self.POLLING_INTERVAL)
            log.info("Polling the status in test_execute_async")

        # Attach the finished execution's result to the cursor, then read it.
        cursor.get_async_execution_result()
        rows = cursor.fetchall()

        assert len(rows) == expected_row_count


# Exclude Retry tests because they require specific setups, and LargeQueries too slow for core
# tests
Expand Down
Loading