|
| 1 | +""" |
| 2 | +databricks-sql-connector includes a SQLAlchemy dialect compatible with Databricks SQL. |
| 3 | +It aims to be a drop-in replacement for the crflynn/sqlalchemy-databricks project, that implements |
| 4 | +more of the Databricks API, particularly around table reflection, Alembic usage, and data |
| 5 | +ingestion with pandas. |
| 6 | +
|
| 7 | +Because of the extent of SQLAlchemy's capabilities it isn't feasible to provide examples of every |
| 8 | +usage in a single script, so we only provide a basic one here. More examples are found in our test |
| 9 | +suite at tests/e2e/sqlalchemy/test_basic.py and in the PR that implements this change: |
| 10 | +
|
| 11 | +https://github.com/databricks/databricks-sql-python/pull/57 |
| 12 | +
|
| 13 | +# What's already supported |
| 14 | +
|
| 15 | +Most of the functionality is demonstrated in the e2e tests mentioned above. The below list we |
| 16 | +derived from those test method names: |
| 17 | +
|
| 18 | + - Create and drop tables with SQLAlchemy Core |
| 19 | + - Create and drop tables with SQLAlchemy ORM |
| 20 | + - Read created tables via reflection |
| 21 | + - Modify column nullability |
| 22 | + - Insert records manually |
| 23 | + - Insert records with pandas.to_sql (note that this does not work for DataFrames with indexes) |
| 24 | +
|
| 25 | +This connector also aims to support Alembic for programmatic delta table schema maintenance. This |
| 26 | +behaviour is not yet backed by integration tests, which will follow in a subsequent PR as we learn |
| 27 | +more about customer use cases there. That said, the following behaviours have been tested manually: |
| 28 | +
|
| 29 | + - Autogenerate revisions with alembic revision --autogenerate |
| 30 | + - Upgrade and downgrade between revisions with `alembic upgrade <revision hash>` and |
| 31 | + `alembic downgrade <revision hash>` |
| 32 | +
|
| 33 | +# Known Gaps |
| 34 | + - MAP, ARRAY, and STRUCT types: this dialect can read these types out as strings. But you cannot |
| 35 | + define a SQLAlchemy model with databricks.sqlalchemy.dialect.types.DatabricksMap (e.g.) because |
| 36 | + we haven't implemented them yet. |
| 37 | + - Constraints: with the addition of information_schema to Unity Catalog, Databricks SQL supports |
| 38 | + foreign key and primary key constraints. This dialect can write these constraints but the ability |
| 39 | + for alembic to reflect and modify them programmatically has not been tested. |
| 40 | +""" |
| 41 | + |
| 42 | +import os |
| 43 | +from sqlalchemy.orm import declarative_base, Session |
| 44 | +from sqlalchemy import Column, String, Integer, BOOLEAN, create_engine, select |
| 45 | + |
| 46 | +host = os.getenv("DATABRICKS_SERVER_HOSTNAME") |
| 47 | +http_path = os.getenv("DATABRICKS_HTTP_PATH") |
| 48 | +access_token = os.getenv("DATABRICKS_TOKEN") |
| 49 | +catalog = os.getenv("DATABRICKS_CATALOG") |
| 50 | +schema = os.getenv("DATABRICKS_SCHEMA") |
| 51 | + |
| 52 | + |
| 53 | +# Extra arguments are passed untouched to the driver |
| 54 | +# See thrift_backend.py for complete list |
| 55 | +extra_connect_args = { |
| 56 | + "_tls_verify_hostname": True, |
| 57 | + "_user_agent_entry": "PySQL Example Script", |
| 58 | +} |
| 59 | + |
| 60 | +engine = create_engine( |
| 61 | + f"databricks://token:{access_token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}", |
| 62 | + connect_args=extra_connect_args, |
| 63 | +) |
| 64 | +session = Session(bind=engine) |
| 65 | +base = declarative_base(bind=engine) |
| 66 | + |
| 67 | + |
| 68 | +class SampleObject(base): |
| 69 | + |
| 70 | + __tablename__ = "mySampleTable" |
| 71 | + |
| 72 | + name = Column(String(255), primary_key=True) |
| 73 | + episodes = Column(Integer) |
| 74 | + some_bool = Column(BOOLEAN) |
| 75 | + |
| 76 | + |
| 77 | +base.metadata.create_all() |
| 78 | + |
| 79 | +sample_object_1 = SampleObject(name="Bim Adewunmi", episodes=6, some_bool=True) |
| 80 | +sample_object_2 = SampleObject(name="Miki Meek", episodes=12, some_bool=False) |
| 81 | + |
| 82 | +session.add(sample_object_1) |
| 83 | +session.add(sample_object_2) |
| 84 | + |
| 85 | +session.commit() |
| 86 | + |
| 87 | +stmt = select(SampleObject).where(SampleObject.name.in_(["Bim Adewunmi", "Miki Meek"])) |
| 88 | + |
| 89 | +output = [i for i in session.scalars(stmt)] |
| 90 | +assert len(output) == 2 |
| 91 | + |
| 92 | +base.metadata.drop_all() |
0 commit comments