From 28c3a59e2e7feb5b22913501be21993c9ef00b04 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 15 Nov 2022 11:03:54 -0600 Subject: [PATCH 01/41] Basic PUT operation. Currently this never executes because the server doesn't set `isStagingOperation` flag to True...researching Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 39 ++++++++++++++++++++++++++++ src/databricks/sql/thrift_backend.py | 3 +++ src/databricks/sql/utils.py | 2 +- 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 53b0c971..dec02d0c 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -2,6 +2,7 @@ import pandas import pyarrow +import requests from databricks.sql import __version__ from databricks.sql import * @@ -297,6 +298,40 @@ def _check_not_closed(self): if not self.open: raise Error("Attempting operation on closed cursor") + def _handle_staging_operation(self): + """Make HTTP request using instructions provided by server + """ + + row = self.active_result_set.fetchone() + + # TODO: Handle headers. What format will gateway send? json? plaintext? + operation, presigned_url, local_file, headers = row.operation, row.presignedUrl, row.localFile, None + + operation_map = { + "GET" : requests.get, + "PUT" : requests.put, + "REMOVE" : requests.delete + } + + req_func = operation_map[operation] + + if local_file: + raw_data = open(local_file, 'rb') + else: + raw_data = None + + + rq_func_args = dict( + url=presigned_url, + data=raw_data + ) + + # Call the function + resp = req_func(**rq_func_args) + + + + def execute( self, operation: str, parameters: Optional[Dict[str, str]] = None ) -> "Cursor": @@ -331,6 +366,10 @@ def execute( self.buffer_size_bytes, self.arraysize, ) + + if execute_response.is_staging_operation: + self._handle_staging_operation() + return self def executemany(self, operation, seq_of_parameters): diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 48d7c201..375a389a 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -733,6 +733,8 @@ def _results_message_to_execute_response(self, resp, operation_state): .to_pybytes() ) lz4_compressed = t_result_set_metadata_resp.lz4Compressed + # TODO: will this fail if metadata doesn't include `isStagingOperation`? 
+ is_staging_operation = t_result_set_metadata_resp.isStagingOperation if direct_results and direct_results.resultSet: assert direct_results.resultSet.results.startRowOffset == 0 assert direct_results.resultSetMetadata @@ -752,6 +754,7 @@ def _results_message_to_execute_response(self, resp, operation_state): has_been_closed_server_side=has_been_closed_server_side, has_more_rows=has_more_rows, lz4_compressed=lz4_compressed, + is_staging_operation=is_staging_operation, command_handle=resp.operationHandle, description=description, arrow_schema_bytes=schema_bytes, diff --git a/src/databricks/sql/utils.py b/src/databricks/sql/utils.py index 9c466886..ae411c7d 100644 --- a/src/databricks/sql/utils.py +++ b/src/databricks/sql/utils.py @@ -40,7 +40,7 @@ def remaining_rows(self) -> pyarrow.Table: ExecuteResponse = namedtuple( "ExecuteResponse", - "status has_been_closed_server_side has_more_rows description lz4_compressed " + "status has_been_closed_server_side has_more_rows description lz4_compressed is_staging_operation " "command_handle arrow_queue arrow_schema_bytes", ) From 1b245b1db3bf3d5efe3ba64f6bdd1b69b1ad741d Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 15 Nov 2022 11:20:37 -0600 Subject: [PATCH 02/41] Bump Spark CLI service protocol version being used. This upgrade now captures the isStagingOperation flag. Staging Ops still don't work because the flag shows false. Researching... Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 375a389a..c4b19fa8 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -452,7 +452,7 @@ def open_session(self, session_configuration, catalog, schema): initial_namespace = None open_session_req = ttypes.TOpenSessionReq( - client_protocol_i64=ttypes.TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6, + client_protocol_i64=ttypes.TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7, client_protocol=None, initialNamespace=initial_namespace, canUseMultipleCatalogs=True, From 1239def4fd00a958c113ab775f69d832b6fb27ee Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 15 Nov 2022 11:28:02 -0600 Subject: [PATCH 03/41] Log when attempting a staging operation Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index dec02d0c..449fa562 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -325,13 +325,11 @@ def _handle_staging_operation(self): url=presigned_url, data=raw_data ) - + + logger.debug("Attempting staging operation: {} - {}".format(operation, local_file)) # Call the function resp = req_func(**rq_func_args) - - - def execute( self, operation: str, parameters: Optional[Dict[str, str]] = None ) -> "Cursor": From b605cce18d70579533173f797b893bd08e83f15d Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 16 Nov 2022 15:17:29 -0600 Subject: [PATCH 04/41] Fix failing unit tests since function signature for ExecuteResponse changed Signed-off-by: Jesse Whitehouse --- tests/unit/test_fetches.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_fetches.py b/tests/unit/test_fetches.py index 33fca075..7d5686f8 100644 --- a/tests/unit/test_fetches.py +++ b/tests/unit/test_fetches.py @@ -41,7 +41,8 @@ def 
make_dummy_result_set_from_initial_results(initial_results):
                 lz4_compressed=Mock(),
                 command_handle=None,
                 arrow_queue=arrow_queue,
-                arrow_schema_bytes=schema.serialize().to_pybytes()))
+                arrow_schema_bytes=schema.serialize().to_pybytes(),
+                is_staging_operation=False))
         num_cols = len(initial_results[0]) if initial_results else 0
         rs.description = [(f'col{col_id}', 'integer', None, None, None, None, None)
                           for col_id in range(num_cols)]
@@ -75,7 +76,8 @@ def fetch_results(op_handle, max_rows, max_bytes, expected_row_start_offset, lz4
                 lz4_compressed=Mock(),
                 command_handle=None,
                 arrow_queue=None,
-                arrow_schema_bytes=None))
+                arrow_schema_bytes=None,
+                is_staging_operation=False))
         return rs
 
     def assertEqualRowValues(self, actual, expected):

From 3ed84d8b495ab41c8b3e0683ac760f85fce3b779 Mon Sep 17 00:00:00 2001
From: Jesse Whitehouse
Date: Wed, 16 Nov 2022 17:06:47 -0600
Subject: [PATCH 05/41] Add e2e test for put. Stub out delete and get tests so
 it's clear what has and has not been done.

Signed-off-by: Jesse Whitehouse
---
 CONTRIBUTING.md              |  7 ++++++
 src/databricks/sql/client.py | 11 ++++++---
 tests/e2e/driver_tests.py    | 46 ++++++++++++++++++++++++++++++++++++
 3 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 386ba5da..c4c9280f 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -112,6 +112,7 @@ export access_token=""
 There are several e2e test suites available:
 - `PySQLCoreTestSuite`
 - `PySQLLargeQueriesSuite`
+- `PySQLStagingIngestionTestSuite`
 - `PySQLRetryTestSuite.HTTP503Suite` **[not documented]**
 - `PySQLRetryTestSuite.HTTP429Suite` **[not documented]**
 - `PySQLUnityCatalogTestSuite` **[not documented]**
@@ -122,6 +123,12 @@ To execute the core test suite:
 poetry run python -m pytest tests/e2e/driver_tests.py::PySQLCoreTestSuite
 ```
 
+The `PySQLCoreTestSuite` namespace contains tests for all of the connector's basic features and behaviours. This is the default namespace where tests should be written unless they require specially configured clusters or take an especially long time to execute by design.
+
+The `PySQLLargeQueriesSuite` namespace contains long-running query tests and is kept separate. In general, if the `PySQLCoreTestSuite` passes then these tests will as well.
+
+The `PySQLStagingIngestionTestSuite` namespace requires a cluster running DBR version > 13.x which supports staging ingestion commands.
+
 The suites marked `[not documented]` require additional configuration which will be documented at a later time.
 
 ### Code formatting
diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index 449fa562..3fa3d60d 100644
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -308,18 +308,19 @@ def _handle_staging_operation(self):
         operation, presigned_url, local_file, headers = row.operation, row.presignedUrl, row.localFile, None
 
         operation_map = {
-            "GET" : requests.get,
             "PUT" : requests.put,
-            "REMOVE" : requests.delete
         }
 
+        if operation not in operation_map:
+            raise Error("Operation {} is not supported. 
Supported operations are {}".format(operation, ",".join(operation_map.keys()))) + req_func = operation_map[operation] if local_file: raw_data = open(local_file, 'rb') else: raw_data = None - + rq_func_args = dict( url=presigned_url, @@ -327,9 +328,13 @@ def _handle_staging_operation(self): ) logger.debug("Attempting staging operation: {} - {}".format(operation, local_file)) + # Call the function resp = req_func(**rq_func_args) + if resp.status_code != 200: + raise Error("Staging operation over HTTP was unsuccessful: {}-{}".format(resp.status_code, resp.text)) + def execute( self, operation: str, parameters: Optional[Dict[str, str]] = None ) -> "Cursor": diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index c29ebabc..df9a391f 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -5,6 +5,7 @@ import logging import os import sys +import tempfile import threading import time from unittest import loader, skipIf, skipUnless, TestCase @@ -14,6 +15,7 @@ import pyarrow import pytz import thrift +import pytest import databricks.sql as sql from databricks.sql import STRING, BINARY, NUMBER, DATETIME, DATE, DatabaseError, Error, OperationalError @@ -630,6 +632,50 @@ def test_initial_namespace(self): cursor.execute("select current_database()") self.assertEqual(cursor.fetchone()[0], table_name) +class PySQLStagingIngestionTestSuite(PySQLTestCase): + """Simple namespace for ingestion tests. These should be run against DBR >13.x + + In addition to connection credentials (host, path, token) this suite requires an env var + named staging_ingestion_user""" + + staging_ingestion_user = os.getenv("staging_ingestion_user") + + def test_staging_ingestion_put(self): + + fh, temp_path = tempfile.mkstemp() + + with open(fh, 'wb') as fp: + fp.write("hello world!".encode("utf-8")) + + with self.connection() as conn: + cursor = conn.cursor() + query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" + cursor.execute(query) + + os.remove(temp_path) + + def test_staging_ingestion_get(self): + + fh, temp_path = tempfile.mkstemp() + + with self.connection() as conn: + cursor = conn.cursor() + query = f"GET 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' TO '{temp_path}'" + with pytest.raises(Error): + cursor.execute(query) + + os.remove(temp_path) + + def test_staging_ingestion_delete(self): + + # Test stub to be completed when we implement DELETE. We need to guarantee this file exists before we attempt to remove it. + + with self.connection() as conn: + cursor = conn.cursor() + query = f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv''" + with pytest.raises(Error): + cursor.execute(query) + def main(cli_args): global get_args_from_env From 57b8a34eefdb7c7a6833f955c993d0f81589d25b Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 16 Nov 2022 17:08:47 -0600 Subject: [PATCH 06/41] Bail on tests if staging_ingestion_user is not set Signed-off-by: Jesse Whitehouse --- tests/e2e/driver_tests.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index df9a391f..b8d435cd 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -640,6 +640,9 @@ class PySQLStagingIngestionTestSuite(PySQLTestCase): staging_ingestion_user = os.getenv("staging_ingestion_user") + if staging_ingestion_user is None: + raise ValueError("To run these tests you must designate a `staging_ingestion_user` environment variable. 
This will be the user associated with the personal access token.")
+
     def test_staging_ingestion_put(self):
 
         fh, temp_path = tempfile.mkstemp()

From 78122787b365763f445e50afb9cb88012b607b97 Mon Sep 17 00:00:00 2001
From: Jesse Whitehouse
Date: Wed, 16 Nov 2022 17:09:30 -0600
Subject: [PATCH 07/41] Black client.py

Signed-off-by: Jesse Whitehouse
---
 src/databricks/sql/client.py | 36 +++++++++++++++++++++++-------------
 1 file changed, 23 insertions(+), 13 deletions(-)

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index 3fa3d60d..70548475 100644
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -299,41 +299,51 @@ def _check_not_closed(self):
             raise Error("Attempting operation on closed cursor")
 
     def _handle_staging_operation(self):
-        """Make HTTP request using instructions provided by server
-        """
+        """Make HTTP request using instructions provided by server"""
 
         row = self.active_result_set.fetchone()
 
         # TODO: Handle headers. What format will gateway send? json? plaintext?
-        operation, presigned_url, local_file, headers = row.operation, row.presignedUrl, row.localFile, None
+        operation, presigned_url, local_file, headers = (
+            row.operation,
+            row.presignedUrl,
+            row.localFile,
+            None,
+        )
 
         operation_map = {
-            "PUT" : requests.put,
+            "PUT": requests.put,
         }
 
         if operation not in operation_map:
-            raise Error("Operation {} is not supported. Supported operations are {}".format(operation, ",".join(operation_map.keys())))
+            raise Error(
+                "Operation {} is not supported. Supported operations are {}".format(
+                    operation, ",".join(operation_map.keys())
+                )
+            )
 
         req_func = operation_map[operation]
 
         if local_file:
-            raw_data = open(local_file, 'rb')
+            raw_data = open(local_file, "rb")
         else:
             raw_data = None
 
+        rq_func_args = dict(url=presigned_url, data=raw_data)
 
-        rq_func_args = dict(
-            url=presigned_url,
-            data=raw_data
+        logger.debug(
+            "Attempting staging operation: {} - {}".format(operation, local_file)
         )
-
-        logger.debug("Attempting staging operation: {} - {}".format(operation, local_file))
-
+
         # Call the function
         resp = req_func(**rq_func_args)
 
         if resp.status_code != 200:
-            raise Error("Staging operation over HTTP was unsuccessful: {}-{}".format(resp.status_code, resp.text))
+            raise Error(
+                "Staging operation over HTTP was unsuccessful: {}-{}".format(
+                    resp.status_code, resp.text
+                )
+            )
 
     def execute(
         self, operation: str, parameters: Optional[Dict[str, str]] = None

From 8f0a02e1f61458b5efb451067c17f0eb0273e111 Mon Sep 17 00:00:00 2001
From: Jesse Whitehouse
Date: Thu, 17 Nov 2022 09:45:03 -0600
Subject: [PATCH 08/41] Add unit test that sanity checks
 _handle_staging_operation is called when the server indicates
 isStagingOperation=True in the ExecuteResponse

Signed-off-by: Jesse Whitehouse
---
 tests/unit/tests.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/tests/unit/tests.py b/tests/unit/tests.py
index d5ca2387..b61477a3 100644
--- a/tests/unit/tests.py
+++ b/tests/unit/tests.py
@@ -534,6 +534,21 @@ def test_cursor_keeps_connection_alive(self, mock_client_class):
         self.assertEqual(instance.close_session.call_count, 0)
         cursor.close()
 
+    @patch("%s.client.ThriftBackend" % PACKAGE_NAME)
+    @patch("%s.client.Cursor._handle_staging_operation" % PACKAGE_NAME)
+    @patch("%s.utils.ExecuteResponse" % PACKAGE_NAME)
+    def test_staging_operation_response_is_handled(self, mock_client_class, mock_handle_staging_operation, mock_execute_response):
+        # If server sets ExecuteResponse.is_staging_operation True then _handle_staging_operation should be called
+
+        
mock_execute_response.is_staging_operation = True
+
+        connection = databricks.sql.connect(**self.DUMMY_CONNECTION_ARGS)
+        cursor = connection.cursor()
+        cursor.execute("Text of some staging operation command;")
+        connection.close()
+
+        mock_handle_staging_operation.assert_called_once_with()
+
 
 if __name__ == '__main__':
     suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])

From 3df7c89bd9c961b537059689dbb7875a3bbe43f3 Mon Sep 17 00:00:00 2001
From: Jesse Whitehouse
Date: Thu, 17 Nov 2022 09:46:50 -0600
Subject: [PATCH 09/41] Fix imports so that this module can be run
 independently:

    poetry run python -m pytest tests/unit/tests.py

This command failed with only relative imports. However

    poetry run python -m pytest tests/unit

would succeed

Signed-off-by: Jesse Whitehouse
---
 tests/unit/tests.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/unit/tests.py b/tests/unit/tests.py
index b61477a3..74274373 100644
--- a/tests/unit/tests.py
+++ b/tests/unit/tests.py
@@ -12,9 +12,9 @@
 from databricks.sql import InterfaceError, DatabaseError, Error, NotSupportedError
 from databricks.sql.types import Row
 
-from test_fetches import FetchTests
-from test_thrift_backend import ThriftBackendTestSuite
-from test_arrow_queue import ArrowQueueSuite
+from tests.unit.test_fetches import FetchTests
+from tests.unit.test_thrift_backend import ThriftBackendTestSuite
+from tests.unit.test_arrow_queue import ArrowQueueSuite
 
 
 class ClientTestSuite(unittest.TestCase):

From 157ac3ddc4fcb130f1a422162de74711f1a2bd79 Mon Sep 17 00:00:00 2001
From: Jesse Whitehouse
Date: Thu, 17 Nov 2022 10:08:06 -0600
Subject: [PATCH 10/41] Implement GET operation

Signed-off-by: Jesse Whitehouse
---
 src/databricks/sql/client.py |  5 +++++
 tests/e2e/driver_tests.py    | 24 +++++++++++++++---------
 2 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index 70548475..60fdb074 100644
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -313,6 +313,7 @@ def _handle_staging_operation(self):
 
         operation_map = {
             "PUT": requests.put,
+            "GET": requests.get,
         }
 
         if operation not in operation_map:
@@ -345,6 +346,10 @@ def _handle_staging_operation(self):
             )
         )
 
+        if operation == "GET":
+            with open(local_file, "wb") as fp:
+                fp.write(resp.content)
+
     def execute(
         self, operation: str, parameters: Optional[Dict[str, str]] = None
     ) -> "Cursor":
diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py
index b8d435cd..5dd0b04a 100644
--- a/tests/e2e/driver_tests.py
+++ b/tests/e2e/driver_tests.py
@@ -643,31 +643,37 @@ class PySQLStagingIngestionTestSuite(PySQLTestCase):
     if staging_ingestion_user is None:
         raise ValueError("To run these tests you must designate a `staging_ingestion_user` environment variable. This will be the user associated with the personal access token.")
 
-    def test_staging_ingestion_put(self):
+    def test_staging_ingestion_put_and_get(self):
 
         fh, temp_path = tempfile.mkstemp()
 
+        original_text = "hello world!".encode("utf-8")
+
         with open(fh, 'wb') as fp:
-            fp.write("hello world!".encode("utf-8"))
+            fp.write(original_text)
 
         with self.connection() as conn:
             cursor = conn.cursor()
             query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
             cursor.execute(query)
 
-        os.remove(temp_path)
-
-    def test_staging_ingestion_get(self):
+        # TODO: What is the acceptance test for a successful staging operation? 
+ # For now, let's GET the file back and compare it to the original - fh, temp_path = tempfile.mkstemp() + new_fh, new_temp_path = tempfile.mkstemp() with self.connection() as conn: cursor = conn.cursor() - query = f"GET 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' TO '{temp_path}'" - with pytest.raises(Error): - cursor.execute(query) + query = f"GET 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'" + cursor.execute(query) + + with open(new_fh, 'rb') as fp: + fetched_text = fp.read() + assert fetched_text == original_text + os.remove(temp_path) + os.remove(new_temp_path) def test_staging_ingestion_delete(self): From 55525cbf8660c7d47a628a2c072610b45c212ee8 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 13:22:03 -0600 Subject: [PATCH 11/41] Refactor client.py into distinct methods for each ingestion command type Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 97 ++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 37 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 60fdb074..221f52fd 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -3,6 +3,7 @@ import pandas import pyarrow import requests +import json from databricks.sql import __version__ from databricks.sql import * @@ -299,57 +300,79 @@ def _check_not_closed(self): raise Error("Attempting operation on closed cursor") def _handle_staging_operation(self): - """Make HTTP request using instructions provided by server""" + """Fetch the HTTP request instruction from a staging ingestion command + and call the designated handler.""" row = self.active_result_set.fetchone() - # TODO: Handle headers. What format will gateway send? json? plaintext? - operation, presigned_url, local_file, headers = ( - row.operation, - row.presignedUrl, - row.localFile, - None, - ) - operation_map = { - "PUT": requests.put, - "GET": requests.get, + # TODO: Experiment with DBR sending real headers. + # The specification says headers will be in JSON format but the current null value is actually an empty list [] + handler_args = { + "operation": row.operation, + "presigned_url": row.presignedUrl, + "local_file": row.localFile, + "headers": json.loads(row.headers or "{}") } + + logger.debug(f"Attempting staging operation indicated by server: {row.operation} - {row.localFile}") - if operation not in operation_map: - raise Error( - "Operation {} is not supported. Supported operations are {}".format( - operation, ",".join(operation_map.keys()) - ) - ) + # TODO: Create a retry loop here to re-attempt if the request times out or fails + if row.operation == "GET": + return self._handle_staging_get(**handler_args) + elif row.operation == "PUT": + return self._handle_staging_put(**handler_args) + elif row.operation == "REMOVE": + # Local file isn't needed to remove a remote resource + handler_args.pop("local_file") + return self._handle_staging_remove(**handler_args) + else: + raise Error(f"Operation {row.operation} is not supported. " + + "Supported operations are GET, PUT, and REMOVE") - req_func = operation_map[operation] - if local_file: - raw_data = open(local_file, "rb") - else: - raw_data = None + def _handle_staging_put(self, operation:str, presigned_url:str, local_file:str, headers:dict=None): + """Make an HTTP PUT request - rq_func_args = dict(url=presigned_url, data=raw_data) + Raise an exception if request fails. Returns no data. 
+ """ - logger.debug( - "Attempting staging operation: {} - {}".format(operation, local_file) - ) + if local_file is None: + raise Error("Cannot perform PUT without specifying a local_file") - # Call the function - resp = req_func(**rq_func_args) + with open(local_file, "rb") as fh: + r = requests.put(url=presigned_url, data=fh, headers=headers) - if resp.status_code != 200: - raise Error( - "Staging operation over HTTP was unsuccessful: {}-{}".format( - resp.status_code, resp.text - ) - ) + OK = requests.codes.ok + CREATED = requests.codes.created + + if r.status_code not in [OK, CREATED]: + raise Error(f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}") + + - if operation == "GET": - with open(local_file, "wb") as fp: - fp.write(resp.content) + def _handle_staging_get(self, operation:str, local_file:str, presigned_url:str, headers:dict=None): + """Make an HTTP GET request, create a local file with the received data + + Raise an exception if request fails. Returns no data. + """ + + OK = requests.codes.ok + + with open(local_file, "wb") as fp: + r = requests.get(url=presigned_url, headers=headers) + + if r.status_code != OK: + raise Error(f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}") + + fp.write(r.content) + + + + def _handle_staging_remove(self, operation, presigned_url, local_file, headers=None): + pass + def execute( self, operation: str, parameters: Optional[Dict[str, str]] = None ) -> "Cursor": From 157ac3ddc4fcb130f1a422162de74711f1a2bd79 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 13:22:15 -0600 Subject: [PATCH 12/41] Update pypoetry so I can develop on Python 3.10 Signed-off-by: Jesse Whitehouse --- poetry.lock | 48 ++++++++++++++++++++++++++++++++++++++++++------ pyproject.toml | 4 ++++ 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/poetry.lock b/poetry.lock index e0d19799..9fef1e5a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -33,7 +33,7 @@ python-versions = ">=3.5" dev = ["cloudpickle", "coverage[toml] (>=5.0.2)", "furo", "hypothesis", "mypy (>=0.900,!=0.940)", "pre-commit", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "sphinx", "sphinx-notfound-page", "zope.interface"] docs = ["furo", "sphinx", "sphinx-notfound-page", "zope.interface"] tests = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy (>=0.900,!=0.940)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "zope.interface"] -tests_no_zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy (>=0.900,!=0.940)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins"] +tests-no-zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy (>=0.900,!=0.940)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins"] [[package]] name = "black" @@ -75,7 +75,7 @@ optional = false python-versions = ">=3.6.0" [package.extras] -unicode_backport = ["unicodedata2"] +unicode-backport = ["unicodedata2"] [[package]] name = "click" @@ -151,9 +151,9 @@ python-versions = ">=3.6.1,<4.0" [package.extras] colors = ["colorama (>=0.4.3,<0.5.0)"] -pipfile_deprecated_finder = ["pipreqs", "requirementslib"] +pipfile-deprecated-finder = ["pipreqs", "requirementslib"] plugins = ["setuptools"] -requirements_deprecated_finder = ["pip-api", "pipreqs"] +requirements-deprecated-finder = ["pip-api", "pipreqs"] [[package]] name = "lazy-object-proxy" @@ -219,6 +219,14 @@ category = "main" optional = false python-versions = ">=3.7" +[[package]] +name = "numpy" +version = "1.23.4" +description = "NumPy is the fundamental 
package for array computing with Python." +category = "main" +optional = false +python-versions = ">=3.8" + [[package]] name = "oauthlib" version = "3.2.0" @@ -407,7 +415,7 @@ urllib3 = ">=1.21.1,<1.27" [package.extras] socks = ["PySocks (>=1.5.6,!=1.5.7)"] -use_chardet_on_py3 = ["chardet (>=3.0.2,<6)"] +use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] [[package]] name = "setuptools" @@ -506,7 +514,7 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>= [metadata] lock-version = "1.1" python-versions = "^3.7.1" -content-hash = "5de07f9b2c9a2f80ca0411f0f99b6b529b00b034f2ad13199cf29c862e125a57" +content-hash = "40ffbb9e4aa38da3f1169ab074b63a9e5b45461018c78e9b6d1fa784d2d8c4d1" [metadata.files] astroid = [ @@ -705,6 +713,34 @@ numpy = [ {file = "numpy-1.21.1-cp39-cp39-win_amd64.whl", hash = "sha256:01721eefe70544d548425a07c80be8377096a54118070b8a62476866d5208e33"}, {file = "numpy-1.21.1-pp37-pypy37_pp73-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:2d4d1de6e6fb3d28781c73fbde702ac97f03d79e4ffd6598b880b2d95d62ead4"}, {file = "numpy-1.21.1.zip", hash = "sha256:dff4af63638afcc57a3dfb9e4b26d434a7a602d225b42d746ea7fe2edf1342fd"}, + {file = "numpy-1.23.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:95d79ada05005f6f4f337d3bb9de8a7774f259341c70bc88047a1f7b96a4bcb2"}, + {file = "numpy-1.23.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:926db372bc4ac1edf81cfb6c59e2a881606b409ddc0d0920b988174b2e2a767f"}, + {file = "numpy-1.23.4-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c237129f0e732885c9a6076a537e974160482eab8f10db6292e92154d4c67d71"}, + {file = "numpy-1.23.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a8365b942f9c1a7d0f0dc974747d99dd0a0cdfc5949a33119caf05cb314682d3"}, + {file = "numpy-1.23.4-cp310-cp310-win32.whl", hash = "sha256:2341f4ab6dba0834b685cce16dad5f9b6606ea8a00e6da154f5dbded70fdc4dd"}, + {file = "numpy-1.23.4-cp310-cp310-win_amd64.whl", hash = "sha256:d331afac87c92373826af83d2b2b435f57b17a5c74e6268b79355b970626e329"}, + {file = "numpy-1.23.4-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:488a66cb667359534bc70028d653ba1cf307bae88eab5929cd707c761ff037db"}, + {file = "numpy-1.23.4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ce03305dd694c4873b9429274fd41fc7eb4e0e4dea07e0af97a933b079a5814f"}, + {file = "numpy-1.23.4-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8981d9b5619569899666170c7c9748920f4a5005bf79c72c07d08c8a035757b0"}, + {file = "numpy-1.23.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7a70a7d3ce4c0e9284e92285cba91a4a3f5214d87ee0e95928f3614a256a1488"}, + {file = "numpy-1.23.4-cp311-cp311-win32.whl", hash = "sha256:5e13030f8793e9ee42f9c7d5777465a560eb78fa7e11b1c053427f2ccab90c79"}, + {file = "numpy-1.23.4-cp311-cp311-win_amd64.whl", hash = "sha256:7607b598217745cc40f751da38ffd03512d33ec06f3523fb0b5f82e09f6f676d"}, + {file = "numpy-1.23.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:7ab46e4e7ec63c8a5e6dbf5c1b9e1c92ba23a7ebecc86c336cb7bf3bd2fb10e5"}, + {file = "numpy-1.23.4-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:a8aae2fb3180940011b4862b2dd3756616841c53db9734b27bb93813cd79fce6"}, + {file = "numpy-1.23.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8c053d7557a8f022ec823196d242464b6955a7e7e5015b719e76003f63f82d0f"}, + {file = "numpy-1.23.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:a0882323e0ca4245eb0a3d0a74f88ce581cc33aedcfa396e415e5bba7bf05f68"}, + {file = "numpy-1.23.4-cp38-cp38-win32.whl", hash = "sha256:dada341ebb79619fe00a291185bba370c9803b1e1d7051610e01ed809ef3a4ba"}, + {file = "numpy-1.23.4-cp38-cp38-win_amd64.whl", hash = "sha256:0fe563fc8ed9dc4474cbf70742673fc4391d70f4363f917599a7fa99f042d5a8"}, + {file = "numpy-1.23.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:c67b833dbccefe97cdd3f52798d430b9d3430396af7cdb2a0c32954c3ef73894"}, + {file = "numpy-1.23.4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f76025acc8e2114bb664294a07ede0727aa75d63a06d2fae96bf29a81747e4a7"}, + {file = "numpy-1.23.4-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:12ac457b63ec8ded85d85c1e17d85efd3c2b0967ca39560b307a35a6703a4735"}, + {file = "numpy-1.23.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:95de7dc7dc47a312f6feddd3da2500826defdccbc41608d0031276a24181a2c0"}, + {file = "numpy-1.23.4-cp39-cp39-win32.whl", hash = "sha256:f2f390aa4da44454db40a1f0201401f9036e8d578a25f01a6e237cea238337ef"}, + {file = "numpy-1.23.4-cp39-cp39-win_amd64.whl", hash = "sha256:f260da502d7441a45695199b4e7fd8ca87db659ba1c78f2bbf31f934fe76ae0e"}, + {file = "numpy-1.23.4-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:61be02e3bf810b60ab74e81d6d0d36246dbfb644a462458bb53b595791251911"}, + {file = "numpy-1.23.4-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:296d17aed51161dbad3c67ed6d164e51fcd18dbcd5dd4f9d0a9c6055dce30810"}, + {file = "numpy-1.23.4-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:4d52914c88b4930dafb6c48ba5115a96cbab40f45740239d9f4159c4ba779962"}, + {file = "numpy-1.23.4.tar.gz", hash = "sha256:ed2cc92af0efad20198638c69bb0fc2870a58dabfba6eb722c933b48556c686c"}, ] oauthlib = [ {file = "oauthlib-3.2.0-py3-none-any.whl", hash = "sha256:6db33440354787f9b7f3a6dbd4febf5d0f93758354060e802f6c06cb493022fe"}, diff --git a/pyproject.toml b/pyproject.toml index 9bc58959..8ee88ab0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,10 @@ pyarrow = "^9.0.0" lz4 = "^4.0.2" requests=">2.18.1" oauthlib=">=3.1.0" +numpy = [ + {version = "1.21.1", python = ">=3.7,<3.8"}, + {version = "1.23.4", python = ">=3.8"} +] [tool.poetry.dev-dependencies] pytest = "^7.1.2" From 0739ccc9f76e123e40e72eb697c5f868a58ce6b8 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 13:30:17 -0600 Subject: [PATCH 13/41] Applied PR feedback around explicit response codes. 
Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 221f52fd..0c866b9c 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -343,12 +343,22 @@ def _handle_staging_put(self, operation:str, presigned_url:str, local_file:str, with open(local_file, "rb") as fh: r = requests.put(url=presigned_url, data=fh, headers=headers) - OK = requests.codes.ok - CREATED = requests.codes.created + # fmt: off + # Design borrowed from: https://stackoverflow.com/a/2342589/5093960 + + OK = requests.codes.ok # 200 + CREATED = requests.codes.created # 201 + ACCEPTED = requests.codes.accepted # 202 + NO_CONTENT = requests.codes.no_content # 204 - if r.status_code not in [OK, CREATED]: + # fmt: on + + if r.status_code not in [OK, CREATED, NO_CONTENT, ACCEPTED]: raise Error(f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}") + if r.status_code == ACCEPTED: + logger.debug(f"Response code {ACCEPTED} from server indicates ingestion command was accepted " + + "but not yet applied on the server. It's possible this command may fail later.") def _handle_staging_get(self, operation:str, local_file:str, presigned_url:str, headers:dict=None): @@ -357,12 +367,12 @@ def _handle_staging_get(self, operation:str, local_file:str, presigned_url:str, Raise an exception if request fails. Returns no data. """ - OK = requests.codes.ok - with open(local_file, "wb") as fp: r = requests.get(url=presigned_url, headers=headers) - if r.status_code != OK: + # response.ok verifies the status code is not between 400-600. + # Any 2xx or 3xx will evaluate r.ok == True + if r.ok: raise Error(f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}") fp.write(r.content) From d3a3651b9a8fdfd5f6cf3a897234f7e335a8d5f0 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 13:34:28 -0600 Subject: [PATCH 14/41] Applying PR feedback Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 0c866b9c..52d7381d 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -372,7 +372,7 @@ def _handle_staging_get(self, operation:str, local_file:str, presigned_url:str, # response.ok verifies the status code is not between 400-600. # Any 2xx or 3xx will evaluate r.ok == True - if r.ok: + if not r.ok: raise Error(f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}") fp.write(r.content) From 72f917ee2a0182e1c2a4a35e2774adf197ab94af Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 13:35:33 -0600 Subject: [PATCH 15/41] PR feedback Signed-off-by: Jesse Whitehouse --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index c4c9280f..0ee6d92e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -127,7 +127,7 @@ The `PySQLCoreTestSuite` namespace contains tests for all of the connector's bas The `PySQLLargeQueriesSuite` namespace contains long-running query tests and is kept separate. In general, if the `PySQLCoreTestSuite` passes then these tests will as well. -The `PySQLStagingIngestionTestSuite` namespace requires a cluster running DBR version > 13.x which supports staging ingestion commands. 
+The `PySQLStagingIngestionTestSuite` namespace requires a cluster running DBR version > 12.x which supports staging ingestion commands. The suites marked `[not documented]` require additional configuration which will be documented at a later time. ### Code formatting From fba64b7aaee2b72f111498f82691e7cc3a869ca0 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 13:39:11 -0600 Subject: [PATCH 16/41] Black client.py Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 56 +++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 52d7381d..4d3f4bcb 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -30,7 +30,7 @@ def __init__( session_configuration: Dict[str, Any] = None, catalog: Optional[str] = None, schema: Optional[str] = None, - **kwargs + **kwargs, ) -> None: """ Connect to a Databricks SQL endpoint or a Databricks cluster. @@ -175,7 +175,7 @@ def read(self) -> Optional[OAuthToken]: http_path, (http_headers or []) + base_headers, auth_provider, - **kwargs + **kwargs, ) self._session_handle = self.thrift_backend.open_session( @@ -305,17 +305,18 @@ def _handle_staging_operation(self): row = self.active_result_set.fetchone() - # TODO: Experiment with DBR sending real headers. # The specification says headers will be in JSON format but the current null value is actually an empty list [] handler_args = { "operation": row.operation, "presigned_url": row.presignedUrl, "local_file": row.localFile, - "headers": json.loads(row.headers or "{}") + "headers": json.loads(row.headers or "{}"), } - - logger.debug(f"Attempting staging operation indicated by server: {row.operation} - {row.localFile}") + + logger.debug( + f"Attempting staging operation indicated by server: {row.operation} - {row.localFile}" + ) # TODO: Create a retry loop here to re-attempt if the request times out or fails if row.operation == "GET": @@ -326,12 +327,15 @@ def _handle_staging_operation(self): # Local file isn't needed to remove a remote resource handler_args.pop("local_file") return self._handle_staging_remove(**handler_args) - else: - raise Error(f"Operation {row.operation} is not supported. " + - "Supported operations are GET, PUT, and REMOVE") - + else: + raise Error( + f"Operation {row.operation} is not supported. " + + "Supported operations are GET, PUT, and REMOVE" + ) - def _handle_staging_put(self, operation:str, presigned_url:str, local_file:str, headers:dict=None): + def _handle_staging_put( + self, operation: str, presigned_url: str, local_file: str, headers: dict = None + ): """Make an HTTP PUT request Raise an exception if request fails. Returns no data. @@ -354,14 +358,19 @@ def _handle_staging_put(self, operation:str, presigned_url:str, local_file:str, # fmt: on if r.status_code not in [OK, CREATED, NO_CONTENT, ACCEPTED]: - raise Error(f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}") - + raise Error( + f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}" + ) + if r.status_code == ACCEPTED: - logger.debug(f"Response code {ACCEPTED} from server indicates ingestion command was accepted " + - "but not yet applied on the server. It's possible this command may fail later.") - + logger.debug( + f"Response code {ACCEPTED} from server indicates ingestion command was accepted " + + "but not yet applied on the server. It's possible this command may fail later." 
+ ) - def _handle_staging_get(self, operation:str, local_file:str, presigned_url:str, headers:dict=None): + def _handle_staging_get( + self, operation: str, local_file: str, presigned_url: str, headers: dict = None + ): """Make an HTTP GET request, create a local file with the received data Raise an exception if request fails. Returns no data. @@ -373,16 +382,17 @@ def _handle_staging_get(self, operation:str, local_file:str, presigned_url:str, # response.ok verifies the status code is not between 400-600. # Any 2xx or 3xx will evaluate r.ok == True if not r.ok: - raise Error(f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}") - + raise Error( + f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}" + ) + fp.write(r.content) - - - def _handle_staging_remove(self, operation, presigned_url, local_file, headers=None): + def _handle_staging_remove( + self, operation, presigned_url, local_file, headers=None + ): pass - def execute( self, operation: str, parameters: Optional[Dict[str, str]] = None ) -> "Cursor": From c27a3d6009832928f0b4d3adfd4ff11088ad449c Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 13:50:49 -0600 Subject: [PATCH 17/41] Refactor e2e test to use a single teste for PUT, GET, and REMOVE Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 5 +++-- tests/e2e/driver_tests.py | 25 ++++++++++++++----------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 4d3f4bcb..d0d30a22 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -389,9 +389,10 @@ def _handle_staging_get( fp.write(r.content) def _handle_staging_remove( - self, operation, presigned_url, local_file, headers=None + self, operation: str, presigned_url: str, headers: dict = None ): - pass + """Make an HTTP DELETE request to the presigned_url""" + raise Error("Remove is not yet implemented") def execute( self, operation: str, parameters: Optional[Dict[str, str]] = None diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 5dd0b04a..cb64957b 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -643,7 +643,11 @@ class PySQLStagingIngestionTestSuite(PySQLTestCase): if staging_ingestion_user is None: raise ValueError("To run these tests you must designate a `staging_ingestion_user` environment variable. This will the user associated with the personal access token.") - def test_staging_ingestion_put_and_get(self): + def test_staging_ingestion_life_cycle(self): + """PUT a file into the staging location + GET the file from the staging location + REMOVE the file from the staging location + """ fh, temp_path = tempfile.mkstemp() @@ -672,18 +676,17 @@ def test_staging_ingestion_put_and_get(self): assert fetched_text == original_text - os.remove(temp_path) - os.remove(new_temp_path) + remove_query = f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv''" - def test_staging_ingestion_delete(self): - # Test stub to be completed when we implement DELETE. We need to guarantee this file exists before we attempt to remove it. 
- - with self.connection() as conn: - cursor = conn.cursor() - query = f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv''" - with pytest.raises(Error): - cursor.execute(query) + # Should raise an exception because REMOVE is not yet implemented + with pytest.raises(Error): + with self.connection() as conn: + cursor = conn.cursor() + cursor.execute(remove_query) + + os.remove(temp_path) + os.remove(new_temp_path) def main(cli_args): From 19ca706c2ad8ef9f776cfaad02cf31b28250f3c0 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 13:59:03 -0600 Subject: [PATCH 18/41] Make REMOVE command work Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index d0d30a22..e6577eae 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -310,12 +310,12 @@ def _handle_staging_operation(self): handler_args = { "operation": row.operation, "presigned_url": row.presignedUrl, - "local_file": row.localFile, + "local_file": getattr(row, "localFile", None), "headers": json.loads(row.headers or "{}"), } logger.debug( - f"Attempting staging operation indicated by server: {row.operation} - {row.localFile}" + f"Attempting staging operation indicated by server: {row.operation} - {getattr(row, 'localFile', '')}" ) # TODO: Create a retry loop here to re-attempt if the request times out or fails @@ -392,7 +392,13 @@ def _handle_staging_remove( self, operation: str, presigned_url: str, headers: dict = None ): """Make an HTTP DELETE request to the presigned_url""" - raise Error("Remove is not yet implemented") + + r = requests.delete(url=presigned_url, headers=headers) + + if not r.ok: + raise Error( + f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}" + ) def execute( self, operation: str, parameters: Optional[Dict[str, str]] = None From 0167bd98318409c2519279c5c9eaeeb1d89dfcc3 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 13:59:16 -0600 Subject: [PATCH 19/41] These methods don't need to know the `operation` Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index e6577eae..ba8d702d 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -334,7 +334,7 @@ def _handle_staging_operation(self): ) def _handle_staging_put( - self, operation: str, presigned_url: str, local_file: str, headers: dict = None + self, presigned_url: str, local_file: str, headers: dict = None ): """Make an HTTP PUT request @@ -369,7 +369,7 @@ def _handle_staging_put( ) def _handle_staging_get( - self, operation: str, local_file: str, presigned_url: str, headers: dict = None + self, local_file: str, presigned_url: str, headers: dict = None ): """Make an HTTP GET request, create a local file with the received data @@ -388,9 +388,7 @@ def _handle_staging_get( fp.write(r.content) - def _handle_staging_remove( - self, operation: str, presigned_url: str, headers: dict = None - ): + def _handle_staging_remove(self, presigned_url: str, headers: dict = None): """Make an HTTP DELETE request to the presigned_url""" r = requests.delete(url=presigned_url, headers=headers) From 85e4d7cae067a035c2d81b280d18b7aff6f580c4 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 14:03:19 -0600 Subject: [PATCH 20/41] Remove single 
quote that broke query Signed-off-by: Jesse Whitehouse --- tests/e2e/driver_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index cb64957b..91f27037 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -676,7 +676,7 @@ def test_staging_ingestion_life_cycle(self): assert fetched_text == original_text - remove_query = f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv''" + remove_query = f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv'" # Should raise an exception because REMOVE is not yet implemented From 713002d2b8ea87e051106a27baf5dc6d0ef7d622 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 14:08:47 -0600 Subject: [PATCH 21/41] Remove unneeded argument Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index ba8d702d..aa43db84 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -308,7 +308,6 @@ def _handle_staging_operation(self): # TODO: Experiment with DBR sending real headers. # The specification says headers will be in JSON format but the current null value is actually an empty list [] handler_args = { - "operation": row.operation, "presigned_url": row.presignedUrl, "local_file": getattr(row, "localFile", None), "headers": json.loads(row.headers or "{}"), From fc06ef87e27da9dec6b3ce33c6c576bf7fba59f0 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 14:09:47 -0600 Subject: [PATCH 22/41] Expect operation to succeed Signed-off-by: Jesse Whitehouse --- tests/e2e/driver_tests.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 91f27037..582d17da 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -678,12 +678,9 @@ def test_staging_ingestion_life_cycle(self): remove_query = f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv'" - - # Should raise an exception because REMOVE is not yet implemented - with pytest.raises(Error): - with self.connection() as conn: - cursor = conn.cursor() - cursor.execute(remove_query) + with self.connection() as conn: + cursor = conn.cursor() + cursor.execute(remove_query) os.remove(temp_path) os.remove(new_temp_path) From cafa17de6af192fbc7ea4858b1500a885b351c9b Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 23 Nov 2022 14:13:01 -0600 Subject: [PATCH 23/41] Black PySQLStagingIngestionTestSuite only Signed-off-by: Jesse Whitehouse --- tests/e2e/driver_tests.py | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 582d17da..1e195cd8 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -634,26 +634,30 @@ def test_initial_namespace(self): class PySQLStagingIngestionTestSuite(PySQLTestCase): """Simple namespace for ingestion tests. These should be run against DBR >13.x - + In addition to connection credentials (host, path, token) this suite requires an env var named staging_ingestion_user""" staging_ingestion_user = os.getenv("staging_ingestion_user") if staging_ingestion_user is None: - raise ValueError("To run these tests you must designate a `staging_ingestion_user` environment variable. 
This will be the user associated with the personal access token.")
+        raise ValueError(
+            "To run these tests you must designate a `staging_ingestion_user` environment variable. This will be the user associated with the personal access token."
+        )
 
     def test_staging_ingestion_life_cycle(self):
         """PUT a file into the staging location
-        GET the file from the staging location
-        REMOVE the file from the staging location
+        GET the file from the staging location
+        REMOVE the file from the staging location
         """
 
+        # PUT should succeed
+
         fh, temp_path = tempfile.mkstemp()
 
         original_text = "hello world!".encode("utf-8")
 
-        with open(fh, 'wb') as fp:
+        with open(fh, "wb") as fp:
             fp.write(original_text)
 
         with self.connection() as conn:
             cursor = conn.cursor()
             query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
             cursor.execute(query)
 
-        # TODO: What is the acceptance test for a successful staging operation?
-        # For now, let's GET the file back and compare it to the original
+        # GET should succeed
 
         new_fh, new_temp_path = tempfile.mkstemp()
 
         with self.connection() as conn:
             cursor = conn.cursor()
             query = f"GET 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
             cursor.execute(query)
 
         with open(new_fh, "rb") as fp:
             fetched_text = fp.read()
 
         assert fetched_text == original_text
 
-        remove_query = f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv'"
+        # REMOVE should succeed
+
+        remove_query = (
+            f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv'"
+        )
 
         with self.connection() as conn:
             cursor = conn.cursor()
             cursor.execute(remove_query)
 
+        # GET after REMOVE should fail
+
+        with pytest.raises(Error):
+            cursor = conn.cursor()
+            query = f"GET 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
+            cursor.execute(query)
+
         os.remove(temp_path)
         os.remove(new_temp_path)

From a508a1c35d955b3ae4e4f92ba28d912981e4a275 Mon Sep 17 00:00:00 2001
From: Jesse Whitehouse
Date: Wed, 23 Nov 2022 14:17:20 -0600
Subject: [PATCH 24/41] Tidy up comments in e2e test

Signed-off-by: Jesse Whitehouse
---
 tests/e2e/driver_tests.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py
index 1e195cd8..bab8679f 100644
--- a/tests/e2e/driver_tests.py
+++ b/tests/e2e/driver_tests.py
@@ -633,7 +633,7 @@ def test_initial_namespace(self):
         self.assertEqual(cursor.fetchone()[0], table_name)
 
 class PySQLStagingIngestionTestSuite(PySQLTestCase):
-    """Simple namespace for ingestion tests. These should be run against DBR >13.x
+    """Simple namespace for ingestion tests. These should be run against DBR >12.x
 
     In addition to connection credentials (host, path, token) this suite requires an env var
     named staging_ingestion_user"""
@@ -649,6 +649,7 @@ def test_staging_ingestion_life_cycle(self):
         """PUT a file into the staging location
         GET the file from the staging location
         REMOVE the file from the staging location
+        Try to GET the file again expecting to raise an exception
         """
 
         # PUT should succeed

From ce80df0300f9e6923ac3be1f3d7c10fa0fcf2fc2 Mon Sep 17 00:00:00 2001
From: Jesse Whitehouse
Date: Wed, 23 Nov 2022 14:23:21 -0600
Subject: [PATCH 25/41] Basic e2e test scaffolded in. Currently fails. 
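
The failure is expected at this point in the series: the new test asserts
that `Error` is raised when a PUT runs on a connection opened without an
`uploads_base_path`, and that check is only added to the client in the
next commit. A sketch of the two call patterns the suite exercises
(`put_query` stands in for the PUT statement built in the test):

    # No uploads_base_path: execute() should raise databricks.sql.Error
    with self.connection() as conn:
        conn.cursor().execute(put_query)

    # Scoped to a base path: the staging PUT is allowed to proceed
    with self.connection(extra_params={"uploads_base_path": temp_path}) as conn:
        conn.cursor().execute(put_query)
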
Signed-off-by: Jesse Whitehouse
---
 tests/e2e/driver_tests.py | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py
index bab8679f..3c04955d 100644
--- a/tests/e2e/driver_tests.py
+++ b/tests/e2e/driver_tests.py
@@ -661,7 +661,7 @@ def test_staging_ingestion_life_cycle(self):
         with open(fh, "wb") as fp:
             fp.write(original_text)
 
-        with self.connection() as conn:
+        with self.connection(extra_params={"uploads_base_path": temp_path}) as conn:
             cursor = conn.cursor()
             query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
             cursor.execute(query)
@@ -702,6 +702,25 @@ def test_staging_ingestion_life_cycle(self):
 
         os.remove(new_temp_path)
 
+    def test_staging_ingestion_put_fails_without_uploadsbasepath(self):
+        """PUT operations are not supported unless the connection was built with
+        a parameter called uploads_base_path
+        """
+
+        fh, temp_path = tempfile.mkstemp()
+
+        original_text = "hello world!".encode("utf-8")
+
+        with open(fh, "wb") as fp:
+            fp.write(original_text)
+
+        with pytest.raises(Error):
+            with self.connection() as conn:
+                cursor = conn.cursor()
+                query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
+                cursor.execute(query)
+

From 36885a401193464f05e5daaf7a70045c49b205bb Mon Sep 17 00:00:00 2001
From: Jesse Whitehouse
Date: Wed, 23 Nov 2022 14:31:49 -0600
Subject: [PATCH 26/41] Only allow ingestion commands when uploads_base_path is
 specified

Signed-off-by: Jesse Whitehouse
---
 src/databricks/sql/client.py         | 11 +++++++++--
 src/databricks/sql/thrift_backend.py |  2 ++
 tests/e2e/driver_tests.py            | 26 ++++++++++++--------------
 3 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index aa43db84..909def76 100644
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -299,10 +299,15 @@ def _check_not_closed(self):
         if not self.open:
             raise Error("Attempting operation on closed cursor")
 
-    def _handle_staging_operation(self):
+    def _handle_staging_operation(self, uploads_base_path: str):
         """Fetch the HTTP request instruction from a staging ingestion command
         and call the designated handler."""
 
+        if uploads_base_path is None:
+            raise Error(
+                "You must provide an uploads_base_path when initialising a connection to perform ingestion commands"
+            )
+
         row = self.active_result_set.fetchone()
 
         # TODO: Experiment with DBR sending real headers. 
@@ -433,7 +438,9 @@ def execute(
         )
 
         if execute_response.is_staging_operation:
-            self._handle_staging_operation()
+            self._handle_staging_operation(
+                uploads_base_path=self.thrift_backend.uploads_base_path
+            )
 
         return self
 
diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py
index c4b19fa8..5d2468a0 100644
--- a/src/databricks/sql/thrift_backend.py
+++ b/src/databricks/sql/thrift_backend.py
@@ -61,6 +61,7 @@ def __init__(
         http_path: str,
         http_headers,
         auth_provider: AuthProvider,
+        uploads_base_path: str = None,
         **kwargs,
     ):
         # Internal arguments in **kwargs:
@@ -110,6 +111,7 @@ def __init__(
         else:
             raise ValueError("No valid connection settings.")
 
+        self.uploads_base_path = uploads_base_path
         self._initialize_retry_args(kwargs)
         self._use_arrow_native_complex_types = kwargs.get(
             "_use_arrow_native_complex_types", True
diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py
index 3c04955d..5fa70f67 100644
--- a/tests/e2e/driver_tests.py
+++ b/tests/e2e/driver_tests.py
@@ -662,38 +662,36 @@ def test_staging_ingestion_life_cycle(self):
             fp.write(original_text)
 
         with self.connection(extra_params={"uploads_base_path": temp_path}) as conn:
+
             cursor = conn.cursor()
             query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
             cursor.execute(query)
 
-        # GET should succeed
+            # GET should succeed
 
-        new_fh, new_temp_path = tempfile.mkstemp()
+            new_fh, new_temp_path = tempfile.mkstemp()
 
-        with self.connection() as conn:
             cursor = conn.cursor()
             query = f"GET 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
             cursor.execute(query)
 
-        with open(new_fh, "rb") as fp:
-            fetched_text = fp.read()
+            with open(new_fh, "rb") as fp:
+                fetched_text = fp.read()
 
-        assert fetched_text == original_text
+            assert fetched_text == original_text
 
-        # REMOVE should succeed
+            # REMOVE should succeed
 
-        remove_query = (
-            f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv'"
-        )
+            remove_query = (
+                f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv'"
+            )
 
-        with self.connection() as conn:
             cursor = conn.cursor()
             cursor.execute(remove_query)
 
-        # GET after REMOVE should fail
+            # GET after REMOVE should fail
 
-        with pytest.raises(Error):
-            cursor = conn.cursor()
+            with pytest.raises(Error):
+                cursor = conn.cursor()
                 query = f"GET 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
                 cursor.execute(query)

From c0c09d4377ba0a1dcce12c72cc0a8490610ab104 Mon Sep 17 00:00:00 2001
From: Jesse Whitehouse
Date: Wed, 23 Nov 2022 14:48:08 -0600
Subject: [PATCH 27/41] Restrict local file operations to descendants of
 uploads_base_path

Fix lifecycle e2e test so it honours these requirements

Signed-off-by: Jesse Whitehouse
---
 src/databricks/sql/client.py | 13 +++++++++--
 tests/e2e/driver_tests.py    | 43 +++++++++++++++++++++++++++--------
 2 files changed, 44 insertions(+), 12 deletions(-)

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index 909def76..c467eee1 100644
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -4,6 +4,7 @@
 import pyarrow
 import requests
 import json
+import os
 
 from databricks.sql import __version__
 from databricks.sql import *
@@ -301,15 +302,23 @@ def _check_not_closed(self):
 
     def _handle_staging_operation(self, uploads_base_path: str):
         """Fetch the HTTP request instruction from a staging ingestion command
         and call the designated handler. 
+ + Raise an exception if localFile is specified by the server but the localFile + is not descended from uploads_base_path. + """ if uploads_base_path is None: raise Error( "You must provide an uploads_base_path when initialising a connection to perform ingestion commands" ) - + row = self.active_result_set.fetchone() + if getattr(row, "localFile", None): + if os.path.commonpath([row.localFile, uploads_base_path]) != uploads_base_path: + raise Error("Local file operations are restricted to paths within the configured uploads_base_path") + # TODO: Experiment with DBR sending real headers. # The specification says headers will be in JSON format but the current null value is actually an empty list [] handler_args = { diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 5fa70f67..3432e730 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -667,29 +667,31 @@ def test_staging_ingestion_life_cycle(self): query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) - # GET should succeed + # GET should succeed - new_fh, new_temp_path = tempfile.mkstemp() + new_fh, new_temp_path = tempfile.mkstemp() + with self.connection(extra_params={"uploads_base_path": new_temp_path}) as conn: cursor = conn.cursor() query = f"GET 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'" cursor.execute(query) - with open(new_fh, "rb") as fp: - fetched_text = fp.read() + with open(new_fh, "rb") as fp: + fetched_text = fp.read() - assert fetched_text == original_text + assert fetched_text == original_text - # REMOVE should succeed + # REMOVE should succeed - remove_query = ( - f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv'" - ) + remove_query = ( + f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv'" + ) + with self.connection(extra_params={"uploads_base_path": "/"}) as conn: cursor = conn.cursor() cursor.execute(remove_query) - # GET after REMOVE should fail + # GET after REMOVE should fail with pytest.raises(Error): cursor = conn.cursor() @@ -718,6 +720,27 @@ def test_staging_ingestion_put_fails_without_uploadsbasepath(self): query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) + def test_staging_ingestion_put_fails_if_localFile_not_in_uploads_base_path(self): + + + fh, temp_path = tempfile.mkstemp() + + original_text = "hello world!".encode("utf-8") + + with open(fh, "wb") as fp: + fp.write(original_text) + + base_path, filename = os.path.split(temp_path) + + # Add junk to base_path + base_path = os.path.join(base_path, "temp") + + with pytest.raises(Error): + with self.connection(extra_params={"uploads_base_path": base_path}) as conn: + cursor = conn.cursor() + query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" + cursor.execute(query) + def main(cli_args): global get_args_from_env From f6127950d0aaa68575eb24cb0bcceb066b6830e5 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 20 Dec 2022 13:42:07 -0600 Subject: [PATCH 28/41] Remove per PR feedback Signed-off-by: Jesse Whitehouse --- src/databricks/sql/thrift_backend.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 5d2468a0..d972f72d 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -735,7 +735,6 @@ def 
_results_message_to_execute_response(self, resp, operation_state): .to_pybytes() ) lz4_compressed = t_result_set_metadata_resp.lz4Compressed - # TODO: will this fail if metadata doesn't include `isStagingOperation`? is_staging_operation = t_result_set_metadata_resp.isStagingOperation if direct_results and direct_results.resultSet: assert direct_results.resultSet.results.startRowOffset == 0 From e609ef3d843a6474b55162f990b8969d5d5633e6 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 20 Dec 2022 13:45:52 -0600 Subject: [PATCH 29/41] Add check for null local_file per PR feedback Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index c467eee1..23ef6068 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -389,6 +389,9 @@ def _handle_staging_get( Raise an exception if request fails. Returns no data. """ + if local_file is None: + raise Error("Cannot perform GET without specifying a local_file") + with open(local_file, "wb") as fp: r = requests.get(url=presigned_url, headers=headers) From cdbe2d6b9c560209391aef497ec3798ede8408d3 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 20 Dec 2022 13:47:16 -0600 Subject: [PATCH 30/41] Open output stream _after_ successful HTTP request Changed per PR feedback Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 23ef6068..df4288f8 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -392,16 +392,16 @@ def _handle_staging_get( if local_file is None: raise Error("Cannot perform GET without specifying a local_file") - with open(local_file, "wb") as fp: - r = requests.get(url=presigned_url, headers=headers) + r = requests.get(url=presigned_url, headers=headers) - # response.ok verifies the status code is not between 400-600. - # Any 2xx or 3xx will evaluate r.ok == True - if not r.ok: - raise Error( - f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}" - ) + # response.ok verifies the status code is not between 400-600. 
+ # Any 2xx or 3xx will evaluate r.ok == True + if not r.ok: + raise Error( + f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}" + ) + with open(local_file, "wb") as fp: fp.write(r.content) def _handle_staging_remove(self, presigned_url: str, headers: dict = None): From 34a0362a81de1c5ceeeaca681d084fb2e9ed5249 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 20 Dec 2022 15:43:07 -0600 Subject: [PATCH 31/41] Resolve relative paths before comparing row.localFile to uploads_base_path Applies feedback from PR review Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 11 ++++++++--- tests/e2e/driver_tests.py | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index df4288f8..0ba1c544 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -312,18 +312,23 @@ def _handle_staging_operation(self, uploads_base_path: str): raise Error( "You must provide an uploads_base_path when initialising a connection to perform ingestion commands" ) - + row = self.active_result_set.fetchone() + # Must set to None in cases where server response does not include localFile + abs_localFile = None + if getattr(row, "localFile", None): - if os.path.commonpath([row.localFile, uploads_base_path]) != uploads_base_path: + abs_localFile = os.path.abspath(row.localFile) + abs_uploads_base_path = os.path.abspath(uploads_base_path) + if os.path.commonpath([abs_localFile, abs_uploads_base_path]) != abs_uploads_base_path: raise Error("Local file operations are restricted to paths within the configured uploads_base_path") # TODO: Experiment with DBR sending real headers. # The specification says headers will be in JSON format but the current null value is actually an empty list [] handler_args = { "presigned_url": row.presignedUrl, - "local_file": getattr(row, "localFile", None), + "local_file": abs_localFile, "headers": json.loads(row.headers or "{}"), } diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 3432e730..9bdad151 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -741,6 +741,24 @@ def test_staging_ingestion_put_fails_if_localFile_not_in_uploads_base_path(self) query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) + def test_staging_ingestion_put_fails_if_absolute_localFile_not_in_uploads_base_path(self): + """ + This test confirms that uploads_base_path and target_file are resolved into absolute paths. 
+ """ + + # If these two paths are not resolved absolutely, they appear to share a common path of /var/www/html + # after resolution their common path is only /var/www which should raise an exception + # Because the common path must always be equal to uploads_base_path + uploads_base_path = "/var/www/html" + target_file = "/var/www/html/../html1/not_allowed.html" + + with pytest.raises(Error): + with self.connection(extra_params={"uploads_base_path": uploads_base_path}) as conn: + cursor = conn.cursor() + query = f"PUT '{target_file}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" + cursor.execute(query) + + def main(cli_args): global get_args_from_env From c8a64c78d8d9ac0477e2dbf582d03d16239cb7b5 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 20 Dec 2022 16:03:07 -0600 Subject: [PATCH 32/41] Add test that PUT fails if file exists in staging location and OVERWRITE not set Added following PR feedback Signed-off-by: Jesse Whitehouse --- tests/e2e/driver_tests.py | 40 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 9bdad151..76835b7c 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -741,6 +741,46 @@ def test_staging_ingestion_put_fails_if_localFile_not_in_uploads_base_path(self) query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) + def test_staging_ingestion_put_fails_if_file_exists_and_overwrite_not_set(self): + """PUT a file into the staging location twice. First command should succeed. Second should fail. + """ + + fh, temp_path = tempfile.mkstemp() + + original_text = "hello world!".encode("utf-8") + + with open(fh, "wb") as fp: + fp.write(original_text) + + def perform_put(): + with self.connection(extra_params={"uploads_base_path": temp_path}) as conn: + cursor = conn.cursor() + query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/12/15/file1.csv'" + cursor.execute(query) + + def perform_remove(): + remove_query = ( + f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/12/15/file1.csv'" + ) + + with self.connection(extra_params={"uploads_base_path": "/"}) as conn: + cursor = conn.cursor() + cursor.execute(remove_query) + + + # Make sure file does not exist + perform_remove() + + # Put the file + perform_put() + + # Try to put it again + with pytest.raises(sql.exc.ServerOperationError, match="FILE_IN_STAGING_PATH_ALREADY_EXISTS"): + perform_put() + + # Clean up after ourselves + perform_remove() + def test_staging_ingestion_put_fails_if_absolute_localFile_not_in_uploads_base_path(self): """ This test confirms that uploads_base_path and target_file are resolved into absolute paths. 
From d48d3f367392fd4cf1f166210c45c609a28d3002 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 20 Dec 2022 16:06:19 -0600 Subject: [PATCH 33/41] Add tests: operations fail to modify another user's staging location Added following PR feedback Signed-off-by: Jesse Whitehouse --- tests/e2e/driver_tests.py | 46 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 76835b7c..3a5701cc 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -781,6 +781,52 @@ def perform_remove(): # Clean up after ourselves perform_remove() + def test_staging_ingestion_fails_to_modify_another_staging_user(self): + """The server should only allow modification of the staging_ingestion_user's files + """ + + some_other_user = "mary.poppins@databricks.com" + + fh, temp_path = tempfile.mkstemp() + + original_text = "hello world!".encode("utf-8") + + with open(fh, "wb") as fp: + fp.write(original_text) + + def perform_put(): + with self.connection(extra_params={"uploads_base_path": temp_path}) as conn: + cursor = conn.cursor() + query = f"PUT '{temp_path}' INTO 'stage://tmp/{some_other_user}/tmp/12/15/file1.csv' OVERWRITE" + cursor.execute(query) + + def perform_remove(): + remove_query = ( + f"REMOVE 'stage://tmp/{some_other_user}/tmp/12/15/file1.csv'" + ) + + with self.connection(extra_params={"uploads_base_path": "/"}) as conn: + cursor = conn.cursor() + cursor.execute(remove_query) + + def perform_get(): + with self.connection(extra_params={"uploads_base_path": temp_path}) as conn: + cursor = conn.cursor() + query = f"GET 'stage://tmp/{some_other_user}/tmp/11/15/file1.csv' TO '{temp_path}'" + cursor.execute(query) + + # PUT should fail with permissions error + with pytest.raises(sql.exc.ServerOperationError, match="PERMISSION_DENIED"): + perform_put() + + # REMOVE should fail with permissions error + with pytest.raises(sql.exc.ServerOperationError, match="PERMISSION_DENIED"): + perform_remove() + + # GET should fail with permissions error + with pytest.raises(sql.exc.ServerOperationError, match="PERMISSION_DENIED"): + perform_get() + def test_staging_ingestion_put_fails_if_absolute_localFile_not_in_uploads_base_path(self): """ This test confirms that uploads_base_path and target_file are resolved into absolute paths. 
From e0037e0a8a5e5b224f3d031ec6de6a5f330470e8 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 20 Dec 2022 16:09:04 -0600 Subject: [PATCH 34/41] Add test that ingestion command fails if local file is blank Added after PR feedback Signed-off-by: Jesse Whitehouse --- tests/e2e/driver_tests.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 3a5701cc..9022e7e3 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -844,6 +844,16 @@ def test_staging_ingestion_put_fails_if_absolute_localFile_not_in_uploads_base_p query = f"PUT '{target_file}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) + def test_staging_ingestion_empty_local_path_fails_to_parse_at_server(self): + uploads_base_path = "/var/www/html" + target_file = "" + + with pytest.raises(Error, match="EMPTY_LOCAL_FILE_IN_STAGING_ACCESS_QUERY"): + with self.connection(extra_params={"uploads_base_path": uploads_base_path}) as conn: + cursor = conn.cursor() + query = f"PUT '{target_file}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" + cursor.execute(query) + def main(cli_args): From 3fa5d84b7e3f4b25e3ce8826e7e2bc0b66e2bdee Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Tue, 20 Dec 2022 16:11:14 -0600 Subject: [PATCH 35/41] Add test that invalid staging path will fail at server Added after PR feedback Signed-off-by: Jesse Whitehouse --- tests/e2e/driver_tests.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 9022e7e3..4bff7b5c 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -854,6 +854,16 @@ def test_staging_ingestion_empty_local_path_fails_to_parse_at_server(self): query = f"PUT '{target_file}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) + def test_staging_ingestion_invalid_staging_path_fails_at_server(self): + uploads_base_path = "/var/www/html" + target_file = "index.html" + + with pytest.raises(Error, match="INVALID_STAGING_PATH_IN_STAGING_ACCESS_QUERY"): + with self.connection(extra_params={"uploads_base_path": uploads_base_path}) as conn: + cursor = conn.cursor() + query = f"PUT '{target_file}' INTO 'stageRANDOMSTRINGOFCHARACTERS://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" + cursor.execute(query) + def main(cli_args): From 4824b6824480a2fa98f74b5497b01156d25d4a7a Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 22 Dec 2022 11:18:42 -0600 Subject: [PATCH 36/41] Basic usage example (needs tweaking) Signed-off-by: Jesse Whitehouse --- examples/README.md | 3 ++- examples/staging_ingestion.py | 48 +++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) create mode 100644 examples/staging_ingestion.py diff --git a/examples/README.md b/examples/README.md index 74446ade..c4fe8ad6 100644 --- a/examples/README.md +++ b/examples/README.md @@ -35,4 +35,5 @@ To run all of these examples you can clone the entire repository to your disk. O - **`interactive_oauth.py`** shows the simplest example of authenticating by OAuth (no need for a PAT generated in the DBSQL UI) while Bring Your Own IDP is in public preview. When you run the script it will open a browser window so you can authenticate. Afterward, the script fetches some sample data from Databricks and prints it to the screen. 
For this script, the OAuth token is not persisted which means you need to authenticate every time you run the script. - **`persistent_oauth.py`** shows a more advanced example of authenticating by OAuth while Bring Your Own IDP is in public preview. In this case, it shows how to use a subclass of `OAuthPersistence` to reuse an OAuth token across script executions. - **`set_user_agent.py`** shows how to customize the user agent header used for Thrift commands. In -this example the string `ExamplePartnerTag` will be added to the user agent on every request. \ No newline at end of file +this example the string `ExamplePartnerTag` will be added to the user agent on every request. +- **`staging_ingestion.py`** shows how the connector handles Databricks' experimental staging ingestion commands `GET`, `PUT`, and `REMOVE`. \ No newline at end of file diff --git a/examples/staging_ingestion.py b/examples/staging_ingestion.py new file mode 100644 index 00000000..4b09225a --- /dev/null +++ b/examples/staging_ingestion.py @@ -0,0 +1,48 @@ +from databricks import sql +import os + + +""" +Databricks experimentally supports data ingestion of local files via a cloud staging location. +Ingestion commands will work on DBR >12. And you must include an uploads_base_path kwarg when +calling sql.connect(). + +Use databricks-sql-connector to PUT files into the staging location where Databricks can access them: + + PUT '/path/to/local/data.csv' INTO 'stage://tmp/some.user@databricks.com/salesdata/september.csv' OVERWRITE + +Files in a staging location can also be retrieved with a GET command + + GET 'stage://tmp/some.user@databricks.com/salesdata/september.csv' TO 'data.csv' + +and deleted with a REMOVE command: + + REMOVE 'stage://tmp/some.user@databricks.com/salesdata/september.csv' + +Ingestion queries are passed to cursor.execute() like any other query. For GET and PUT commands, a local file +will be read or written. For security, this local file must be contained within, or descended from, the +uploads_base_path of the connection. + +Additionally, the connection can only manipulate files within the cloud storage location of the authenticated user. + +To run this script: + +1. Set the INGESTION_USER constant to the account email address of the authenticated user +2.
Set the FILEPATH constant to the path of a file that will be uploaded +""" + +INGESTION_USER = "user.name@example.com" +FILEPATH = "example.csv" + +_complete_path = os.path.realpath(FILEPATH) +uploads_base_path = os.path.split(_complete_path)[:-1] + + +with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"), + http_path = os.getenv("DATABRICKS_HTTP_PATH"), + access_token = os.getenv("DATABRICKS_TOKEN"), + uploads_base_path = uploads_base_path) as connection: + + with connection.cursor() as cursor: + query = f"PUT '{_complete_path}' INTO 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' OVERWRITE" + cursor.execute(query) \ No newline at end of file From 469f35fe3906ea9943f4397bd73feb991342e904 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 22 Dec 2022 15:36:40 -0600 Subject: [PATCH 37/41] Add samples of GET and REMOVE Signed-off-by: Jesse Whitehouse --- examples/staging_ingestion.py | 59 ++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/examples/staging_ingestion.py b/examples/staging_ingestion.py index 4b09225a..76d9485b 100644 --- a/examples/staging_ingestion.py +++ b/examples/staging_ingestion.py @@ -1,7 +1,6 @@ from databricks import sql import os - """ Databricks experimentally supports data ingestion of local files via a cloud staging location. Ingestion commands will work on DBR >12. And you must include an uploads_base_path kwarg when @@ -28,21 +27,59 @@ To run this script: 1. Set the INGESTION_USER constant to the account email address of the authenticated user -2. Set the FILEPATH constant to the path of a file that will be uploaded +2. Set the FILEPATH constant to the path of a file that will be uploaded (this example assumes it's a CSV file) +3. Run this file """ -INGESTION_USER = "user.name@example.com" +INGESTION_USER = "some.user@example.com" FILEPATH = "example.csv" +# FILEPATH can be relative to the current directory. +# Resolve it into an absolute path _complete_path = os.path.realpath(FILEPATH) +if not os.path.exists(_complete_path): + + # It's easiest to save a file in the same directory as this script. But any path to a file will work. + raise Exception( + "You need to set FILEPATH in this script to a file that actually exists." + ) + +# Set uploads_base_path equal to the directory that contains FILEPATH +uploads_base_path = os.path.split(_complete_path)[0] + +with sql.connect( + server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"), + http_path=os.getenv("DATABRICKS_HTTP_PATH"), + access_token=os.getenv("DATABRICKS_TOKEN"), + uploads_base_path=uploads_base_path, +) as connection: + + with connection.cursor() as cursor: + + # Ingestion commands are executed like any other SQL. + # Here's a sample PUT query. You can remove OVERWRITE at the end to avoid silently overwriting data. + query = f"PUT '{_complete_path}' INTO 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' OVERWRITE" + + print(f"Uploading {FILEPATH} to staging location") + cursor.execute(query) + print("Upload was successful") + + temp_fp = os.path.realpath("temp.csv") + + # Here's a sample GET query. Note that `temp_fp` must also be contained within, or descended from, + # the uploads_base_path.
+ query = ( + f"GET 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' TO '{temp_fp}'" + ) + + print(f"Fetching from staging location into new file called temp.csv") + cursor.execute(query) + print("Download was successful") -with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"), - http_path = os.getenv("DATABRICKS_HTTP_PATH"), - access_token = os.getenv("DATABRICKS_TOKEN"), - uploads_base_path = uploads_base_path) as connection: + # Here's a sample REMOVE query. It cleans up the demo.csv created in our first query + query = f"REMOVE 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv'" - with connection.cursor() as cursor: - query = f"PUT '{_complete_path}' INTO 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' OVERWRITE" - cursor.execute(query) \ No newline at end of file + print("Removing demo.csv from staging location") + cursor.execute(query) + print("Remove was successful") From bdb948ad4e5fc3a77255616e03a69d36056ee5fd Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 28 Dec 2022 11:33:05 -0600 Subject: [PATCH 38/41] Refactor to allow uploads_base_path to be either a single string object or a list of strings. Signed-off-by: Jesse Whitehouse --- examples/staging_ingestion.py | 4 ++- src/databricks/sql/client.py | 32 ++++++++++++++++++----- src/databricks/sql/thrift_backend.py | 3 ++- tests/e2e/driver_tests.py | 39 ++++++++++++++++++++++++++++ 4 files changed, 69 insertions(+), 9 deletions(-) diff --git a/examples/staging_ingestion.py b/examples/staging_ingestion.py index 76d9485b..e164df33 100644 --- a/examples/staging_ingestion.py +++ b/examples/staging_ingestion.py @@ -19,7 +19,7 @@ REMOVE 'stage://tmp/some.user@databricks.com/salesdata/september.csv' Ingestion queries are passed to cursor.execute() like any other query. For GET and PUT commands, a local file -will be read or written. For security, this local file must be contained within, or descended from, the +will be read or written. For security, this local file must be contained within, or descended from, an uploads_base_path of the connection. @@ -29,6 +29,8 @@ 1. Set the INGESTION_USER constant to the account email address of the authenticated user 2. Set the FILEPATH constant to the path of a file that will be uploaded (this example assumes it's a CSV file) 3. Run this file + +Note: uploads_base_path can be either a Pathlike object or a list of Pathlike objects. """ diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 0ba1c544..9d758993 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -300,29 +300,47 @@ def _check_not_closed(self): if not self.open: raise Error("Attempting operation on closed cursor") - def _handle_staging_operation(self, uploads_base_path: str): + def _handle_staging_operation(self, uploads_base_path: Union[List, str]): """Fetch the HTTP request instruction from a staging ingestion command and call the designated handler. - + Raise an exception if localFile is specified by the server but the localFile is not descended from uploads_base_path.
""" - if uploads_base_path is None: + if isinstance(uploads_base_path, type(str())): + _uploads_base_paths = [uploads_base_path] + elif isinstance(uploads_base_path, type(list())): + _uploads_base_paths = uploads_base_path + else: raise Error( - "You must provide an uploads_base_path when initialising a connection to perform ingestion commands" + "You must provide at least one uploads_base_path when initialising a connection to perform ingestion commands" ) + abs_uploads_base_paths = [os.path.abspath(i) for i in _uploads_base_paths] + row = self.active_result_set.fetchone() # Must set to None in cases where server response does not include localFile abs_localFile = None + # Default to not allow staging operations + allow_operation = False if getattr(row, "localFile", None): abs_localFile = os.path.abspath(row.localFile) - abs_uploads_base_path = os.path.abspath(uploads_base_path) - if os.path.commonpath([abs_localFile, abs_uploads_base_path]) != abs_uploads_base_path: - raise Error("Local file operations are restricted to paths within the configured uploads_base_path") + for abs_uploads_base_path in abs_uploads_base_paths: + # If the indicated local file matches at least one allowed base path, allow the operation + if ( + os.path.commonpath([abs_localFile, abs_uploads_base_path]) + == abs_uploads_base_path + ): + allow_operation = True + else: + continue + if not allow_operation: + raise Error( + "Local file operations are restricted to paths within the configured uploads_base_path" + ) # TODO: Experiment with DBR sending real headers. # The specification says headers will be in JSON format but the current null value is actually an empty list [] diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index d972f72d..6a03e780 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -6,6 +6,7 @@ import threading import lz4.frame from ssl import CERT_NONE, CERT_REQUIRED, create_default_context +from typing import List, Union import pyarrow import thrift.transport.THttpClient @@ -61,7 +62,7 @@ def __init__( http_path: str, http_headers, auth_provider: AuthProvider, - uploads_base_path: str = None, + uploads_base_path: Union[str, List] = None, **kwargs, ): # Internal arguments in **kwargs: diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 4bff7b5c..7ff9027d 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -864,6 +864,45 @@ def test_staging_ingestion_invalid_staging_path_fails_at_server(self): query = f"PUT '{target_file}' INTO 'stageRANDOMSTRINGOFCHARACTERS://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) + def test_staging_ingestion_supports_multiple_uploadsbasepath_values(self): + """uploads_base_path may be either a path-like object or a list of path-like objects. + + This test confirms that two configured base paths: + 1 - doesn't raise an exception + 2 - allows uploads from both paths + 3 - doesn't allow uploads from a third path + """ + + def generate_file_and_path_and_queries(): + """ + 1. Makes a temp file with some contents. + 2. Write a query to PUT it into a staging location + 3. 
Write a query to REMOVE it from that location (for cleanup) + """ + fh, temp_path = tempfile.mkstemp() + with open(fh, "wb") as fp: + original_text = "hello world!".encode("utf-8") + fp.write(original_text) + put_query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/{id(temp_path)}.csv' OVERWRITE" + remove_query = f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/{id(temp_path)}.csv'" + return fh, temp_path, put_query, remove_query + + fh1, temp_path1, put_query1, remove_query1 = generate_file_and_path_and_queries() + fh2, temp_path2, put_query2, remove_query2 = generate_file_and_path_and_queries() + fh3, temp_path3, put_query3, remove_query3 = generate_file_and_path_and_queries() + + with self.connection(extra_params={"uploads_base_path": [temp_path1, temp_path2]}) as conn: + cursor = conn.cursor() + + cursor.execute(put_query1) + cursor.execute(put_query2) + + with pytest.raises(Error, match="Local file operations are restricted to paths within the configured uploads_base_path"): + cursor.execute(put_query3) + + # Then clean up the files we made + cursor.execute(remove_query1) + cursor.execute(remove_query2) From 0261b7a172f9d94d26d9ea1f52d6179f4c995d84 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 29 Dec 2022 13:44:33 -0600 Subject: [PATCH 39/41] Refactor uploads_base_path to staging_allowed_local_path Signed-off-by: Jesse Whitehouse --- examples/staging_ingestion.py | 16 ++++----- src/databricks/sql/client.py | 26 +++++++-------- src/databricks/sql/thrift_backend.py | 4 +-- tests/e2e/driver_tests.py | 50 ++++++++++++++-------------- 4 files changed, 48 insertions(+), 48 deletions(-) diff --git a/examples/staging_ingestion.py b/examples/staging_ingestion.py index e164df33..2980506d 100644 --- a/examples/staging_ingestion.py +++ b/examples/staging_ingestion.py @@ -3,7 +3,7 @@ """ Databricks experimentally supports data ingestion of local files via a cloud staging location. -Ingestion commands will work on DBR >12. And you must include an uploads_base_path kwarg when +Ingestion commands will work on DBR >12. And you must include a staging_allowed_local_path kwarg when calling sql.connect(). Use databricks-sql-connector to PUT files into the staging location where Databricks can access them: @@ -19,8 +19,8 @@ REMOVE 'stage://tmp/some.user@databricks.com/salesdata/september.csv' Ingestion queries are passed to cursor.execute() like any other query. For GET and PUT commands, a local file -will be read or written. For security, this local file must be contained within, or descended from, an -uploads_base_path of the connection. +will be read or written. For security, this local file must be contained within, or descended from, a +staging_allowed_local_path of the connection. Additionally, the connection can only manipulate files within the cloud storage location of the authenticated user. @@ -30,7 +30,7 @@ 2. Set the FILEPATH constant to the path of a file that will be uploaded (this example assumes it's a CSV file) 3. Run this file -Note: uploads_base_path can be either a Pathlike object or a list of Pathlike objects. +Note: staging_allowed_local_path can be either a Pathlike object or a list of Pathlike objects. """ INGESTION_USER = "some.user@example.com" FILEPATH = "example.csv" @@ -47,14 +47,14 @@ "You need to set FILEPATH in this script to a file that actually exists."
) -# Set uploads_base_path equal to the directory that contains FILEPATH -uploads_base_path = os.path.split(_complete_path)[0] +# Set staging_allowed_local_path equal to the directory that contains FILEPATH +staging_allowed_local_path = os.path.split(_complete_path)[0] with sql.connect( server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"), http_path=os.getenv("DATABRICKS_HTTP_PATH"), access_token=os.getenv("DATABRICKS_TOKEN"), - uploads_base_path=uploads_base_path, + staging_allowed_local_path=staging_allowed_local_path, ) as connection: with connection.cursor() as cursor: @@ -70,7 +70,7 @@ temp_fp = os.path.realpath("temp.csv") # Here's a sample GET query. Note that `temp_fp` must also be contained within, or descended from, - # the uploads_base_path. + # the staging_allowed_local_path. query = ( f"GET 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' TO '{temp_fp}'" ) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 9d758993..e4116e6a 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -300,24 +300,24 @@ def _check_not_closed(self): if not self.open: raise Error("Attempting operation on closed cursor") - def _handle_staging_operation(self, uploads_base_path: Union[List, str]): + def _handle_staging_operation(self, staging_allowed_local_path: Union[List, str]): """Fetch the HTTP request instruction from a staging ingestion command and call the designated handler. Raise an exception if localFile is specified by the server but the localFile - is not descended from uploads_base_path. + is not descended from staging_allowed_local_path. """ - if isinstance(uploads_base_path, type(str())): - _uploads_base_paths = [uploads_base_path] - elif isinstance(uploads_base_path, type(list())): - _uploads_base_paths = uploads_base_path + if isinstance(staging_allowed_local_path, type(str())): + _staging_allowed_local_paths = [staging_allowed_local_path] + elif isinstance(staging_allowed_local_path, type(list())): + _staging_allowed_local_paths = staging_allowed_local_path else: raise Error( - "You must provide at least one uploads_base_path when initialising a connection to perform ingestion commands" + "You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands" ) - abs_uploads_base_paths = [os.path.abspath(i) for i in _uploads_base_paths] + abs_staging_allowed_local_paths = [os.path.abspath(i) for i in _staging_allowed_local_paths] row = self.active_result_set.fetchone() @@ -328,18 +328,18 @@ def _handle_staging_operation(self, uploads_base_path: Union[List, str]): allow_operation = False if getattr(row, "localFile", None): abs_localFile = os.path.abspath(row.localFile) - for abs_uploads_base_path in abs_uploads_base_paths: + for abs_staging_allowed_local_path in abs_staging_allowed_local_paths: # If the indicated local file matches at least one allowed base path, allow the operation if ( - os.path.commonpath([abs_localFile, abs_uploads_base_path]) - == abs_uploads_base_path + os.path.commonpath([abs_localFile, abs_staging_allowed_local_path]) + == abs_staging_allowed_local_path ): allow_operation = True else: continue if not allow_operation: raise Error( - "Local file operations are restricted to paths within the configured uploads_base_path" + "Local file operations are restricted to paths within the configured staging_allowed_local_path" ) # TODO: Experiment with DBR sending real headers. 
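Once both sides have been passed through os.path.abspath, the containment rule above reduces to a pure path comparison. A standalone sketch of its semantics (the directory names are hypothetical):

    import os

    allowed = os.path.abspath("/data/uploads")

    inside = os.path.abspath("/data/uploads/reports/q3.csv")
    escaped = os.path.abspath("/data/uploads/../secrets/key.pem")  # normalizes to /data/secrets/key.pem

    # A local file is permitted only when the allowed root is the common path.
    assert os.path.commonpath([inside, allowed]) == allowed   # permitted
    assert os.path.commonpath([escaped, allowed]) != allowed  # rejected: common path is only /data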
@@ -474,7 +474,7 @@ def execute( if execute_response.is_staging_operation: self._handle_staging_operation( - uploads_base_path=self.thrift_backend.uploads_base_path + staging_allowed_local_path=self.thrift_backend.staging_allowed_local_path ) return self diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 6a03e780..3fda75b2 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -62,7 +62,7 @@ def __init__( http_path: str, http_headers, auth_provider: AuthProvider, - uploads_base_path: Union[str, List] = None, + staging_allowed_local_path: Union[str, List] = None, **kwargs, ): # Internal arguments in **kwargs: @@ -112,7 +112,7 @@ def __init__( else: raise ValueError("No valid connection settings.") - self.uploads_base_path = uploads_base_path + self.staging_allowed_local_path = staging_allowed_local_path self._initialize_retry_args(kwargs) self._use_arrow_native_complex_types = kwargs.get( "_use_arrow_native_complex_types", True diff --git a/tests/e2e/driver_tests.py b/tests/e2e/driver_tests.py index 7ff9027d..1c09d70e 100644 --- a/tests/e2e/driver_tests.py +++ b/tests/e2e/driver_tests.py @@ -661,7 +661,7 @@ def test_staging_ingestion_life_cycle(self): with open(fh, "wb") as fp: fp.write(original_text) - with self.connection(extra_params={"uploads_base_path": temp_path}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": temp_path}) as conn: cursor = conn.cursor() query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" @@ -671,7 +671,7 @@ def test_staging_ingestion_life_cycle(self): new_fh, new_temp_path = tempfile.mkstemp() - with self.connection(extra_params={"uploads_base_path": new_temp_path}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": new_temp_path}) as conn: cursor = conn.cursor() query = f"GET 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'" cursor.execute(query) @@ -687,7 +687,7 @@ def test_staging_ingestion_life_cycle(self): f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv'" ) - with self.connection(extra_params={"uploads_base_path": "/"}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": "/"}) as conn: cursor = conn.cursor() cursor.execute(remove_query) @@ -702,9 +702,9 @@ def test_staging_ingestion_life_cycle(self): os.remove(new_temp_path) - def test_staging_ingestion_put_fails_without_uploadsbasepath(self): + def test_staging_ingestion_put_fails_without_staging_allowed_local_path(self): """PUT operations are not supported unless the connection was built with - a parameter called uploads_base_path + a parameter called staging_allowed_local_path """ fh, temp_path = tempfile.mkstemp() @@ -720,7 +720,7 @@ def test_staging_ingestion_put_fails_without_uploadsbasepath(self): query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) - def test_staging_ingestion_put_fails_if_localFile_not_in_uploads_base_path(self): + def test_staging_ingestion_put_fails_if_localFile_not_in_staging_allowed_local_path(self): fh, temp_path = tempfile.mkstemp() @@ -736,7 +736,7 @@ def test_staging_ingestion_put_fails_if_localFile_not_in_uploads_base_path(self) base_path = os.path.join(base_path, "temp") with pytest.raises(Error): - with self.connection(extra_params={"uploads_base_path": base_path}) as conn: + with 
self.connection(extra_params={"staging_allowed_local_path": base_path}) as conn: cursor = conn.cursor() query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) @@ -753,7 +753,7 @@ def test_staging_ingestion_put_fails_if_file_exists_and_overwrite_not_set(self): fp.write(original_text) def perform_put(): - with self.connection(extra_params={"uploads_base_path": temp_path}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": temp_path}) as conn: cursor = conn.cursor() query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/12/15/file1.csv'" cursor.execute(query) @@ -763,7 +763,7 @@ def perform_remove(): f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/12/15/file1.csv'" ) - with self.connection(extra_params={"uploads_base_path": "/"}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": "/"}) as conn: cursor = conn.cursor() cursor.execute(remove_query) @@ -795,7 +795,7 @@ def test_staging_ingestion_fails_to_modify_another_staging_user(self): fp.write(original_text) def perform_put(): - with self.connection(extra_params={"uploads_base_path": temp_path}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": temp_path}) as conn: cursor = conn.cursor() query = f"PUT '{temp_path}' INTO 'stage://tmp/{some_other_user}/tmp/12/15/file1.csv' OVERWRITE" cursor.execute(query) @@ -805,12 +805,12 @@ def perform_remove(): f"REMOVE 'stage://tmp/{some_other_user}/tmp/12/15/file1.csv'" ) - with self.connection(extra_params={"uploads_base_path": "/"}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": "/"}) as conn: cursor = conn.cursor() cursor.execute(remove_query) def perform_get(): - with self.connection(extra_params={"uploads_base_path": temp_path}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": temp_path}) as conn: cursor = conn.cursor() query = f"GET 'stage://tmp/{some_other_user}/tmp/11/15/file1.csv' TO '{temp_path}'" cursor.execute(query) @@ -827,45 +827,45 @@ def perform_get(): with pytest.raises(sql.exc.ServerOperationError, match="PERMISSION_DENIED"): perform_get() - def test_staging_ingestion_put_fails_if_absolute_localFile_not_in_uploads_base_path(self): + def test_staging_ingestion_put_fails_if_absolute_localFile_not_in_staging_allowed_local_path(self): """ - This test confirms that uploads_base_path and target_file are resolved into absolute paths. + This test confirms that staging_allowed_local_path and target_file are resolved into absolute paths. 
""" # If these two paths are not resolved absolutely, they appear to share a common path of /var/www/html # after resolution their common path is only /var/www which should raise an exception - # Because the common path must always be equal to uploads_base_path - uploads_base_path = "/var/www/html" + # Because the common path must always be equal to staging_allowed_local_path + staging_allowed_local_path = "/var/www/html" target_file = "/var/www/html/../html1/not_allowed.html" with pytest.raises(Error): - with self.connection(extra_params={"uploads_base_path": uploads_base_path}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": staging_allowed_local_path}) as conn: cursor = conn.cursor() query = f"PUT '{target_file}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) def test_staging_ingestion_empty_local_path_fails_to_parse_at_server(self): - uploads_base_path = "/var/www/html" + staging_allowed_local_path = "/var/www/html" target_file = "" with pytest.raises(Error, match="EMPTY_LOCAL_FILE_IN_STAGING_ACCESS_QUERY"): - with self.connection(extra_params={"uploads_base_path": uploads_base_path}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": staging_allowed_local_path}) as conn: cursor = conn.cursor() query = f"PUT '{target_file}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) def test_staging_ingestion_invalid_staging_path_fails_at_server(self): - uploads_base_path = "/var/www/html" + staging_allowed_local_path = "/var/www/html" target_file = "index.html" with pytest.raises(Error, match="INVALID_STAGING_PATH_IN_STAGING_ACCESS_QUERY"): - with self.connection(extra_params={"uploads_base_path": uploads_base_path}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": staging_allowed_local_path}) as conn: cursor = conn.cursor() query = f"PUT '{target_file}' INTO 'stageRANDOMSTRINGOFCHARACTERS://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE" cursor.execute(query) - def test_staging_ingestion_supports_multiple_uploadsbasepath_values(self): - """uploads_base_path may be either a path-like object or a list of path-like objects. + def test_staging_ingestion_supports_multiple_staging_allowed_local_path_values(self): + """staging_allowed_local_path may be either a path-like object or a list of path-like objects. 
This test confirms that two configured base paths: 1 - doesn't raise an exception @@ -891,13 +891,13 @@ def generate_file_and_path_and_queries(): fh2, temp_path2, put_query2, remove_query2 = generate_file_and_path_and_queries() fh3, temp_path3, put_query3, remove_query3 = generate_file_and_path_and_queries() - with self.connection(extra_params={"uploads_base_path": [temp_path1, temp_path2]}) as conn: + with self.connection(extra_params={"staging_allowed_local_path": [temp_path1, temp_path2]}) as conn: cursor = conn.cursor() cursor.execute(put_query1) cursor.execute(put_query2) - with pytest.raises(Error, match="Local file operations are restricted to paths within the configured uploads_base_path"): + with pytest.raises(Error, match="Local file operations are restricted to paths within the configured staging_allowed_local_path"): cursor.execute(put_query3) # Then clean up the files we made From 00d8a494f911cb62b785e90114c1d2f41c2f8a02 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 29 Dec 2022 23:28:22 -0600 Subject: [PATCH 40/41] Fix mypy static type failures Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 4 +++- src/databricks/sql/thrift_backend.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index e4116e6a..59e7e368 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -300,7 +300,7 @@ def _check_not_closed(self): if not self.open: raise Error("Attempting operation on closed cursor") - def _handle_staging_operation(self, staging_allowed_local_path: Union[List, str]): + def _handle_staging_operation(self, staging_allowed_local_path: Union[None, str, List[str]]): """Fetch the HTTP request instruction from a staging ingestion command and call the designated handler. 
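The widened Union[None, str, List[str]] annotation describes the coercion that _handle_staging_operation already performs at runtime. Sketched in isolation (the normalize helper below is hypothetical, not part of the patch):

    from typing import List, Union

    def normalize(paths: Union[None, str, List[str]]) -> List[str]:
        # A bare string becomes a one-element list; anything else is rejected,
        # mirroring the Error raised when no staging_allowed_local_path is configured.
        if isinstance(paths, str):
            return [paths]
        if isinstance(paths, list):
            return paths
        raise ValueError("staging_allowed_local_path must be a str or a list of str")

    assert normalize("/tmp/uploads") == ["/tmp/uploads"]
    assert normalize(["/a", "/b"]) == ["/a", "/b"]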
@@ -319,7 +319,9 @@ def _handle_staging_operation(self, staging_allowed_local_path: Union[List, str] abs_staging_allowed_local_paths = [os.path.abspath(i) for i in _staging_allowed_local_paths] + assert self.active_result_set is not None row = self.active_result_set.fetchone() + assert row is not None # Must set to None in cases where server response does not include localFile abs_localFile = None diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 3fda75b2..de505a8a 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -62,7 +62,7 @@ def __init__( http_path: str, http_headers, auth_provider: AuthProvider, - staging_allowed_local_path: Union[str, List] = None, + staging_allowed_local_path: Union[None, str, List[str]] = None, **kwargs, ): # Internal arguments in **kwargs: From 7a602e604da5308d2d543b548680e8d5ef2f05eb Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Thu, 29 Dec 2022 23:40:34 -0600 Subject: [PATCH 41/41] Black src files Signed-off-by: Jesse Whitehouse --- src/databricks/sql/client.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 59e7e368..863a6749 100644 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -300,7 +300,9 @@ def _check_not_closed(self): if not self.open: raise Error("Attempting operation on closed cursor") - def _handle_staging_operation(self, staging_allowed_local_path: Union[None, str, List[str]]): + def _handle_staging_operation( + self, staging_allowed_local_path: Union[None, str, List[str]] + ): """Fetch the HTTP request instruction from a staging ingestion command and call the designated handler. @@ -317,7 +319,9 @@ def _handle_staging_operation(self, staging_allowed_local_path: Union[None, str, "You must provide at least one staging_allowed_local_path when initialising a connection to perform ingestion commands" ) - abs_staging_allowed_local_paths = [os.path.abspath(i) for i in _staging_allowed_local_paths] + abs_staging_allowed_local_paths = [ + os.path.abspath(i) for i in _staging_allowed_local_paths + ] assert self.active_result_set is not None row = self.active_result_set.fetchone()
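A closing note on the two asserts added in the previous patch: they exist to narrow Optional types for mypy rather than to change runtime behaviour. The same pattern in miniature (names are illustrative):

    from typing import Optional

    def shout(row: Optional[str]) -> str:
        assert row is not None  # narrows Optional[str] to str for the type checker
        return row.upper()      # without the assert, mypy flags .upper() on a possibly-None value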