Skip to content

Incorrect rows in inline fetch result #479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Dec 22, 2024
7 changes: 7 additions & 0 deletions src/databricks/sql/auth/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class CommandType(Enum):
CLOSE_SESSION = "CloseSession"
CLOSE_OPERATION = "CloseOperation"
GET_OPERATION_STATUS = "GetOperationStatus"
FETCH_RESULTS_ORIENTATION_FETCH_NEXT = "FetchResultsOrientation_FETCH_NEXT"
OTHER = "Other"

@classmethod
Expand Down Expand Up @@ -362,6 +363,12 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
if status_code == 501:
raise NonRecoverableNetworkError("Received code 501 from server.")

if self.command_type == CommandType.FETCH_RESULTS_ORIENTATION_FETCH_NEXT:
return (
False,
"FetchResults with FETCH_NEXT orientation are not idempotent and is not retried",
)

# Request failed and this method is not retryable. We only retry POST requests.
if not self._is_method_retryable(method):
return False, "Only POST requests are retried"
Expand Down
13 changes: 11 additions & 2 deletions src/databricks/sql/thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,15 @@ def attempt_request(attempt):

# These three lines are no-ops if the v3 retry policy is not in use
if self.enable_v3_retries:
# Not to retry when FetchResults has orientation as FETCH_NEXT as it is not idempotent
if this_method_name == "FetchResults":
this_method_name += (
"Orientation_"
+ ttypes.TFetchOrientation._VALUES_TO_NAMES[
request.orientation
]
)

this_command_type = CommandType.get(this_method_name)
self._transport.set_retry_command_type(this_command_type)
self._transport.startRetryTimer()
Expand Down Expand Up @@ -1046,8 +1055,8 @@ def fetch_results(

resp = self.make_request(self._client.FetchResults, req)
if resp.results.startRowOffset > expected_row_start_offset:
logger.warning(
"Expected results to start from {} but they instead start at {}".format(
raise DataError(
"fetch_results failed due to inconsistency in the state between the client and the server. Expected results to start from {} but they instead start at {}, some result batches must have been skipped".format(
expected_row_start_offset, resp.results.startRowOffset
)
)
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,9 @@ def test_sleep__retry_after_present(self, t_mock, retry_policy, error_history):
retry_policy.history = [error_history, error_history, error_history]
retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "3"}))
t_mock.assert_called_with(3)

def test_not_retryable__fetch_results_orientation_fetch_next(self, retry_policy):
HTTP_STATUS_CODES = [200, 429, 503, 504]
retry_policy.command_type = CommandType.FETCH_RESULTS_ORIENTATION_FETCH_NEXT
for status_code in HTTP_STATUS_CODES:
assert not retry_policy.is_retry("METHOD_NAME", status_code=status_code)
Loading