-
Notifications
You must be signed in to change notification settings - Fork 112
SeaDatabricksClient: Add Metadata Commands #593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: sea-migration
Are you sure you want to change the base?
Changes from 105 commits
138c2ae
3e3ab94
4a78165
0dac4aa
1b794c7
da5a6fe
686ade4
31e6c83
69ea238
66d7517
71feef9
ae9862f
d8aa69e
db139bc
b977b12
da615c0
0da04a6
ea9d456
8985c62
d9bcdbe
ee9fa1c
24c6152
67fd101
271fcaf
bf26ea3
ed7cf91
dae15e3
db5bbea
d5d3699
6137a3d
75b0773
4494dcd
4d0aeca
7cece5e
8977c06
0216d7a
4cb15fd
dee47f7
e385d5b
484064e
030edf8
30f8266
033ae73
33821f4
3e22c6c
787f1f7
165c4f3
a6e40d0
52e3088
641c09b
8bd12d8
ffded6e
227f6b3
68657a3
3940eec
37813ba
267c9f4
2967119
47fd60d
982fdf2
9e14d48
be1997e
e8e8ee7
05ee4e7
3ffa898
2952d8d
89e2aa0
cbace3f
c075b07
c62f76d
199402e
8ac574b
398ca70
b1acc5b
ef2a7ee
699942d
af8f74e
5540c5c
efe3881
36ab59b
1d57c99
df6dac2
ad0e527
ed446a0
38e4b5c
94879c0
1809956
da5260c
0385ffb
349c021
6229848
fd52356
64e58b0
0a2cdfd
90bb09c
cd22389
82e0f8b
e64b81b
5ab9bbe
1ab6e87
f469c24
68ec65f
ffd478e
f6d873d
28675f5
3578659
8713023
22dc252
390f592
35f1ef0
a515d26
59b1330
293e356
dd40beb
14057ac
a4d5bdb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
""" | ||
Client-side filtering utilities for Databricks SQL connector. | ||
This module provides filtering capabilities for result sets returned by different backends. | ||
""" | ||
|
||
import logging | ||
from typing import ( | ||
List, | ||
Optional, | ||
Any, | ||
Callable, | ||
cast, | ||
TYPE_CHECKING, | ||
) | ||
|
||
from databricks.sql.backend.sea.backend import SeaDatabricksClient | ||
from databricks.sql.backend.types import ExecuteResponse | ||
|
||
if TYPE_CHECKING: | ||
varun-edachali-dbx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
from databricks.sql.result_set import ResultSet, SeaResultSet | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class ResultSetFilter: | ||
""" | ||
A general-purpose filter for result sets that can be applied to any backend. | ||
varun-edachali-dbx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
This class provides methods to filter result sets based on various criteria, | ||
similar to the client-side filtering in the JDBC connector. | ||
varun-edachali-dbx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
|
||
@staticmethod | ||
def _filter_sea_result_set( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is specific to SEA result set and can't be used for a generic result set class? let's try to make it generic for a result set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need some service specific methods at some point during the filtering process to know what kind of result set to return, since our concrete instances are service specific. I tried to keep the root methods invoked ( |
||
result_set: "SeaResultSet", filter_func: Callable[[List[Any]], bool] | ||
) -> "SeaResultSet": | ||
""" | ||
Filter a SEA result set using the provided filter function. | ||
Args: | ||
result_set: The SEA result set to filter | ||
filter_func: Function that takes a row and returns True if the row should be included | ||
Returns: | ||
A filtered SEA result set | ||
""" | ||
|
||
# Get all remaining rows | ||
all_rows = result_set.results.remaining_rows() | ||
|
||
# Filter rows | ||
filtered_rows = [row for row in all_rows if filter_func(row)] | ||
|
||
# Import SeaResultSet here to avoid circular imports | ||
from databricks.sql.result_set import SeaResultSet | ||
|
||
# Reuse the command_id from the original result set | ||
command_id = result_set.command_id | ||
|
||
# Create an ExecuteResponse with the filtered data | ||
execute_response = ExecuteResponse( | ||
command_id=command_id, | ||
status=result_set.status, | ||
description=result_set.description, | ||
has_been_closed_server_side=result_set.has_been_closed_server_side, | ||
lz4_compressed=result_set.lz4_compressed, | ||
arrow_schema_bytes=result_set._arrow_schema_bytes, | ||
is_staging_operation=False, | ||
) | ||
|
||
# Create a new ResultData object with filtered data | ||
from databricks.sql.backend.sea.models.base import ResultData | ||
|
||
result_data = ResultData(data=filtered_rows, external_links=None) | ||
|
||
# Create a new SeaResultSet with the filtered data | ||
filtered_result_set = SeaResultSet( | ||
connection=result_set.connection, | ||
execute_response=execute_response, | ||
sea_client=cast(SeaDatabricksClient, result_set.backend), | ||
buffer_size_bytes=result_set.buffer_size_bytes, | ||
arraysize=result_set.arraysize, | ||
result_data=result_data, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you remind me what is the significance of this result_data param in result set? is this present in the base class? Is this an optional param and is used to create a result set with hard-coded rows? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not present in the base class, it is an instance of a |
||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the whole implementation can be improved. you are essentially first downloading the complete result set and then initialising a new one. a filter method should ideally just take the object to be filtered and return true/false on it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Introducing a filter method that is utilised in the fetch phase would lead to a lot of specialised code for the table queries during the fetch phase. Currently, all that the execution relevant methods ( The fetch phase from this step on is completely invariant of the kind of query that took place. If we want to use a separate |
||
|
||
return filtered_result_set | ||
|
||
@staticmethod | ||
def filter_by_column_values( | ||
result_set: "ResultSet", | ||
column_index: int, | ||
allowed_values: List[str], | ||
case_sensitive: bool = False, | ||
) -> "ResultSet": | ||
""" | ||
Filter a result set by values in a specific column. | ||
Args: | ||
result_set: The result set to filter | ||
column_index: The index of the column to filter on | ||
allowed_values: List of allowed values for the column | ||
case_sensitive: Whether to perform case-sensitive comparison | ||
Returns: | ||
A filtered result set | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above |
||
""" | ||
|
||
# Convert to uppercase for case-insensitive comparison if needed | ||
if not case_sensitive: | ||
allowed_values = [v.upper() for v in allowed_values] | ||
|
||
# Determine the type of result set and apply appropriate filtering | ||
from databricks.sql.result_set import SeaResultSet | ||
|
||
if isinstance(result_set, SeaResultSet): | ||
return ResultSetFilter._filter_sea_result_set( | ||
result_set, | ||
lambda row: ( | ||
len(row) > column_index | ||
and isinstance(row[column_index], str) | ||
and ( | ||
row[column_index].upper() | ||
if not case_sensitive | ||
else row[column_index] | ||
) | ||
in allowed_values | ||
), | ||
) | ||
|
||
# For other result set types, return the original (should be handled by specific implementations) | ||
logger.warning( | ||
f"Filtering not implemented for result set type: {type(result_set).__name__}" | ||
) | ||
return result_set | ||
|
||
@staticmethod | ||
def filter_tables_by_type( | ||
result_set: "ResultSet", table_types: Optional[List[str]] = None | ||
) -> "ResultSet": | ||
""" | ||
Filter a result set of tables by the specified table types. | ||
This is a client-side filter that processes the result set after it has been | ||
retrieved from the server. It filters out tables whose type does not match | ||
any of the types in the table_types list. | ||
Args: | ||
result_set: The original result set containing tables | ||
table_types: List of table types to include (e.g., ["TABLE", "VIEW"]) | ||
Returns: | ||
A filtered result set containing only tables of the specified types | ||
""" | ||
|
||
# Default table types if none specified | ||
DEFAULT_TABLE_TYPES = ["TABLE", "VIEW", "SYSTEM TABLE"] | ||
valid_types = ( | ||
table_types if table_types and len(table_types) > 0 else DEFAULT_TABLE_TYPES | ||
) | ||
|
||
# Table type is the 6th column (index 5) | ||
return ResultSetFilter.filter_by_column_values( | ||
result_set, 5, valid_types, case_sensitive=True | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above |
Uh oh!
There was an error while loading. Please reload this page.