Introduce Backend Interface (DatabricksClient) #573


Merged
merged 149 commits into from May 30, 2025

Commits (149)
5f00532
move py.typed to correct places (#403)
wyattscarpenter Jul 2, 2024
59ed5ce
Upgrade mypy (#406)
wyattscarpenter Jul 3, 2024
c95951d
Do not retry failing requests with status code 401 (#408)
Hodnebo Jul 3, 2024
335d918
[PECO-1715] Remove username/password (BasicAuth) auth option (#409)
jackyhu-db Jul 4, 2024
ee4f94c
[PECO-1751] Refactor CloudFetch downloader: handle files sequentially…
kravets-levko Jul 11, 2024
1c8bb11
Fix CloudFetch retry policy to be compatible with all `urllib3` versi…
kravets-levko Jul 11, 2024
9de280e
Disable SSL verification for CloudFetch links (#414)
kravets-levko Jul 16, 2024
04b626a
Prepare relese 3.3.0 (#415)
kravets-levko Jul 17, 2024
2a01173
Fix pandas 2.2.2 support (#416)
kfollesdal Jul 26, 2024
b1faa09
[PECO-1801] Make OAuth as the default authenticator if no authenticat…
jackyhu-db Aug 1, 2024
270edcf
[PECO-1857] Use SSL options with HTTPS connection pool (#425)
kravets-levko Aug 22, 2024
8523fd3
Prepare release v3.4.0 (#430)
kravets-levko Aug 27, 2024
763f070
[PECO-1926] Create a non pyarrow flow to handle small results for the…
jprakash-db Oct 3, 2024
1e0d9d5
[PECO-1961] On non-retryable error, ensure PySQL includes useful info…
shivam2680 Oct 3, 2024
890cdd7
Reformatted all the files using black (#448)
jprakash-db Oct 3, 2024
9bdee1d
Prepare release v3.5.0 (#457)
jackyhu-db Oct 18, 2024
cdd7a19
[PECO-2051] Add custom auth headers into cloud fetch request (#460)
jackyhu-db Oct 25, 2024
fcc2da9
Prepare release 3.6.0 (#461)
jackyhu-db Oct 25, 2024
d354309
[ PECO - 1768 ] PySQL: adjust HTTP retry logic to align with Go and N…
jprakash-db Nov 20, 2024
d63544e
[ PECO-2065 ] Create the async execution flow for the PySQL Connector…
jprakash-db Nov 26, 2024
5bbf223
Fix for check_types github action failing (#472)
jprakash-db Nov 26, 2024
9c62b21
Remove upper caps on dependencies (#452)
arredond Dec 5, 2024
7bb7ca6
Updated the doc to specify native parameters in PUT operation is not …
jprakash-db Dec 6, 2024
438a080
Incorrect rows in inline fetch result (#479)
jprakash-db Dec 22, 2024
eb50411
Bumped up to version 3.7.0 (#482)
jprakash-db Dec 23, 2024
2a5b9c7
PySQL Connector split into connector and sqlalchemy (#444)
jprakash-db Dec 27, 2024
d31aa59
Removed CI CD for python3.8 (#490)
jprakash-db Jan 17, 2025
3e62c90
Added CI CD upto python 3.12 (#491)
jprakash-db Jan 18, 2025
f9a6b13
Merging changes from v3.7.1 release (#488)
jprakash-db Jan 18, 2025
a941575
Bumped up to version 4.0.0 (#493)
jprakash-db Jan 22, 2025
032c276
Updated action's version (#455)
newwingbird Feb 27, 2025
d36889d
Support Python 3.13 and update deps (#510)
dhirschfeld Feb 27, 2025
22e5ce4
Improve debugging + fix PR review template (#514)
samikshya-db Mar 2, 2025
7772403
Forward porting all changes into 4.x.x. uptil v3.7.3 (#529)
jprakash-db Mar 7, 2025
8b27150
Updated the actions/cache version (#532)
jprakash-db Mar 7, 2025
398db45
Updated the CODEOWNERS (#531)
jprakash-db Mar 7, 2025
c962b63
Add version check for urllib3 in backoff calculation (#526)
shivam2680 Mar 11, 2025
c246872
[ES-1372353] make user_agent_header part of public API (#530)
shivam2680 Mar 12, 2025
326f338
Updates runner used to run DCO check to use databricks-protected-runn…
madhav-db Mar 12, 2025
37e73a9
Support multiple timestamp formats in non arrow flow (#533)
jprakash-db Mar 18, 2025
3d7123c
prepare release for v4.0.1 (#534)
shivam2680 Mar 19, 2025
132e1b7
Relaxed bound for python-dateutil (#538)
jprakash-db Apr 1, 2025
46090c0
Bumped up the version for 4.0.2 (#539)
jprakash-db Apr 1, 2025
28249c0
Added example for async execute query (#537)
jprakash-db Apr 1, 2025
5ab0a2c
Added urllib3 version check (#547)
jprakash-db Apr 21, 2025
6528cd1
Bump version to 4.0.3 (#549)
jprakash-db Apr 22, 2025
8f7754b
Cleanup fields as they might be deprecated/removed/change in the futu…
vikrantpuppala May 9, 2025
f7d3865
Refactor decimal conversion in PyArrow tables to use direct casting (…
jayantsing-db May 12, 2025
61cc398
[PECOBLR-361] convert column table to arrow if arrow present (#551)
shivam2680 May 16, 2025
554d011
decouple session class from existing Connection
varun-edachali-dbx May 20, 2025
6f28297
add open property to Connection to ensure maintenance of existing API
varun-edachali-dbx May 20, 2025
983ec03
update unit tests to address ThriftBackend through session instead of…
varun-edachali-dbx May 20, 2025
6f3b5b7
chore: move session specific tests from test_client to test_session
varun-edachali-dbx May 20, 2025
29a2840
formatting (black)
varun-edachali-dbx May 20, 2025
0d28b69
use connection open property instead of long chain through session
varun-edachali-dbx May 20, 2025
8cb8cdd
trigger integration workflow
varun-edachali-dbx May 20, 2025
4495f9b
fix: ensure open attribute of Connection never fails
varun-edachali-dbx May 21, 2025
c744117
introduce databricksClient interface and thrift backend implementation
varun-edachali-dbx May 22, 2025
ef5a06b
change names of ThriftBackend -> ThriftDatabricksClient in tests
varun-edachali-dbx May 22, 2025
abbaaa5
fix: remove excess debug log
varun-edachali-dbx May 22, 2025
33765cb
fix: replace thrift_backend with backend in result set param
varun-edachali-dbx May 22, 2025
788d8c7
fix: replace module replacement with concrete mock instance in execut…
varun-edachali-dbx May 22, 2025
4debbd3
formatting: black + re-organise backend into new dir
varun-edachali-dbx May 23, 2025
0e6e215
fix: sql.thrift_backend -> sql.backend.thrift_backend in tests and ex…
varun-edachali-dbx May 23, 2025
925394c
Update CODEOWNERS (#562)
jprakash-db May 21, 2025
4ad6c8d
Enhance Cursor close handling and context manager exception managemen…
madhav-db May 21, 2025
51369c8
PECOBLR-86 improve logging on python driver (#556)
saishreeeee May 22, 2025
9541464
Update github actions run conditions (#569)
jprakash-db May 26, 2025
cbdd3d7
remove un-necessary example change
varun-edachali-dbx May 26, 2025
ca38e95
[empty commit] trigger integration tests
varun-edachali-dbx May 26, 2025
b40c0fd
fix: use backend in Cursor, not thrift_backend
varun-edachali-dbx May 26, 2025
35ed462
fix: backend references in integration tests
varun-edachali-dbx May 26, 2025
37f3af1
fix: thrift_backend -> backend in ResultSet reference in e2e test
varun-edachali-dbx May 26, 2025
09c5e2f
introduce normalised sessionId and CommandId for (near) complete back…
varun-edachali-dbx May 27, 2025
4ce6aab
fix: Any is not defined
varun-edachali-dbx May 27, 2025
307f447
fix: get_session_id_hex() is not defined
varun-edachali-dbx May 27, 2025
802d8dc
command_handle -> command_id in ExecuteResponse
varun-edachali-dbx May 27, 2025
944d446
fix: active op handle -> active command id in Cursor
varun-edachali-dbx May 27, 2025
6338083
fixed (most) tests by accounting for normalised Session interface
varun-edachali-dbx May 27, 2025
3658a91
fix: convert command id to operationHandle in status_request
varun-edachali-dbx May 27, 2025
8ef6ed6
decouple session class from existing Connection
varun-edachali-dbx May 20, 2025
61300b2
add open property to Connection to ensure maintenance of existing API
varun-edachali-dbx May 20, 2025
44e7d17
formatting (black)
varun-edachali-dbx May 20, 2025
d2035ea
use connection open property instead of long chain through session
varun-edachali-dbx May 20, 2025
8b4451b
trigger integration workflow
varun-edachali-dbx May 20, 2025
d21d2c3
fix: ensure open attribute of Connection never fails
varun-edachali-dbx May 21, 2025
21068a3
fix: de-complicate earlier connection open logic
varun-edachali-dbx May 23, 2025
476e763
Revert "fix: de-complicate earlier connection open logic"
varun-edachali-dbx May 23, 2025
1e1cf1e
[empty commit] attempt to trigger ci e2e workflow
varun-edachali-dbx May 23, 2025
b408c2c
PECOBLR-86 improve logging on python driver (#556)
saishreeeee May 22, 2025
73649f2
Revert "Merge remote-tracking branch 'upstream/sea-migration' into de…
varun-edachali-dbx May 23, 2025
a61df99
Reapply "Merge remote-tracking branch 'upstream/sea-migration' into d…
varun-edachali-dbx May 23, 2025
e1a2c0e
fix: separate session opening logic from instantiation
varun-edachali-dbx May 23, 2025
71ba9d5
chore: use get_handle() instead of private session attribute in client
varun-edachali-dbx May 24, 2025
160ba9f
fix: remove accidentally removed assertions
varun-edachali-dbx May 26, 2025
6b3436f
generalise open session, fix session tests to consider positional args
varun-edachali-dbx May 27, 2025
30849dc
formatting (black)
varun-edachali-dbx May 27, 2025
4d455bb
correct session logic after duplication during merge
varun-edachali-dbx May 27, 2025
6fc0834
args -> kwargs in tests
varun-edachali-dbx May 27, 2025
d254e48
delegate protocol version to SessionId
varun-edachali-dbx May 28, 2025
370627d
ids -> backend/types
varun-edachali-dbx May 28, 2025
ca1b57d
update open session with normalised SessionId
varun-edachali-dbx May 28, 2025
6c120c0
Merging changes from v3.7.1 release (#488)
jprakash-db Jan 18, 2025
cdf6865
Support Python 3.13 and update deps (#510)
dhirschfeld Feb 27, 2025
12ce717
Updated the actions/cache version (#532)
jprakash-db Mar 7, 2025
1215fd8
Add version check for urllib3 in backoff calculation (#526)
shivam2680 Mar 11, 2025
dd083f6
Support multiple timestamp formats in non arrow flow (#533)
jprakash-db Mar 18, 2025
8d30436
Added example for async execute query (#537)
jprakash-db Apr 1, 2025
066aef9
Added urllib3 version check (#547)
jprakash-db Apr 21, 2025
1ed3514
decouple session class from existing Connection
varun-edachali-dbx May 20, 2025
ca80f94
formatting (black)
varun-edachali-dbx May 20, 2025
6027fb1
use connection open property instead of long chain through session
varun-edachali-dbx May 20, 2025
7a2f9b5
trigger integration workflow
varun-edachali-dbx May 20, 2025
39294e9
fix: ensure open attribute of Connection never fails
varun-edachali-dbx May 21, 2025
709e910
Revert "fix: de-complicate earlier connection open logic"
varun-edachali-dbx May 23, 2025
1ad0ace
[empty commit] attempt to trigger ci e2e workflow
varun-edachali-dbx May 23, 2025
913da63
PECOBLR-86 improve logging on python driver (#556)
saishreeeee May 22, 2025
d8159e7
Revert "Merge remote-tracking branch 'upstream/sea-migration' into de…
varun-edachali-dbx May 23, 2025
0b91183
Reapply "Merge remote-tracking branch 'upstream/sea-migration' into d…
varun-edachali-dbx May 23, 2025
ff78b5f
fix: separate session opening logic from instantiation
varun-edachali-dbx May 23, 2025
c1d53d2
Enhance Cursor close handling and context manager exception managemen…
madhav-db May 21, 2025
a5a8e51
PECOBLR-86 improve logging on python driver (#556)
saishreeeee May 22, 2025
f7be10c
New Complex type test table + Github Action changes (#575)
jprakash-db May 28, 2025
a888dd6
remove excess logs, assertions, instantiations
varun-edachali-dbx May 28, 2025
29a2985
Merge remote-tracking branch 'origin/sea-migration' into backend-inte…
varun-edachali-dbx May 28, 2025
9b9735e
formatting (black) + remove excess log (merge artifact)
varun-edachali-dbx May 28, 2025
0a8226c
fix typing
varun-edachali-dbx May 28, 2025
42263c4
remove un-necessary check
varun-edachali-dbx May 28, 2025
ac984e4
remove un-necessary replace call
varun-edachali-dbx May 28, 2025
8da84e8
introduce __str__ methods for CommandId and SessionId
varun-edachali-dbx May 28, 2025
eecc67d
docstrings for DatabricksClient interface
varun-edachali-dbx May 29, 2025
9800636
stronger typing of Cursor and ExecuteResponse
varun-edachali-dbx May 29, 2025
e07f56c
remove utility functions from backend interface, fix circular import
varun-edachali-dbx May 29, 2025
73fb141
rename info to properties
varun-edachali-dbx May 29, 2025
d838653
newline for cleanliness
varun-edachali-dbx May 29, 2025
6654f06
fix circular import
varun-edachali-dbx May 29, 2025
89425f9
formatting (black)
varun-edachali-dbx May 29, 2025
93e55e8
to_hex_id -> get_hex_id
varun-edachali-dbx May 29, 2025
7689d75
better comment on protocol version getter
varun-edachali-dbx May 29, 2025
1ec8c45
formatting (black)
varun-edachali-dbx May 29, 2025
bc5e716
move guid to hex id to new utils module
varun-edachali-dbx May 29, 2025
8b6ba56
formatting (black)
varun-edachali-dbx May 29, 2025
c0b989c
move staging allowed local path to connection props
varun-edachali-dbx May 29, 2025
ece7616
add strong return type for execute_command
varun-edachali-dbx May 30, 2025
765bbd7
skip auth, error handling in databricksclient interface
varun-edachali-dbx May 30, 2025
54077f7
chore: docstring + line width
varun-edachali-dbx May 30, 2025
91357f4
get_id -> get_guid
varun-edachali-dbx May 30, 2025
26c017d
chore: docstring
varun-edachali-dbx May 30, 2025
8f085d5
fix: to_hex_id -> to_hex_guid
varun-edachali-dbx May 30, 2025
344 changes: 344 additions & 0 deletions src/databricks/sql/backend/databricks_client.py
@@ -0,0 +1,344 @@
"""
Abstract client interface for interacting with Databricks SQL services.

Implementations of this class are responsible for:
- Managing connections to Databricks SQL services
- Executing SQL queries and commands
- Retrieving query results
- Fetching metadata about catalogs, schemas, tables, and columns
"""

from abc import ABC, abstractmethod
from typing import Dict, Tuple, List, Optional, Any, Union, TYPE_CHECKING

if TYPE_CHECKING:
    from databricks.sql.client import Cursor

from databricks.sql.thrift_api.TCLIService import ttypes
from databricks.sql.backend.types import SessionId, CommandId
from databricks.sql.utils import ExecuteResponse
from databricks.sql.types import SSLOptions


class DatabricksClient(ABC):
    # == Connection and Session Management ==
    @abstractmethod
    def open_session(
        self,
        session_configuration: Optional[Dict[str, Any]],
        catalog: Optional[str],
        schema: Optional[str],
    ) -> SessionId:
        """
        Opens a new session with the Databricks SQL service.

        This method establishes a new session with the server and returns a session
        identifier that can be used for subsequent operations.

        Args:
            session_configuration: Optional dictionary of configuration parameters for the session
            catalog: Optional catalog name to use as the initial catalog for the session
            schema: Optional schema name to use as the initial schema for the session

        Returns:
            SessionId: A session identifier object that can be used for subsequent operations

        Raises:
            Error: If the session configuration is invalid
            OperationalError: If there's an error establishing the session
            InvalidServerResponseError: If the server response is invalid or unexpected
        """
        pass

    @abstractmethod
    def close_session(self, session_id: SessionId) -> None:
        """
        Closes an existing session with the Databricks SQL service.

        This method terminates the session identified by the given session ID and
        releases any resources associated with it.

        Args:
            session_id: The session identifier returned by open_session()

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error closing the session
        """
        pass

    # == Query Execution, Command Management ==
    @abstractmethod
    def execute_command(
        self,
        operation: str,
        session_id: SessionId,
        max_rows: int,
        max_bytes: int,
        lz4_compression: bool,
        cursor: "Cursor",
        use_cloud_fetch: bool,
        parameters: List[ttypes.TSparkParameter],
        async_op: bool,
        enforce_embedded_schema_correctness: bool,
    ) -> Optional[ExecuteResponse]:
        """
        Executes a SQL command or query within the specified session.

        This method sends a SQL command to the server for execution and handles
        the response. It can operate in both synchronous and asynchronous modes.

        Args:
            operation: The SQL command or query to execute
            session_id: The session identifier in which to execute the command
            max_rows: Maximum number of rows to fetch in a single fetch batch
            max_bytes: Maximum number of bytes to fetch in a single fetch batch
            lz4_compression: Whether to use LZ4 compression for result data
            cursor: The cursor object that will handle the results
            use_cloud_fetch: Whether to use cloud fetch for retrieving large result sets
            parameters: List of parameters to bind to the query
            async_op: Whether to execute the command asynchronously
            enforce_embedded_schema_correctness: Whether to enforce schema correctness

        Returns:
            If async_op is False, returns an ExecuteResponse object containing the
            query results and metadata. If async_op is True, returns None and the
            results must be fetched later using get_execution_result().

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error executing the command
            ServerOperationError: If the server encounters an error during execution
        """
        pass

    @abstractmethod
    def cancel_command(self, command_id: CommandId) -> None:
        """
        Cancels a running command or query.

        This method attempts to cancel a command that is currently being executed.
        It can be called from a different thread than the one executing the command.

        Args:
            command_id: The command identifier to cancel

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error canceling the command
        """
        pass

    @abstractmethod
    def close_command(self, command_id: CommandId) -> ttypes.TStatus:
        """
        Closes a command and releases associated resources.

        This method informs the server that the client is done with the command
        and any resources associated with it can be released.

        Args:
            command_id: The command identifier to close

        Returns:
            ttypes.TStatus: The status of the close operation

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error closing the command
        """
        pass

    @abstractmethod
    def get_query_state(self, command_id: CommandId) -> ttypes.TOperationState:
        """
        Gets the current state of a query or command.

        This method retrieves the current execution state of a command from the server.

        Args:
            command_id: The command identifier to check

        Returns:
            ttypes.TOperationState: The current state of the command

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error retrieving the state
            ServerOperationError: If the command is in an error state
            DatabaseError: If the command has been closed unexpectedly
        """
        pass

    @abstractmethod
    def get_execution_result(
        self,
        command_id: CommandId,
        cursor: "Cursor",
    ) -> ExecuteResponse:
        """
        Retrieves the results of a previously executed command.

        This method fetches the results of a command that was executed asynchronously
        or retrieves additional results from a command that has more rows available.

        Args:
            command_id: The command identifier for which to retrieve results
            cursor: The cursor object that will handle the results

        Returns:
            ExecuteResponse: An object containing the query results and metadata

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error retrieving the results
        """
        pass

    # == Metadata Operations ==
    @abstractmethod
    def get_catalogs(
        self,
        session_id: SessionId,
        max_rows: int,
        max_bytes: int,
        cursor: "Cursor",
    ) -> ExecuteResponse:
        """
        Retrieves a list of available catalogs.

        This method fetches metadata about all catalogs available in the current
        session's context.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results

        Returns:
            ExecuteResponse: An object containing the catalog metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the catalogs
        """
        pass

    @abstractmethod
    def get_schemas(
        self,
        session_id: SessionId,
        max_rows: int,
        max_bytes: int,
        cursor: "Cursor",
        catalog_name: Optional[str] = None,
        schema_name: Optional[str] = None,
    ) -> ExecuteResponse:
        """
        Retrieves a list of schemas, optionally filtered by catalog and schema name patterns.

        This method fetches metadata about schemas available in the specified catalog
        or all catalogs if no catalog is specified.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results
            catalog_name: Optional catalog name pattern to filter by
            schema_name: Optional schema name pattern to filter by

        Returns:
            ExecuteResponse: An object containing the schema metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the schemas
        """
        pass

    @abstractmethod
    def get_tables(
        self,
        session_id: SessionId,
        max_rows: int,
        max_bytes: int,
        cursor: "Cursor",
        catalog_name: Optional[str] = None,
        schema_name: Optional[str] = None,
        table_name: Optional[str] = None,
        table_types: Optional[List[str]] = None,
    ) -> ExecuteResponse:
        """
        Retrieves a list of tables, optionally filtered by catalog, schema, table name, and table types.

        This method fetches metadata about tables available in the specified catalog
        and schema, or all catalogs and schemas if not specified.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results
            catalog_name: Optional catalog name pattern to filter by
            schema_name: Optional schema name pattern to filter by
            table_name: Optional table name pattern to filter by
            table_types: Optional list of table types to filter by (e.g., ['TABLE', 'VIEW'])

        Returns:
            ExecuteResponse: An object containing the table metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the tables
        """
        pass

    @abstractmethod
    def get_columns(
        self,
        session_id: SessionId,
        max_rows: int,
        max_bytes: int,
        cursor: "Cursor",
        catalog_name: Optional[str] = None,
        schema_name: Optional[str] = None,
        table_name: Optional[str] = None,
        column_name: Optional[str] = None,
    ) -> ExecuteResponse:
        """
        Retrieves a list of columns, optionally filtered by catalog, schema, table, and column name patterns.

        This method fetches metadata about columns available in the specified table,
        or all tables if not specified.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results
            catalog_name: Optional catalog name pattern to filter by
            schema_name: Optional schema name pattern to filter by
            table_name: Optional table name pattern to filter by
            column_name: Optional column name pattern to filter by

        Returns:
            ExecuteResponse: An object containing the column metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the columns
        """
        pass

    @property
    @abstractmethod
    def max_download_threads(self) -> int:
        """
        Gets the maximum number of download threads for cloud fetch operations.

        Returns:
            int: The maximum number of download threads
        """
        pass
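The interface above is what lets `Connection` and `Cursor` depend on an abstract backend rather than on the Thrift implementation directly: a concrete client (such as the PR's `ThriftDatabricksClient`) subclasses `DatabricksClient` and fills in the hooks. A minimal, self-contained sketch of the pattern, using toy stand-ins for `SessionId` and the client classes (not the real `databricks.sql` types), shows how a test double could be swapped in:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class SessionId:
    """Toy stand-in for databricks.sql.backend.types.SessionId."""

    def __init__(self, guid: str):
        self.guid = guid


class DatabricksClient(ABC):
    """Simplified mirror of the interface: backends implement these hooks."""

    @abstractmethod
    def open_session(
        self,
        session_configuration: Optional[Dict[str, Any]],
        catalog: Optional[str],
        schema: Optional[str],
    ) -> SessionId: ...

    @abstractmethod
    def close_session(self, session_id: SessionId) -> None: ...


class FakeClient(DatabricksClient):
    """Hypothetical in-memory backend, e.g. for unit-testing callers."""

    def __init__(self):
        self.sessions = set()

    def open_session(self, session_configuration, catalog, schema) -> SessionId:
        # Hand out a locally generated identifier instead of calling a server.
        sid = SessionId(f"session-{len(self.sessions)}")
        self.sessions.add(sid.guid)
        return sid

    def close_session(self, session_id: SessionId) -> None:
        self.sessions.discard(session_id.guid)


# Callers only see the abstract interface, so backends are interchangeable.
client = FakeClient()
sid = client.open_session(None, None, None)
assert sid.guid in client.sessions
client.close_session(sid)
assert not client.sessions
```

Because every method takes and returns the normalized `SessionId`/`CommandId` types rather than Thrift handles, an alternative backend (the branch name suggests a SEA migration) can implement the same contract without touching `Connection` or `Cursor`.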