
Commit 57370b3

Introduce Backend Interface (DatabricksClient) (#573)
NOTE: the `test_complex_types` e2e test was not working at the time of this merge. The test must be triggered when the test is back up and running as intended.

* remove excess logs, assertions, instantiations large merge artifacts
* formatting (black) + remove excess log (merge artifact)
* fix typing
* remove un-necessary check
* remove un-necessary replace call
* introduce __str__ methods for CommandId and SessionId
* docstrings for DatabricksClient interface
* stronger typing of Cursor and ExecuteResponse
* remove utility functions from backend interface, fix circular import
* rename info to properties
* newline for cleanliness
* fix circular import
* formatting (black)
* to_hex_id -> get_hex_id
* better comment on protocol version getter
* formatting (black)
* move guid to hex id to new utils module
* formatting (black)
* move staging allowed local path to connection props
* add strong return type for execute_command
* skip auth, error handling in databricksclient interface
* chore: docstring + line width
* get_id -> get_guid
* chore: docstring
* fix: to_hex_id -> to_hex_guid

---------

Signed-off-by: varun-edachali-dbx <[email protected]>
1 parent 5e5147b commit 57370b3

15 files changed, 1185 insertions(+), 406 deletions(-)
Lines changed: 344 additions & 0 deletions
@@ -0,0 +1,344 @@
+"""
+Abstract client interface for interacting with Databricks SQL services.
+
+Implementations of this class are responsible for:
+- Managing connections to Databricks SQL services
+- Executing SQL queries and commands
+- Retrieving query results
+- Fetching metadata about catalogs, schemas, tables, and columns
+"""
+
+from abc import ABC, abstractmethod
+from typing import Dict, Tuple, List, Optional, Any, Union, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from databricks.sql.client import Cursor
+
+from databricks.sql.thrift_api.TCLIService import ttypes
+from databricks.sql.backend.types import SessionId, CommandId
+from databricks.sql.utils import ExecuteResponse
+from databricks.sql.types import SSLOptions
+
+
+class DatabricksClient(ABC):
+    # == Connection and Session Management ==
+    @abstractmethod
+    def open_session(
+        self,
+        session_configuration: Optional[Dict[str, Any]],
+        catalog: Optional[str],
+        schema: Optional[str],
+    ) -> SessionId:
+        """
+        Opens a new session with the Databricks SQL service.
+
+        This method establishes a new session with the server and returns a session
+        identifier that can be used for subsequent operations.
+
+        Args:
+            session_configuration: Optional dictionary of configuration parameters for the session
+            catalog: Optional catalog name to use as the initial catalog for the session
+            schema: Optional schema name to use as the initial schema for the session
+
+        Returns:
+            SessionId: A session identifier object that can be used for subsequent operations
+
+        Raises:
+            Error: If the session configuration is invalid
+            OperationalError: If there's an error establishing the session
+            InvalidServerResponseError: If the server response is invalid or unexpected
+        """
+        pass
+
+    @abstractmethod
+    def close_session(self, session_id: SessionId) -> None:
+        """
+        Closes an existing session with the Databricks SQL service.
+
+        This method terminates the session identified by the given session ID and
+        releases any resources associated with it.
+
+        Args:
+            session_id: The session identifier returned by open_session()
+
+        Raises:
+            ValueError: If the session ID is invalid
+            OperationalError: If there's an error closing the session
+        """
+        pass
+
+    # == Query Execution, Command Management ==
+    @abstractmethod
+    def execute_command(
+        self,
+        operation: str,
+        session_id: SessionId,
+        max_rows: int,
+        max_bytes: int,
+        lz4_compression: bool,
+        cursor: "Cursor",
+        use_cloud_fetch: bool,
+        parameters: List[ttypes.TSparkParameter],
+        async_op: bool,
+        enforce_embedded_schema_correctness: bool,
+    ) -> Optional[ExecuteResponse]:
+        """
+        Executes a SQL command or query within the specified session.
+
+        This method sends a SQL command to the server for execution and handles
+        the response. It can operate in both synchronous and asynchronous modes.
+
+        Args:
+            operation: The SQL command or query to execute
+            session_id: The session identifier in which to execute the command
+            max_rows: Maximum number of rows to fetch in a single fetch batch
+            max_bytes: Maximum number of bytes to fetch in a single fetch batch
+            lz4_compression: Whether to use LZ4 compression for result data
+            cursor: The cursor object that will handle the results
+            use_cloud_fetch: Whether to use cloud fetch for retrieving large result sets
+            parameters: List of parameters to bind to the query
+            async_op: Whether to execute the command asynchronously
+            enforce_embedded_schema_correctness: Whether to enforce schema correctness
+
+        Returns:
+            If async_op is False, returns an ExecuteResponse object containing the
+            query results and metadata. If async_op is True, returns None and the
+            results must be fetched later using get_execution_result().
+
+        Raises:
+            ValueError: If the session ID is invalid
+            OperationalError: If there's an error executing the command
+            ServerOperationError: If the server encounters an error during execution
+        """
+        pass
+
+    @abstractmethod
+    def cancel_command(self, command_id: CommandId) -> None:
+        """
+        Cancels a running command or query.
+
+        This method attempts to cancel a command that is currently being executed.
+        It can be called from a different thread than the one executing the command.
+
+        Args:
+            command_id: The command identifier to cancel
+
+        Raises:
+            ValueError: If the command ID is invalid
+            OperationalError: If there's an error canceling the command
+        """
+        pass
+
+    @abstractmethod
+    def close_command(self, command_id: CommandId) -> ttypes.TStatus:
+        """
+        Closes a command and releases associated resources.
+
+        This method informs the server that the client is done with the command
+        and any resources associated with it can be released.
+
+        Args:
+            command_id: The command identifier to close
+
+        Returns:
+            ttypes.TStatus: The status of the close operation
+
+        Raises:
+            ValueError: If the command ID is invalid
+            OperationalError: If there's an error closing the command
+        """
+        pass
+
+    @abstractmethod
+    def get_query_state(self, command_id: CommandId) -> ttypes.TOperationState:
+        """
+        Gets the current state of a query or command.
+
+        This method retrieves the current execution state of a command from the server.
+
+        Args:
+            command_id: The command identifier to check
+
+        Returns:
+            ttypes.TOperationState: The current state of the command
+
+        Raises:
+            ValueError: If the command ID is invalid
+            OperationalError: If there's an error retrieving the state
+            ServerOperationError: If the command is in an error state
+            DatabaseError: If the command has been closed unexpectedly
+        """
+        pass
+
+    @abstractmethod
+    def get_execution_result(
+        self,
+        command_id: CommandId,
+        cursor: "Cursor",
+    ) -> ExecuteResponse:
+        """
+        Retrieves the results of a previously executed command.
+
+        This method fetches the results of a command that was executed asynchronously
+        or retrieves additional results from a command that has more rows available.
+
+        Args:
+            command_id: The command identifier for which to retrieve results
+            cursor: The cursor object that will handle the results
+
+        Returns:
+            ExecuteResponse: An object containing the query results and metadata
+
+        Raises:
+            ValueError: If the command ID is invalid
+            OperationalError: If there's an error retrieving the results
+        """
+        pass
+
+    # == Metadata Operations ==
+    @abstractmethod
+    def get_catalogs(
+        self,
+        session_id: SessionId,
+        max_rows: int,
+        max_bytes: int,
+        cursor: "Cursor",
+    ) -> ExecuteResponse:
+        """
+        Retrieves a list of available catalogs.
+
+        This method fetches metadata about all catalogs available in the current
+        session's context.
+
+        Args:
+            session_id: The session identifier
+            max_rows: Maximum number of rows to fetch in a single batch
+            max_bytes: Maximum number of bytes to fetch in a single batch
+            cursor: The cursor object that will handle the results
+
+        Returns:
+            ExecuteResponse: An object containing the catalog metadata
+
+        Raises:
+            ValueError: If the session ID is invalid
+            OperationalError: If there's an error retrieving the catalogs
+        """
+        pass
+
+    @abstractmethod
+    def get_schemas(
+        self,
+        session_id: SessionId,
+        max_rows: int,
+        max_bytes: int,
+        cursor: "Cursor",
+        catalog_name: Optional[str] = None,
+        schema_name: Optional[str] = None,
+    ) -> ExecuteResponse:
+        """
+        Retrieves a list of schemas, optionally filtered by catalog and schema name patterns.
+
+        This method fetches metadata about schemas available in the specified catalog
+        or all catalogs if no catalog is specified.
+
+        Args:
+            session_id: The session identifier
+            max_rows: Maximum number of rows to fetch in a single batch
+            max_bytes: Maximum number of bytes to fetch in a single batch
+            cursor: The cursor object that will handle the results
+            catalog_name: Optional catalog name pattern to filter by
+            schema_name: Optional schema name pattern to filter by
+
+        Returns:
+            ExecuteResponse: An object containing the schema metadata
+
+        Raises:
+            ValueError: If the session ID is invalid
+            OperationalError: If there's an error retrieving the schemas
+        """
+        pass
+
+    @abstractmethod
+    def get_tables(
+        self,
+        session_id: SessionId,
+        max_rows: int,
+        max_bytes: int,
+        cursor: "Cursor",
+        catalog_name: Optional[str] = None,
+        schema_name: Optional[str] = None,
+        table_name: Optional[str] = None,
+        table_types: Optional[List[str]] = None,
+    ) -> ExecuteResponse:
+        """
+        Retrieves a list of tables, optionally filtered by catalog, schema, table name, and table types.
+
+        This method fetches metadata about tables available in the specified catalog
+        and schema, or all catalogs and schemas if not specified.
+
+        Args:
+            session_id: The session identifier
+            max_rows: Maximum number of rows to fetch in a single batch
+            max_bytes: Maximum number of bytes to fetch in a single batch
+            cursor: The cursor object that will handle the results
+            catalog_name: Optional catalog name pattern to filter by
+            schema_name: Optional schema name pattern to filter by
+            table_name: Optional table name pattern to filter by
+            table_types: Optional list of table types to filter by (e.g., ['TABLE', 'VIEW'])
+
+        Returns:
+            ExecuteResponse: An object containing the table metadata
+
+        Raises:
+            ValueError: If the session ID is invalid
+            OperationalError: If there's an error retrieving the tables
+        """
+        pass
+
+    @abstractmethod
+    def get_columns(
+        self,
+        session_id: SessionId,
+        max_rows: int,
+        max_bytes: int,
+        cursor: "Cursor",
+        catalog_name: Optional[str] = None,
+        schema_name: Optional[str] = None,
+        table_name: Optional[str] = None,
+        column_name: Optional[str] = None,
+    ) -> ExecuteResponse:
+        """
+        Retrieves a list of columns, optionally filtered by catalog, schema, table, and column name patterns.
+
+        This method fetches metadata about columns available in the specified table,
+        or all tables if not specified.
+
+        Args:
+            session_id: The session identifier
+            max_rows: Maximum number of rows to fetch in a single batch
+            max_bytes: Maximum number of bytes to fetch in a single batch
+            cursor: The cursor object that will handle the results
+            catalog_name: Optional catalog name pattern to filter by
+            schema_name: Optional schema name pattern to filter by
+            table_name: Optional table name pattern to filter by
+            column_name: Optional column name pattern to filter by
+
+        Returns:
+            ExecuteResponse: An object containing the column metadata
+
+        Raises:
+            ValueError: If the session ID is invalid
+            OperationalError: If there's an error retrieving the columns
+        """
+        pass
+
+    @property
+    @abstractmethod
+    def max_download_threads(self) -> int:
+        """
+        Gets the maximum number of download threads for cloud fetch operations.
+
+        Returns:
+            int: The maximum number of download threads
+        """
+        pass
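
The sketch below shows how a backend might satisfy an interface shaped like this one, and how a caller could drive the open / execute / poll / fetch / close cycle that the docstrings describe. It is not code from this commit. Every name in it (`MiniClient`, `InMemoryClient`, `FakeSessionId`, `FakeCommandId`, `FakeState`, `run_query`) is hypothetical, and the signatures are deliberately simplified. In particular, the toy `execute_command` returns a command id directly, whereas the interface above returns `Optional[ExecuteResponse]` and takes a cursor.

```python
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Set, Tuple


# Hypothetical stand-ins for SessionId, CommandId and TOperationState.
@dataclass(frozen=True)
class FakeSessionId:
    guid: str


@dataclass(frozen=True)
class FakeCommandId:
    guid: str


class FakeState(Enum):
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"


class MiniClient(ABC):
    """Trimmed-down interface in the style of DatabricksClient (illustrative only)."""

    @abstractmethod
    def open_session(self) -> FakeSessionId: ...

    @abstractmethod
    def close_session(self, session_id: FakeSessionId) -> None: ...

    @abstractmethod
    def execute_command(self, operation: str, session_id: FakeSessionId) -> FakeCommandId: ...

    @abstractmethod
    def get_query_state(self, command_id: FakeCommandId) -> FakeState: ...

    @abstractmethod
    def get_execution_result(self, command_id: FakeCommandId) -> List[Tuple]: ...


class InMemoryClient(MiniClient):
    """Toy backend: keeps sessions and commands in dictionaries, no network."""

    def __init__(self) -> None:
        self._sessions: Set[FakeSessionId] = set()
        self._polls_left: Dict[FakeCommandId, int] = {}
        self._results: Dict[FakeCommandId, List[Tuple]] = {}

    def open_session(self) -> FakeSessionId:
        session_id = FakeSessionId(uuid.uuid4().hex)
        self._sessions.add(session_id)
        return session_id

    def close_session(self, session_id: FakeSessionId) -> None:
        if session_id not in self._sessions:
            raise ValueError("unknown session")
        self._sessions.remove(session_id)

    def execute_command(self, operation: str, session_id: FakeSessionId) -> FakeCommandId:
        if session_id not in self._sessions:
            raise ValueError("unknown session")
        command_id = FakeCommandId(uuid.uuid4().hex)
        self._polls_left[command_id] = 2  # pretend the query needs two polls
        self._results[command_id] = [(operation,)]  # echo the SQL back as one row
        return command_id

    def get_query_state(self, command_id: FakeCommandId) -> FakeState:
        if command_id not in self._polls_left:
            raise ValueError("unknown command")
        if self._polls_left[command_id] > 0:
            self._polls_left[command_id] -= 1
            return FakeState.RUNNING
        return FakeState.FINISHED

    def get_execution_result(self, command_id: FakeCommandId) -> List[Tuple]:
        if command_id not in self._results:
            raise ValueError("unknown command")
        return self._results[command_id]


def run_query(client: MiniClient, sql: str) -> List[Tuple]:
    """Drive any MiniClient through one asynchronous-style query."""
    session_id = client.open_session()
    try:
        command_id = client.execute_command(sql, session_id)
        while client.get_query_state(command_id) is not FakeState.FINISHED:
            pass  # a real caller would sleep between polls
        return client.get_execution_result(command_id)
    finally:
        client.close_session(session_id)
```

Because `run_query` depends only on the abstract type, a different backend could be swapped in without changing the caller, which appears to be the motivation for introducing the interface. Instantiating `MiniClient` directly raises `TypeError`, since its abstract methods are not implemented.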
