Skip to content

Commit ab4b73b

Browse files
authored
Incorrect rows in inline fetch result (#479)
* Raised error when an incorrect row offset is returned * Changed error type * Grammar fix * Added unit tests and modified the code * Updated error message * Updated the non-retrying behavior to apply only to the inline case * Updated fix * Changed the flow * Minor update * Updated the retryable condition * Minor test fix * Added extra space
1 parent 680b3b6 commit ab4b73b

File tree

3 files changed

+12
-5
lines changed

3 files changed

+12
-5
lines changed

src/databricks/sql/client.py

+4
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,7 @@ def execute(
808808
self.thrift_backend,
809809
self.buffer_size_bytes,
810810
self.arraysize,
811+
self.connection.use_cloud_fetch,
811812
)
812813

813814
if execute_response.is_staging_operation:
@@ -1202,6 +1203,7 @@ def __init__(
12021203
thrift_backend: ThriftBackend,
12031204
result_buffer_size_bytes: int = DEFAULT_RESULT_BUFFER_SIZE_BYTES,
12041205
arraysize: int = 10000,
1206+
use_cloud_fetch: bool = True,
12051207
):
12061208
"""
12071209
A ResultSet manages the results of a single command.
@@ -1223,6 +1225,7 @@ def __init__(
12231225
self.description = execute_response.description
12241226
self._arrow_schema_bytes = execute_response.arrow_schema_bytes
12251227
self._next_row_index = 0
1228+
self._use_cloud_fetch = use_cloud_fetch
12261229

12271230
if execute_response.arrow_queue:
12281231
# In this case the server has taken the fast path and returned an initial batch of
@@ -1250,6 +1253,7 @@ def _fill_results_buffer(self):
12501253
lz4_compressed=self.lz4_compressed,
12511254
arrow_schema_bytes=self._arrow_schema_bytes,
12521255
description=self.description,
1256+
use_cloud_fetch=self._use_cloud_fetch,
12531257
)
12541258
self.results = results
12551259
self.has_more_rows = has_more_rows

src/databricks/sql/thrift_backend.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ def _handle_request_error(self, error_info, attempt, elapsed):
321321

322322
# FUTURE: Consider moving to https://github.com/litl/backoff or
323323
# https://github.com/jd/tenacity for retry logic.
324-
def make_request(self, method, request):
324+
def make_request(self, method, request, retryable=True):
325325
"""Execute given request, attempting retries when
326326
1. Receiving HTTP 429/503 from server
327327
2. OSError is raised during a GetOperationStatus
@@ -460,7 +460,7 @@ def attempt_request(attempt):
460460
# return on success
461461
# if available: bounded delay and retry
462462
# if not: raise error
463-
max_attempts = self._retry_stop_after_attempts_count
463+
max_attempts = self._retry_stop_after_attempts_count if retryable else 1
464464

465465
# use index-1 counting for logging/human consistency
466466
for attempt in range(1, max_attempts + 1):
@@ -1028,6 +1028,7 @@ def fetch_results(
10281028
lz4_compressed,
10291029
arrow_schema_bytes,
10301030
description,
1031+
use_cloud_fetch=True,
10311032
):
10321033
assert op_handle is not None
10331034

@@ -1044,10 +1045,11 @@ def fetch_results(
10441045
includeResultSetMetadata=True,
10451046
)
10461047

1047-
resp = self.make_request(self._client.FetchResults, req)
1048+
# Fetch results in Inline mode with FETCH_NEXT orientation are not idempotent and hence not retried
1049+
resp = self.make_request(self._client.FetchResults, req, use_cloud_fetch)
10481050
if resp.results.startRowOffset > expected_row_start_offset:
1049-
logger.warning(
1050-
"Expected results to start from {} but they instead start at {}".format(
1051+
raise DataError(
1052+
"fetch_results failed due to inconsistency in the state between the client and the server. Expected results to start from {} but they instead start at {}, some result batches must have been skipped".format(
10511053
expected_row_start_offset, resp.results.startRowOffset
10521054
)
10531055
)

tests/unit/test_fetches.py

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def fetch_results(
6666
lz4_compressed,
6767
arrow_schema_bytes,
6868
description,
69+
use_cloud_fetch=True,
6970
):
7071
nonlocal batch_index
7172
results = FetchTests.make_arrow_queue(batch_list[batch_index])

0 commit comments

Comments
 (0)