Merge staging ingestion into main #78

Merged
merged 4 commits into main on Jan 10, 2023

7 changes: 7 additions & 0 deletions CONTRIBUTING.md
@@ -112,6 +112,7 @@ export access_token=""
There are several e2e test suites available:
- `PySQLCoreTestSuite`
- `PySQLLargeQueriesSuite`
- `PySQLStagingIngestionTestSuite`
- `PySQLRetryTestSuite.HTTP503Suite` **[not documented]**
- `PySQLRetryTestSuite.HTTP429Suite` **[not documented]**
- `PySQLUnityCatalogTestSuite` **[not documented]**
@@ -122,6 +123,12 @@ To execute the core test suite:
```
poetry run python -m pytest tests/e2e/driver_tests.py::PySQLCoreTestSuite
```

The `PySQLCoreTestSuite` namespace contains tests for all of the connector's basic features and behaviours. This is the default namespace where tests should be written unless they require specially configured clusters or take an especially long time to execute by design.
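
To run a single test from the core suite, you can use pytest's standard `-k` filter (the test name below is purely illustrative):

```
poetry run python -m pytest tests/e2e/driver_tests.py::PySQLCoreTestSuite -k "test_example_name"
```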

The `PySQLLargeQueriesSuite` namespace contains long-running query tests and is kept separate. In general, if the `PySQLCoreTestSuite` passes then these tests will pass as well.

The `PySQLStagingIngestionTestSuite` namespace requires a cluster running DBR version > 12.x which supports staging ingestion commands.
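
Each suite can be invoked the same way as the core suite by substituting its name; for example (assuming a cluster that satisfies the suite's requirements):

```
poetry run python -m pytest tests/e2e/driver_tests.py::PySQLStagingIngestionTestSuite
```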

The suites marked `[not documented]` require additional configuration which will be documented at a later time.
### Code formatting

3 changes: 2 additions & 1 deletion examples/README.md
@@ -35,4 +35,5 @@ To run all of these examples you can clone the entire repository to your disk. O
- **`interactive_oauth.py`** shows the simplest example of authenticating by OAuth (no need for a PAT generated in the DBSQL UI) while Bring Your Own IDP is in public preview. When you run the script it will open a browser window so you can authenticate. Afterward, the script fetches some sample data from Databricks and prints it to the screen. For this script, the OAuth token is not persisted which means you need to authenticate every time you run the script.
- **`persistent_oauth.py`** shows a more advanced example of authenticating by OAuth while Bring Your Own IDP is in public preview. In this case, it shows how to use a subclass of `OAuthPersistence` to reuse an OAuth token across script executions.
- **`set_user_agent.py`** shows how to customize the user agent header used for Thrift commands. In
this example the string `ExamplePartnerTag` will be added to the user agent on every request.
- **`staging_ingestion.py`** shows how the connector handles Databricks' experimental staging ingestion commands `GET`, `PUT`, and `REMOVE`.
87 changes: 87 additions & 0 deletions examples/staging_ingestion.py
@@ -0,0 +1,87 @@
from databricks import sql
import os

"""
Databricks experimentally supports data ingestion of local files via a cloud staging location.
Ingestion commands work on DBR versions > 12, and you must include a staging_allowed_local_path kwarg
when calling sql.connect().

Use databricks-sql-connector to PUT files into the staging location where Databricks can access them:

PUT '/path/to/local/data.csv' INTO 'stage://tmp/[email protected]/salesdata/september.csv' OVERWRITE

Files in a staging location can also be retrieved with a GET command:

GET 'stage://tmp/[email protected]/salesdata/september.csv' TO 'data.csv'

and deleted with a REMOVE command:

REMOVE 'stage://tmp/[email protected]/salesdata/september.csv'

Ingestion queries are passed to cursor.execute() like any other query. For GET and PUT commands, a local file
will be read or written. For security, this local file must be contained within, or descended from, a
staging_allowed_local_path of the connection.
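
For example, if staging_allowed_local_path is "/home/user/data", then "/home/user/data/sales.csv" can be
read or written but "/home/user/notes.txt" cannot (these paths are purely illustrative).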

Additionally, the connection can only manipulate files within the cloud storage location of the authenticated user.

To run this script:

1. Set the INGESTION_USER constant to the account email address of the authenticated user
2. Set the FILEPATH constant to the path of a file that will be uploaded (this example assumes it's a CSV file)
3. Run this file
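
For example, assuming DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN are already
set in your environment:

    python staging_ingestion.py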

Note: staging_allowed_local_path can be either a Pathlike object or a list of Pathlike objects.
"""

INGESTION_USER = "[email protected]"
FILEPATH = "example.csv"

# FILEPATH can be relative to the current directory.
# Resolve it into an absolute path
_complete_path = os.path.realpath(FILEPATH)

if not os.path.exists(_complete_path):

    # It's easiest to save a file in the same directory as this script. But any path to a file will work.
    raise Exception(
        "You need to set FILEPATH in this script to a file that actually exists."
    )

# Set staging_allowed_local_path equal to the directory that contains FILEPATH
staging_allowed_local_path = os.path.split(_complete_path)[0]
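
# Note: staging_allowed_local_path may also be a list of paths. For example, to additionally permit
# files under a second directory you could pass [staging_allowed_local_path, "/tmp/other_dir"]
# ("/tmp/other_dir" is purely illustrative).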

with sql.connect(
    server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
    http_path=os.getenv("DATABRICKS_HTTP_PATH"),
    access_token=os.getenv("DATABRICKS_TOKEN"),
    staging_allowed_local_path=staging_allowed_local_path,
) as connection:

    with connection.cursor() as cursor:

        # Ingestion commands are executed like any other SQL.
        # Here's a sample PUT query. You can remove OVERWRITE at the end to avoid silently overwriting data.
        query = f"PUT '{_complete_path}' INTO 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' OVERWRITE"

        print(f"Uploading {FILEPATH} to staging location")
        cursor.execute(query)
        print("Upload was successful")

        temp_fp = os.path.realpath("temp.csv")

        # Here's a sample GET query. Note that `temp_fp` must also be contained within, or descended from,
        # the staging_allowed_local_path.
        query = (
            f"GET 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' TO '{temp_fp}'"
        )

        print("Fetching from staging location into new file called temp.csv")
        cursor.execute(query)
        print("Download was successful")
        # Here's a sample REMOVE query. It cleans up the demo.csv created in our first query
        query = f"REMOVE 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv'"

        print("Removing demo.csv from staging location")
        cursor.execute(query)
        print("Remove was successful")