SI: Implement put operations #67

Merged (41 commits) on Dec 30, 2022.

Changes shown below are from 27 of the 41 commits.

Commits:
28c3a59
Basic PUT operation. Currently this never executes because the server
Nov 15, 2022
1b245b1
Bump Spark CLI service protocol version being used.
Nov 15, 2022
1239def
Log when attempting a staging operation
Nov 15, 2022
b605cce
Fix failing unit tests since function signature for ExecuteResponse c…
Nov 16, 2022
3ed84d8
Add e2e test for put.
Nov 16, 2022
57b8a34
Bail on tests if staging_ingestion_user is not set
Nov 16, 2022
7812278
Black client.py
Nov 16, 2022
6b76439
Add unit test that sanity checks _handle_staging_operation is called
Nov 17, 2022
3df7c89
Fix imports so that this module can be run independently:
Nov 17, 2022
8f0a02e
Implement GET operation
Nov 17, 2022
55525cb
Refactor client.py into distinct methods for each ingestion command type
Nov 23, 2022
157ac3d
Update pypoetry so I can develop on Python 3.10
Nov 23, 2022
0739ccc
Applied PR feedback around explicit response codes.
Nov 23, 2022
d3a3651
Applying PR feedback
Nov 23, 2022
72f917e
PR feedback
Nov 23, 2022
fba64b7
Black client.py
Nov 23, 2022
c27a3d6
Refactor e2e test to use a single test for PUT, GET, and REMOVE
Nov 23, 2022
19ca706
Make REMOVE command work
Nov 23, 2022
0167bd9
These methods don't need to know the `operation`
Nov 23, 2022
85e4d7c
Remove single quote that broke query
Nov 23, 2022
713002d
Remove unneeded argument
Nov 23, 2022
fc06ef8
Expect operation to succeed
Nov 23, 2022
cafa17d
Black PySQLStagingIngestionTestSuite only
Nov 23, 2022
a508a1c
Tidy up comments in e2e test
Nov 23, 2022
ce80df0
Basic e2e test scaffolded in. Currently fails.
Nov 23, 2022
36885a4
Only allow ingestion commands when base_uploads_path is specified
Nov 23, 2022
c0c09d4
Restrict local file operations to descendants of uploads_base_path
Nov 23, 2022
f612795
Remove per PR feedback
Dec 20, 2022
e609ef3
Add check for null local_file per PR feedback
Dec 20, 2022
cdbe2d6
Open output stream _after_ successful HTTP request
Dec 20, 2022
34a0362
Resolve relative paths before comparing row.localFile to uploads_base…
Dec 20, 2022
c8a64c7
Add test that PUT fails if file exists in staging location and OVERWR…
Dec 20, 2022
d48d3f3
Add tests: operations fail to modify another user's staging location
Dec 20, 2022
e0037e0
Add test that ingestion command fails if local file is blank
Dec 20, 2022
3fa5d84
Add test that invalid staging path will fail at server
Dec 20, 2022
4824b68
Basic usage example (needs tweaking)
Dec 22, 2022
469f35f
Add samples of GET and REMOVE
Dec 22, 2022
bdb948a
Refactor to allow uploads_base_path to be either a single string object
Dec 28, 2022
0261b7a
Refactor uploads_base_path to staging_allowed_local_path
Dec 29, 2022
00d8a49
Fix mypy static type failures
Dec 30, 2022
7a602e6
Black src files
Dec 30, 2022
7 changes: 7 additions & 0 deletions CONTRIBUTING.md
@@ -112,6 +112,7 @@ export access_token=""
There are several e2e test suites available:
- `PySQLCoreTestSuite`
- `PySQLLargeQueriesSuite`
- `PySQLStagingIngestionTestSuite`
- `PySQLRetryTestSuite.HTTP503Suite` **[not documented]**
- `PySQLRetryTestSuite.HTTP429Suite` **[not documented]**
- `PySQLUnityCatalogTestSuite` **[not documented]**
@@ -122,6 +123,12 @@ To execute the core test suite:
poetry run python -m pytest tests/e2e/driver_tests.py::PySQLCoreTestSuite
```

The `PySQLCoreTestSuite` namespace contains tests for all of the connector's basic features and behaviours. This is the default namespace where tests should be written unless they require specially configured clusters or take an especially long time to execute by design.

The `PySQLLargeQueriesSuite` namespace contains long-running query tests and is kept separate. In general, if the `PySQLCoreTestSuite` passes then these tests will as well.

The `PySQLStagingIngestionTestSuite` namespace requires a cluster running DBR version > 12.x which supports staging ingestion commands.

The suites marked `[not documented]` require additional configuration which will be documented at a later time.
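Following the core-suite command above, the staging ingestion suite can be run the same way. This is a sketch: the `staging_ingestion_user` variable comes from the commit log ("Bail on tests if staging_ingestion_user is not set"), and the exact set of required variables is an assumption based on this PR rather than final documentation.

```shell
# Assumed environment, mirroring the core-suite setup plus the
# staging_ingestion_user referenced in this PR's commit log.
export host=""
export http_path=""
export access_token=""
export staging_ingestion_user=""

poetry run python -m pytest tests/e2e/driver_tests.py::PySQLStagingIngestionTestSuite
```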
### Code formatting

48 changes: 42 additions & 6 deletions poetry.lock


4 changes: 4 additions & 0 deletions pyproject.toml
@@ -16,6 +16,10 @@ pyarrow = "^9.0.0"
lz4 = "^4.0.2"
requests=">2.18.1"
oauthlib=">=3.1.0"
numpy = [
{version = "1.21.1", python = ">=3.7,<3.8"},
{version = "1.23.4", python = ">=3.8"}
]

[tool.poetry.dev-dependencies]
pytest = "^7.1.2"
124 changes: 122 additions & 2 deletions src/databricks/sql/client.py
@@ -2,6 +2,9 @@

import pandas
import pyarrow
import requests
import json
import os

from databricks.sql import __version__
from databricks.sql import *
@@ -28,7 +31,7 @@ def __init__(
session_configuration: Dict[str, Any] = None,
catalog: Optional[str] = None,
schema: Optional[str] = None,
**kwargs
**kwargs,
) -> None:
"""
Connect to a Databricks SQL endpoint or a Databricks cluster.
@@ -173,7 +176,7 @@ def read(self) -> Optional[OAuthToken]:
http_path,
(http_headers or []) + base_headers,
auth_provider,
**kwargs
**kwargs,
)

self._session_handle = self.thrift_backend.open_session(
@@ -297,6 +300,117 @@ def _check_not_closed(self):
if not self.open:
raise Error("Attempting operation on closed cursor")

def _handle_staging_operation(self, uploads_base_path: str):
"""Fetch the HTTP request instruction from a staging ingestion command
and call the designated handler.

Raise an exception if localFile is specified by the server but the localFile
is not descended from uploads_base_path.
"""

if uploads_base_path is None:
raise Error(
"You must provide an uploads_base_path when initialising a connection to perform ingestion commands"
)

row = self.active_result_set.fetchone()
Collaborator:

I know self.active_result_set is introduced in this PR.
so this is merely a generic question rather than one specific to staging.

if we are using a field member self.active_result_set for keeping a state that means we won't be able to support multi threading in an application which concurrently uses pysql. is this understanding correct?

Contributor Author:

I'm confused by the first part of your question:

I know self.active_result_set is introduced in this PR.

I don't believe this is correct. active_result_set has been present since the first version of this library. It's present on main right now.

if we are using a field member self.active_result_set for keeping a state that means we won't be able to support multi threading in an application which concurrently uses pysql

You're pulling on a valid thread. But I disagree with this assessment. In general pysql works fine with multi-threading. In fact, multi-threading is required if you want to cancel a running query (which is reflected in PySQLCoreTestSuite.test_cancel_during_execute).

The specific scenario where active_result_set state would affect multi-threaded applications is if multiple threads are working with the same cursor. Is that a desirable usage pattern? I think there is usually one cursor per thread, in which case there's no issue with shared state.


if getattr(row, "localFile", None):
if os.path.commonpath([row.localFile, uploads_base_path]) != uploads_base_path:
Collaborator:

What happens if uploads_base_path = /Users/user1 and row.localFile = /Users/user1/../user2? What does this method return?

Collaborator:

could you please add some tests for that?

Contributor Author:

Good catch.

tl;dr I updated the code in 34a0362 so that it resolves any relative paths before checking for their common_path. I added a test to prove this.

Before

/Users/user1 and /Users/user1/../user2 show a common path of /Users/user1 which is wrong.

After

/Users/user1 and /Users/user1/../user2 show a common path of /Users which is correct.

Contributor Author:

@moderakh Are there other cases we should consider?

raise Error("Local file operations are restricted to paths within the configured uploads_base_path")
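The traversal case from the review thread can be reproduced directly. This is a standalone sketch, not the driver's code: `is_descendant` is an illustrative name, but the resolve-then-compare approach matches what commit 34a0362 describes.

```python
import os

def is_descendant(local_file: str, base_path: str) -> bool:
    # Resolve ".." segments (and symlinks, for path components that exist)
    # before comparing; an unresolved comparison is fooled by "..".
    local_file = os.path.realpath(local_file)
    base_path = os.path.realpath(base_path)
    return os.path.commonpath([local_file, base_path]) == base_path

# Unresolved comparison: commonpath reports /Users/user1, which looks safe
# but is wrong, because the ".." has not been collapsed yet.
naive = os.path.commonpath(["/Users/user1/../user2/f.csv", "/Users/user1"])
print(naive)  # /Users/user1

# Resolved comparison: /Users/user1/../user2 collapses to /Users/user2,
# so the traversal attempt is rejected.
print(is_descendant("/Users/user1/../user2/f.csv", "/Users/user1"))  # False
print(is_descendant("/Users/user1/data/f.csv", "/Users/user1"))      # True
```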

# TODO: Experiment with DBR sending real headers.
# The specification says headers will be in JSON format but the current null value is actually an empty list []
handler_args = {
"presigned_url": row.presignedUrl,
"local_file": getattr(row, "localFile", None),
"headers": json.loads(row.headers or "{}"),
}

logger.debug(
f"Attempting staging operation indicated by server: {row.operation} - {getattr(row, 'localFile', '')}"
)

# TODO: Create a retry loop here to re-attempt if the request times out or fails
if row.operation == "GET":
return self._handle_staging_get(**handler_args)
elif row.operation == "PUT":
return self._handle_staging_put(**handler_args)
elif row.operation == "REMOVE":
# Local file isn't needed to remove a remote resource
handler_args.pop("local_file")
return self._handle_staging_remove(**handler_args)
else:
raise Error(
f"Operation {row.operation} is not supported. "
+ "Supported operations are GET, PUT, and REMOVE"
)
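The headers TODO above can be made concrete. The PR's code calls `json.loads(row.headers or "{}")` directly; this sketch adds a hypothetical `parse_staging_headers` helper (not in the PR) with an extra guard, to show what each server value would produce, including the empty-list placeholder the TODO mentions.

```python
import json

def parse_staging_headers(raw):
    # The spec says headers arrive as a JSON object, but per the TODO the
    # current placeholder value is "[]" (or None). The `or "{}"` fallback
    # handles None; the isinstance guard (an addition here, not in the PR)
    # also normalizes the "[]" placeholder to an empty dict.
    parsed = json.loads(raw or "{}")
    return parsed if isinstance(parsed, dict) else {}

print(parse_staging_headers(None))                        # {}
print(parse_staging_headers("[]"))                        # {}
print(parse_staging_headers('{"x-amz-acl": "private"}'))  # {'x-amz-acl': 'private'}
```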

def _handle_staging_put(
self, presigned_url: str, local_file: str, headers: dict = None
):
"""Make an HTTP PUT request

Raise an exception if request fails. Returns no data.
"""

if local_file is None:
raise Error("Cannot perform PUT without specifying a local_file")

with open(local_file, "rb") as fh:
r = requests.put(url=presigned_url, data=fh, headers=headers)

# fmt: off
# Design borrowed from: https://stackoverflow.com/a/2342589/5093960

OK = requests.codes.ok # 200
CREATED = requests.codes.created # 201
ACCEPTED = requests.codes.accepted # 202
NO_CONTENT = requests.codes.no_content # 204

# fmt: on

if r.status_code not in [OK, CREATED, NO_CONTENT, ACCEPTED]:
raise Error(
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
)

if r.status_code == ACCEPTED:
logger.debug(
f"Response code {ACCEPTED} from server indicates ingestion command was accepted "
+ "but not yet applied on the server. It's possible this command may fail later."
)
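The success set the PUT handler accepts maps to four numeric codes. This stdlib-only sketch uses `http.HTTPStatus`, which exposes the same values as the `requests.codes` constants above; `put_succeeded` is an illustrative name, not a function in the PR.

```python
from http import HTTPStatus

# Same success set the PUT handler accepts: 200, 201, 202, 204.
ACCEPTED_PUT_CODES = {
    HTTPStatus.OK,          # 200 - applied
    HTTPStatus.CREATED,     # 201 - resource created
    HTTPStatus.ACCEPTED,    # 202 - accepted, but possibly not yet applied
    HTTPStatus.NO_CONTENT,  # 204 - applied, empty response body
}

def put_succeeded(status_code: int) -> bool:
    # HTTPStatus members are IntEnums, so plain ints compare and hash equal.
    return status_code in ACCEPTED_PUT_CODES

print(put_succeeded(200))  # True
print(put_succeeded(202))  # True (may still fail later, per the debug log)
print(put_succeeded(403))  # False
```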

def _handle_staging_get(
self, local_file: str, presigned_url: str, headers: dict = None
):
"""Make an HTTP GET request, create a local file with the received data

Raise an exception if request fails. Returns no data.
"""

with open(local_file, "wb") as fp:
r = requests.get(url=presigned_url, headers=headers)

# response.ok is True when the status code is below 400,
# so any 2xx or 3xx response will evaluate r.ok == True
if not r.ok:
raise Error(
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
)

fp.write(r.content)

def _handle_staging_remove(self, presigned_url: str, headers: dict = None):
"""Make an HTTP DELETE request to the presigned_url"""

r = requests.delete(url=presigned_url, headers=headers)

if not r.ok:
raise Error(
f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
)

def execute(
self, operation: str, parameters: Optional[Dict[str, str]] = None
) -> "Cursor":
@@ -331,6 +445,12 @@ def execute(
self.buffer_size_bytes,
self.arraysize,
)

if execute_response.is_staging_operation:
Contributor Author:

Question for reviewers: is there any specifically desired end-state for the cursor after a staging operation? Maybe we return a new NamedTuple StagingOperationResult with properties of .successful:boolean and perhaps a copy of the operation and localFile that were used?

Reply:

I don't quite get this question, but the cursor for now will return just one row and we should have reached the end of this cursor.

Collaborator:

@susodapop could you please explain, with some sample code, how this will provide a different experience to the end user?

self._handle_staging_operation(
uploads_base_path=self.thrift_backend.uploads_base_path
)

return self

def executemany(self, operation, seq_of_parameters):
7 changes: 6 additions & 1 deletion src/databricks/sql/thrift_backend.py
@@ -61,6 +61,7 @@ def __init__(
http_path: str,
http_headers,
auth_provider: AuthProvider,
uploads_base_path: str = None,
**kwargs,
):
# Internal arguments in **kwargs:
@@ -110,6 +111,7 @@
else:
raise ValueError("No valid connection settings.")

self.uploads_base_path = uploads_base_path
self._initialize_retry_args(kwargs)
self._use_arrow_native_complex_types = kwargs.get(
"_use_arrow_native_complex_types", True
@@ -452,7 +454,7 @@ def open_session(self, session_configuration, catalog, schema):
initial_namespace = None

open_session_req = ttypes.TOpenSessionReq(
client_protocol_i64=ttypes.TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6,
client_protocol_i64=ttypes.TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
client_protocol=None,
initialNamespace=initial_namespace,
canUseMultipleCatalogs=True,
@@ -733,6 +735,8 @@ def _results_message_to_execute_response(self, resp, operation_state):
.to_pybytes()
)
lz4_compressed = t_result_set_metadata_resp.lz4Compressed
# TODO: will this fail if metadata doesn't include `isStagingOperation`?
is_staging_operation = t_result_set_metadata_resp.isStagingOperation
if direct_results and direct_results.resultSet:
assert direct_results.resultSet.results.startRowOffset == 0
assert direct_results.resultSetMetadata
@@ -752,6 +756,7 @@
has_been_closed_server_side=has_been_closed_server_side,
has_more_rows=has_more_rows,
lz4_compressed=lz4_compressed,
is_staging_operation=is_staging_operation,
command_handle=resp.operationHandle,
description=description,
arrow_schema_bytes=schema_bytes,
2 changes: 1 addition & 1 deletion src/databricks/sql/utils.py
@@ -40,7 +40,7 @@ def remaining_rows(self) -> pyarrow.Table:

ExecuteResponse = namedtuple(
"ExecuteResponse",
"status has_been_closed_server_side has_more_rows description lz4_compressed "
"status has_been_closed_server_side has_more_rows description lz4_compressed is_staging_operation "
"command_handle arrow_queue arrow_schema_bytes",
)
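Adding `is_staging_operation` to the middle of the field string shifts the positional slot of every later field, which is why the construction site in thrift_backend.py passes fields by keyword. This miniature uses abbreviated, hypothetical field names rather than the real ExecuteResponse:

```python
from collections import namedtuple

# Before and after the change, with a field inserted mid-string.
Old = namedtuple("Old", "status lz4_compressed command_handle")
New = namedtuple("New", "status lz4_compressed is_staging_operation command_handle")

print(Old._fields.index("command_handle"))  # 2
print(New._fields.index("command_handle"))  # 3 (shifted by the new field)

# Keyword construction, as used in _results_message_to_execute_response,
# is unaffected by the reordering of positional slots.
resp = New(
    status="FINISHED",
    lz4_compressed=True,
    is_staging_operation=False,
    command_handle="op-handle",
)
print(resp.is_staging_operation)  # False
```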
