Cloud fetch does not guarantee correct column names on queries when the plan is cached #522


Closed
ewengillies opened this issue Feb 27, 2025 · 3 comments
@ewengillies

Motivation

We use Databricks SQL to fetch data regularly. We work interactively, so we often tweak a query slightly and rerun it. The caching involved when use_cloud_fetch=True can cause the returned result to carry stale column names when aliases are used. The basic way to trigger this is:

  1. Ensure use_cloud_fetch=True in the connection.
  2. Write a query with a non-trivial plan against a managed delta table.
  3. Ensure the returned data is large enough to actually utilize cloud fetch (not 100% sure if this is important).
  4. Have an alias in the final select statement, something like some_column as a .
  5. Run the query.
  6. Rename the alias, i.e. some_column as b.
  7. Rerun the query.
  8. Note that the query run in step 7 comes back with the schema (column names) of the query run in step 5.
  9. This means some_column is still named a, not b, in the result of step 7.
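The steps above can be checked programmatically. Here is a hedged sketch of a helper that compares the column names the rewritten query should return (the aliases in its final SELECT) against the columns actually fetched; the hard-coded lists stand in for `df.columns` from a real fetch:

```python
# Sketch: detect the stale-schema symptom described in the steps above.
def stale_columns(returned: list[str], expected: list[str]) -> list[str]:
    """Names we expected but did not get back; nonempty signals the bug."""
    return [name for name in expected if name not in returned]

# After step 7 the result still carries step 5's alias "a" instead of "b":
missing = stale_columns(["tpep_pickup_datetime", "a"],
                        ["tpep_pickup_datetime", "b"])
print(missing)  # -> ['b']
```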

Minimum Reproducible Example

I made an MRE for convenience using the taxi dataset. This is with version 4; I've seen it consistently in version 3 as well, and haven't tested version 2. Here is my pip freeze for the v4 setup, from a minimal pip install 'databricks-sql-connector[pyarrow]':

certifi==2025.1.31
charset-normalizer==3.4.1
databricks-sql-connector==4.0.0
et_xmlfile==2.0.0
idna==3.10
lz4==4.4.3
numpy==1.26.4
oauthlib==3.2.2
openpyxl==3.1.5
pandas==2.2.3
pyarrow==19.0.1
python-dateutil==2.9.0.post0
pytz==2025.1
requests==2.32.3
six==1.17.0
thrift==0.20.0
tzdata==2025.1
urllib3==2.3.0

Here is the code. I generate an example managed table from the samples.nyctaxi.trips data so it's truly reproducible: https://docs.databricks.com/aws/en/discover/databricks-datasets#unity-catalog-datasets

from databricks.sql import connect

SERVER_HOSTNAME = '<some_server>'
HTTP_PATH = '/sql/1.0/warehouses/<some_warehouse_id>'
ACCESS_TOKEN = '<some_access_token>'
EXAMPLE_TABLE_NAME = '<some_catalog>.<some_schema>.minimum_example_table'

def run_query_via_databricks_sql(query: str, use_cloud_fetch: bool = False):
    with connect(
        server_hostname=SERVER_HOSTNAME,
        http_path=HTTP_PATH,
        access_token=ACCESS_TOKEN,
        use_cloud_fetch=use_cloud_fetch,
    ) as connection:
        with connection.cursor() as cursor:
            cursor.execute(query)
            data = cursor.fetchall_arrow()
            data = data.to_pandas()
            return data

# Create a managed table to play with from the Delta Shared table
# We do a cross join here just to generate enough rows to trigger
# cloud fetch and hence the bug.
print("Creating the example table")
create_the_example_table = f"""
create or replace table {EXAMPLE_TABLE_NAME} as SELECT
   some_trips.tpep_pickup_datetime,
   some_trips.tpep_dropoff_datetime,
   some_trips.trip_distance,
   some_trips.fare_amount,
   some_trips.pickup_zip,
   some_trips.dropoff_zip
FROM
  samples.nyctaxi.trips as some_trips
full outer join 
  samples.nyctaxi.trips
limit 20000000
"""
run_query_via_databricks_sql(create_the_example_table)

# Query A ends with an alias of "as a"
query_a = f"""
with some_trips as (
    SELECT
       tpep_pickup_datetime,
       tpep_dropoff_datetime,
       trip_distance,
       fare_amount,
       pickup_zip,
       dropoff_zip
    FROM
       {EXAMPLE_TABLE_NAME}
),
grouped_trips_pickup as (
    SELECT
       some_trips.pickup_zip,
       count(*) as trip_count
    FROM
       some_trips
    GROUP BY
       some_trips.pickup_zip
),
grouped_trips_dropoff as (
    SELECT
       some_trips.dropoff_zip,
       count(*) as trip_count
    FROM
       some_trips
    GROUP BY
       some_trips.dropoff_zip
),
trips_with_grouped_pickup_and_dropoff as (
    SELECT
       some_trips.*,
       grouped_trips_pickup.trip_count as pickup_trip_count,
       grouped_trips_dropoff.trip_count as dropoff_trip_count
    FROM
       some_trips
    LEFT JOIN
       grouped_trips_pickup
    ON
       some_trips.pickup_zip = grouped_trips_pickup.pickup_zip
    LEFT JOIN
       grouped_trips_dropoff
    ON
       some_trips.dropoff_zip = grouped_trips_dropoff.dropoff_zip
)
select
    tpep_pickup_datetime,
    dropoff_trip_count as a
from
    trips_with_grouped_pickup_and_dropoff
"""
# Query B ends with an alias of "as b"
query_b = query_a.replace(" as a", " as b")

# Without cloud fetch, correct schema in both
print("Running query a without cloud fetch")
df_a__no_cloud_fetch = run_query_via_databricks_sql(query_a, use_cloud_fetch=False)
print(f"Get back a column named 'a': {df_a__no_cloud_fetch.columns.to_list()=}")
print("Running query b without cloud fetch")
df_b__no_cloud_fetch = run_query_via_databricks_sql(query_b, use_cloud_fetch=False)
print(f"Get back a column named 'b': {df_b__no_cloud_fetch.columns.to_list()=}")

# With cloud fetch, schema is not updated in query_b
print("Running query a WITH cloud fetch")
df_a__with_cloud_fetch = run_query_via_databricks_sql(query_a, use_cloud_fetch=True)
print(f"Get back a column named 'a': {df_a__with_cloud_fetch.columns.to_list()=}")
print("Running query b WITH cloud fetch")
df_b__with_cloud_fetch = run_query_via_databricks_sql(query_b, use_cloud_fetch=True)
print(f"Do NOT get back a column named 'b'; still have column named 'a': {df_b__with_cloud_fetch.columns.to_list()=}")

What I get back is this:

Creating the example table
Running query a without cloud fetch
Get back a column named 'a': df_a__no_cloud_fetch.columns.to_list()=['tpep_pickup_datetime', 'a']
Running query b without cloud fetch
Get back a column named 'b': df_b__no_cloud_fetch.columns.to_list()=['tpep_pickup_datetime', 'b']
Running query a WITH cloud fetch
Get back a column named 'a': df_a__with_cloud_fetch.columns.to_list()=['tpep_pickup_datetime', 'a']
Running query b WITH cloud fetch
Do NOT get back a column named 'b'; still have column named 'a': df_b__with_cloud_fetch.columns.to_list()=['tpep_pickup_datetime', 'a']

Issues

  1. We don't get back the column names we expect.
  2. Potentially, if two aliases are used and swapped, columns could be silently mislabelled (I haven't checked this case).
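For the second issue, the swapped-alias variant could be constructed from the MRE's final SELECT like this (an untested sketch; the column names are taken from the query above, and the intermediate `__tmp` token just keeps the two renames from colliding):

```python
# Sketch: build a query that swaps two aliases, to probe whether a cached
# plan would mislabel both columns rather than just one.
query_two_aliases = """
select
    pickup_trip_count as a,
    dropoff_trip_count as b
from
    trips_with_grouped_pickup_and_dropoff
"""
query_swapped = (
    query_two_aliases
    .replace(" as a", " as __tmp")  # park "a" out of the way first
    .replace(" as b", " as a")
    .replace(" as __tmp", " as b")
)
print(query_swapped)
```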
@madhav-db madhav-db self-assigned this Feb 28, 2025
@jprakash-db
Contributor

@ewengillies The fix for this issue has been raised and will make it into the next PySQL Connector release >=v3.7.3 #505

@ewengillies
Author

hi @jprakash-db - thanks for getting back to me so quickly. Can you help me understand the PR you linked - it looks like the default option will be set to false. Does that imply that users have to explicitly "opt-in" to this bug fix? Or is it not viewed as a bug by the maintainers?

@jprakash-db
Contributor

@ewengillies The issue occurs because our query parser returns the same results from cache when it views the queries as the same. Also, this issue does not seem to occur frequently, so as of now it is an 'opt-in' fix
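The failure mode the maintainer describes can be pictured with a toy model (this is an illustration only, not the connector's actual cache implementation): if the cache key normalizes away output aliases, two queries that differ only in an alias collide on the same cached entry and the second one inherits the first one's schema.

```python
# Toy illustration of an alias-insensitive result cache (hypothetical,
# not the real connector code).
import re

_cache: dict[str, list[str]] = {}

def _cache_key(query: str) -> str:
    # Hypothetical normalization that drops "AS <alias>" clauses,
    # so differently-aliased queries hash to the same key.
    return re.sub(r"\s+as\s+\w+", "", query, flags=re.IGNORECASE)

def run(query: str) -> list[str]:
    key = _cache_key(query)
    if key not in _cache:
        # "Execute" the query: the schema is just the aliased column name.
        alias = re.search(r"as\s+(\w+)", query, re.IGNORECASE).group(1)
        _cache[key] = [alias]
    return _cache[key]

print(run("select some_column as a from t"))  # ['a']
print(run("select some_column as b from t"))  # ['a'] -- stale schema
```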
