Add SQLAlchemy Dialect #57

Merged: susodapop merged 3 commits into main from PECO-231 on Feb 17, 2023
Conversation

@susodapop (Contributor) commented Oct 14, 2022

Description

This pull request implements a first-party SQLAlchemy dialect compatible with Databricks SQL. It aims to be a drop-in replacement for sqlalchemy-databricks that implements more of the Databricks API, particularly around table reflection, Alembic usage, and data ingestion with pandas.

Adding a dialect for SQLAlchemy is not a well-documented process, so this work was guided by the included e2e tests. I implemented only those methods of the dialect needed to pass our tests.

What's already supported

Most of the functionality is demonstrated in the e2e tests included in this pull request. The list below was derived from those test method names:

  • Create and drop tables with SQLAlchemy Core
  • Create and drop tables with SQLAlchemy ORM
  • Read created tables via reflection
  • Modify column nullability
  • Insert records manually
  • Insert records with pandas.to_sql (note that this does not work for DataFrames with indexes; see the sketch after this list)
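
For illustration, here is a minimal sketch of the pandas path. The connection values are placeholders and the table name pysql_pandas_demo is hypothetical; note index=False, since DataFrames with indexes are not supported:

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine(
    "databricks://token:<access_token>@<host>?http_path=<http_path>&catalog=<catalog>&schema=<schema>"
)

df = pd.DataFrame({"name": ["Bim", "Miki"], "episodes": [6, 12]})

# index=False is required because DataFrames with indexes are not supported
df.to_sql("pysql_pandas_demo", con=engine, if_exists="append", index=False)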

This connector also aims to support Alembic for programmatic delta table schema maintenance. This behaviour is not yet backed by integration tests, which will follow in a subsequent PR as we learn more about customer use cases there. That said, the following behaviours have been tested manually:

  • Autogenerate revisions with alembic revision --autogenerate
  • Upgrade and downgrade between revisions with alembic upgrade <revision hash> and alembic downgrade <revision hash>

What's not supported

  • MAP, ARRAY, and STRUCT types: this dialect can read these types out as strings, but you cannot define a SQLAlchemy model with databricks.sqlalchemy.dialect.types.DatabricksMap (for example) because we haven't implemented the logic necessary to handle these types. This is a priority for development.
  • Constraints: with the addition of support for information_schema in Unity Catalog, Databricks SQL supports foreign key and primary key constraints. This dialect can write these constraints, but the ability for Alembic to reflect and modify them programmatically has not been tested (a sketch of declaring such constraints follows this list).
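
For illustration, here is a minimal sketch of declaring such constraints with SQLAlchemy Core. The table and column names are hypothetical; creating the tables should emit the constraints, but reflecting and modifying them with Alembic is untested, per the note above:

from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table

metadata_obj = MetaData()

users = Table(
    "users",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("name", String(255)),
)

orders = Table(
    "orders",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("user_id", Integer, ForeignKey("users.id")),
)

# metadata_obj.create_all(engine) should emit CREATE TABLE statements that
# include the PRIMARY KEY and FOREIGN KEY constraints.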

Basic usage

IMPORTANT ⚠️ The connection string format has changed since the earliest commits. The prefix is now databricks:// and not databricks+thrift://

from sqlalchemy import create_engine

engine = create_engine("databricks://token:dapi*****@*****.cloud.databricks.com/?http_path=******&catalog=****&schema=***")
engine.execute("select something")

Use ORM to create a table and insert records

host="****"
http_path="***"
access_token="***"
catalog="***"
schema="***"


import datetime
from sqlalchemy.orm import declarative_base, Session
from sqlalchemy import Column, String, Integer, BOOLEAN, create_engine, select

engine = create_engine(f"databricks://token:{access_token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}")
session = Session(bind=engine)
base = declarative_base(bind=engine)


class SampleObject(base):

    __tablename__ = "PySQLTest_{}".format(datetime.datetime.utcnow().strftime("%s"))

    name = Column(String(255), primary_key=True)
    episodes = Column(Integer)
    some_bool = Column(BOOLEAN)

base.metadata.create_all()

sample_object_1 = SampleObject(name="Bim Adewunmi", episodes=6, some_bool=True)
sample_object_2 = SampleObject(name="Miki Meek", episodes=12, some_bool=False)
session.add(sample_object_1)
session.add(sample_object_2)
session.commit()

stmt = select(SampleObject).where(SampleObject.name.in_(["Bim Adewunmi", "Miki Meek"]))

output = [i for i in session.scalars(stmt)]
assert len(output) == 2

base.metadata.drop_all()

Bulk insert data

import os, datetime, random
from sqlalchemy import create_engine, select, insert, Column, MetaData, Table
from sqlalchemy.types import Integer, String

HOST = os.environ.get("host")
HTTP_PATH = os.environ.get("http_path")
ACCESS_TOKEN = os.environ.get("access_token")
CATALOG = os.environ.get("catalog")
SCHEMA = os.environ.get("schema")

db_engine = create_engine(f"databricks://token:{ACCESS_TOKEN}@{HOST}?http_path={HTTP_PATH}&catalog={CATALOG}&schema={SCHEMA}")
metadata_obj = MetaData(bind=db_engine)

table_name = "PySQLTest_{}".format(datetime.datetime.utcnow().strftime("%s"))
names = ["Bim", "Miki", "Sarah", "Ira"]
rows = [{"name": names[i % len(names)], "number": random.choice(range(10000))} for i in range(10000)]

SampleTable = Table(
        table_name,
        metadata_obj,
        Column("name", String(255)),
        Column("number", Integer)
)

# Create SampleTable ~5 seconds
metadata_obj.create_all()

# Insert 10k rows takes < 3 seconds
db_engine.execute(insert(SampleTable).values(rows))

results = db_engine.execute(select(SampleTable)).all()

assert len(results) == 10_000

# Drop the SampleTable
metadata_obj.drop_all()

Basic alembic workflow

After you have installed a version of databricks-sql-connector that includes the dialect, you can run alembic init to generate an env.py and alembic.ini file. You should not need to modify alembic.ini, but you need to modify env.py to do the following:

  • Import the SQLAlchemy MetaData object against which you declared your models and set target_metadata equal to it.
  • Update run_migrations_offline to import your SQLAlchemy connection string and set url equal to it.
  • Update run_migrations_online to use a connectable engine.

Here is an example env.py where the needed information is available in a file called main.py at the same directory level as env.py:

from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from main import base
target_metadata = base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    from main import sqla_uri
    url = sqla_uri
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    from main import engine
    connectable = engine

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
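
For reference, a hedged sketch of what main.py might contain so the imports above (base, engine, and sqla_uri) resolve. The model and connection values are hypothetical placeholders:

# main.py: illustrative companion module for the env.py above
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base

# Placeholder connection string; substitute real workspace values
sqla_uri = "databricks://token:<access_token>@<host>?http_path=<http_path>&catalog=<catalog>&schema=<schema>"
engine = create_engine(sqla_uri)
base = declarative_base()


class Episode(base):
    __tablename__ = "episodes"

    id = Column(Integer, primary_key=True)
    name = Column(String(255))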

You can make your initial migration by running alembic revision --autogenerate -m "Initial". This will generate a fresh revision in the versions directory, and you should see your model described there. To generate the resulting table(s) in Databricks, run alembic upgrade head. The Alembic tutorial is a good place to learn about creating subsequent revisions, downgrading, etc.

@MHzl commented Jan 26, 2023

Thanks for the feature. 😊 Is there an ETA for when this will be merged?

@susodapop (Contributor Author)

Within the next couple weeks @BMiHe 👍

@susodapop marked this pull request as ready for review on January 31, 2023 22:44
@andrefurlan-db (Contributor) left a comment

I added a bunch of comments, but mostly due to ignorance. I will try to run the code and see what happens.

@oke-aditya

Any plans to support SQLAlchemy 2.x as 1.x series is now discontinued.

@susodapop (Contributor Author)

Any plans to support SQLAlchemy 2.x as 1.x series is now discontinued.

@oke-aditya Yes. The dialect already supports usage with SQLAlchemy 2.0's API. We'll actually bump the dependency version in a future release.

@andrefurlan-db (Contributor) left a comment

Looks good to me.

Jesse Whitehouse added 2 commits on February 17, 2023 16:37
@susodapop merged commit 3eaaac9 into main on Feb 17, 2023
@susodapop deleted the PECO-231 branch on February 17, 2023 23:07
@ahsankhawaja

Hi guys, with the latest 2.4.0 can we do multi-table transactions? Rollback?
Many thanks

@susodapop (Contributor Author)

@ahsankhawaja The version of this connector is not relevant to your question. Databricks doesn't use transactions, so the connector doesn't support them either.

@ahsankhawaja commented Mar 7, 2023

Databricks does transactions https://learn.microsoft.com/en-us/azure/databricks/lakehouse/acid#--how-are-transactions-scoped-on-azure-databricks

or is there something else that you were referring to?

Also, the fact that the Python connector underneath uses PEP 249 – Python Database API Specification v2.0, which has a connection object with commit/rollback methods (I think these are not implemented yet): https://peps.python.org/pep-0249/#connection-methods

Thanks for taking the time to answer.

@susodapop (Contributor Author)

@ahsankhawaja Good questions! But no, Databricks does not support SQL transactions.

Databricks does transactions https://learn.microsoft.com/en-us/azure/databricks/lakehouse/acid#--how-are-transactions-scoped-on-azure-databricks

The documentation you linked references ACID transactions, which are a feature of the storage layer called Delta Lake. These are different than SQL transactions. It's the same word but refers to a different concept.

Also, the fact that the Python connector underneath uses PEP 249

PEP-249 only requires commit support when the database back-end supports it. Quoting from the doc you linked: "Database modules that do not support transactions should implement this method with void functionality." That's exactly what this connector does.

@oke-aditya

So this means we can't use this connector to do full CRUD operations on delta tables?
Only Select is possible??

@susodapop (Contributor Author)

So this means we can't use this connector to do full CRUD operations on delta tables?

No, it doesn't mean that at all! You can absolutely do CRUD on delta tables.

databricks-sql-connector is just a way to write SQL statements and send them to a Databricks cluster. Any valid SQL will work. That includes SELECT, INSERT, DELETE, GRANT, SET and dozens of other keywords (the whole language spec is here). For context, if you've ever run a query in Databricks SQL through your browser it used databricks-sql-connector :)

@ahsankhawaja's question was about SQL transaction support i.e. writing a query that includes BEGIN TRANSACTION and COMMIT TRANSACTION statements. Spark SQL / Databricks SQL don't have this syntax so they won't work with databricks-sql-connector.

@ahsankhawaja

As @susodapop said, @oke-aditya, yes you can. I have built an API on top of it that does select and CRUD ops so I can expose my Lakehouse to any language / application. I was more interested in using the Lakehouse as a backend for web apps, so we can have the same platform doing all things. I saw Databricks released REST API support the other day as well (https://www.databricks.com/blog/2023/03/07/databricks-sql-statement-execution-api-announcing-public-preview.html), but that was just basic select, no CRUD there. This connector is awesome. I wish at some point we can add transactions support in there.

Awesome work Jess

@susodapop (Contributor Author)

I wish at some point we can add transactions support in there.

Please communicate this to your contact at Databricks! I know there's interest in multi-statement transaction support. The best way to increase its priority is to ask for it concretely; the more customers ask for it, the more traction it receives. That needs to happen through your Databricks contact rather than this open source forum, so it can be routed to the correct places internally :)
