Skip to content

Incorrect rows in inline fetch result #479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Dec 22, 2024
4 changes: 4 additions & 0 deletions src/databricks/sql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ def execute(
self.thrift_backend,
self.buffer_size_bytes,
self.arraysize,
self.connection.use_cloud_fetch,
)

if execute_response.is_staging_operation:
Expand Down Expand Up @@ -1202,6 +1203,7 @@ def __init__(
thrift_backend: ThriftBackend,
result_buffer_size_bytes: int = DEFAULT_RESULT_BUFFER_SIZE_BYTES,
arraysize: int = 10000,
use_cloud_fetch: bool = True,
):
"""
A ResultSet manages the results of a single command.
Expand All @@ -1223,6 +1225,7 @@ def __init__(
self.description = execute_response.description
self._arrow_schema_bytes = execute_response.arrow_schema_bytes
self._next_row_index = 0
self._use_cloud_fetch = use_cloud_fetch

if execute_response.arrow_queue:
# In this case the server has taken the fast path and returned an initial batch of
Expand Down Expand Up @@ -1250,6 +1253,7 @@ def _fill_results_buffer(self):
lz4_compressed=self.lz4_compressed,
arrow_schema_bytes=self._arrow_schema_bytes,
description=self.description,
use_cloud_fetch=self._use_cloud_fetch,
)
self.results = results
self.has_more_rows = has_more_rows
Expand Down
12 changes: 7 additions & 5 deletions src/databricks/sql/thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def _handle_request_error(self, error_info, attempt, elapsed):

# FUTURE: Consider moving to https://github.com/litl/backoff or
# https://github.com/jd/tenacity for retry logic.
def make_request(self, method, request):
def make_request(self, method, request, retryable=True):
"""Execute given request, attempting retries when
1. Receiving HTTP 429/503 from server
2. OSError is raised during a GetOperationStatus
Expand Down Expand Up @@ -460,7 +460,7 @@ def attempt_request(attempt):
# return on success
# if available: bounded delay and retry
# if not: raise error
max_attempts = self._retry_stop_after_attempts_count
max_attempts = self._retry_stop_after_attempts_count if retryable else 1

# use index-1 counting for logging/human consistency
for attempt in range(1, max_attempts + 1):
Expand Down Expand Up @@ -1028,6 +1028,7 @@ def fetch_results(
lz4_compressed,
arrow_schema_bytes,
description,
use_cloud_fetch=True,
):
assert op_handle is not None

Expand All @@ -1044,10 +1045,11 @@ def fetch_results(
includeResultSetMetadata=True,
)

resp = self.make_request(self._client.FetchResults, req)
# Fetch results in Inline mode with FETCH_NEXT orientation are not idempotent and hence not retried
resp = self.make_request(self._client.FetchResults, req, use_cloud_fetch)
if resp.results.startRowOffset > expected_row_start_offset:
logger.warning(
"Expected results to start from {} but they instead start at {}".format(
raise DataError(
"fetch_results failed due to inconsistency in the state between the client and the server. Expected results to start from {} but they instead start at {}, some result batches must have been skipped".format(
expected_row_start_offset, resp.results.startRowOffset
)
)
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_fetches.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def fetch_results(
lz4_compressed,
arrow_schema_bytes,
description,
use_cloud_fetch=True,
):
nonlocal batch_index
results = FetchTests.make_arrow_queue(batch_list[batch_index])
Expand Down
Loading