"""
Upload, download, and remove a local file via a Databricks staging location.

Databricks experimentally supports data ingestion of local files via a cloud staging location.
Ingestion commands will work on DBR >12. And you must include a staging_allowed_local_path kwarg when
calling sql.connect().

Use databricks-sql-connector to PUT files into the staging location where Databricks can access them:

    PUT '/path/to/local/data.csv' INTO 'stage://tmp/user@example.com/salesdata/september.csv' OVERWRITE

Files in a staging location can also be retrieved with a GET command

    GET 'stage://tmp/user@example.com/salesdata/september.csv' TO 'data.csv'

and deleted with a REMOVE command:

    REMOVE 'stage://tmp/user@example.com/salesdata/september.csv'

Ingestion queries are passed to cursor.execute() like any other query. For GET and PUT commands, a local file
will be read or written. For security, this local file must be contained within, or descended from, a
staging_allowed_local_path of the connection.

Additionally, the connection can only manipulate files within the cloud storage location of the authenticated user.

To run this script:

1. Set the INGESTION_USER constant to the account email address of the authenticated user
2. Set the FILEPATH constant to the path of a file that will be uploaded (this example assumes it's a CSV file)
3. Run this file

Connection settings are read from the DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH and DATABRICKS_TOKEN
environment variables.

Note: staging_allowed_local_path can be either a Pathlike object or a list of Pathlike objects.
"""

import os

from databricks import sql

# Placeholder: replace with the account email address of the authenticated user.
INGESTION_USER = "user@example.com"
FILEPATH = "example.csv"

# FILEPATH can be relative to the current directory.
# Resolve it into an absolute path
_complete_path = os.path.realpath(FILEPATH)

if not os.path.exists(_complete_path):
    # It's easiest to save a file in the same directory as this script. But any path to a file will work.
    raise FileNotFoundError(
        "You need to set FILEPATH in this script to a file that actually exists."
    )

# Set staging_allowed_local_path equal to the directory that contains FILEPATH
staging_allowed_local_path = os.path.dirname(_complete_path)

with sql.connect(
    server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
    http_path=os.getenv("DATABRICKS_HTTP_PATH"),
    access_token=os.getenv("DATABRICKS_TOKEN"),
    staging_allowed_local_path=staging_allowed_local_path,
) as connection:

    with connection.cursor() as cursor:

        # All three commands below operate on the same staged file.
        staged_file = f"stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv"

        # Ingestion commands are executed like any other SQL.
        # Here's a sample PUT query. You can remove OVERWRITE at the end to avoid silently overwriting data.
        query = f"PUT '{_complete_path}' INTO '{staged_file}' OVERWRITE"

        print(f"Uploading {FILEPATH} to staging location")
        cursor.execute(query)
        print("Upload was successful")

        # The download target must also be contained within, or descended from, the
        # staging_allowed_local_path, so build it from that directory rather than from the
        # current working directory (the two differ when FILEPATH points elsewhere).
        temp_fp = os.path.join(staging_allowed_local_path, "temp.csv")

        # Here's a sample GET query.
        query = f"GET '{staged_file}' TO '{temp_fp}'"

        print("Fetching from staging location into new file called temp.csv")
        cursor.execute(query)
        print("Download was successful")

        # Here's a sample REMOVE query. It cleans up the demo.csv created in our first query
        query = f"REMOVE '{staged_file}'"

        print("Removing demo.csv from staging location")
        cursor.execute(query)
        print("Remove was successful")