From 4c122b1813f18053a3f337991a9ac9d7ef9c27ae Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 15 May 2025 11:04:13 +0530 Subject: [PATCH 01/25] PECOBLR-86 Improve logging for debug level Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 3 +++ src/databricks/sql/thrift_backend.py | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index ea901c3a..46d710d9 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -214,6 +214,8 @@ def read(self) -> Optional[OAuthToken]: # use_cloud_fetch # Enable use of cloud fetch to extract large query results in parallel via cloud storage + logger.debug(f"Connection.__init__(server_hostname={server_hostname}, http_path={http_path})") + if access_token: access_token_kv = {"access_token": access_token} kwargs = {**kwargs, **access_token_kv} @@ -787,6 +789,7 @@ def execute( :returns self """ + logger.debug(f"Cursor.execute(operation={operation}, parameters={parameters})") param_approach = self._determine_parameter_approach(parameters) if param_approach == ParameterApproach.NONE: diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 2e3478d7..8b9dd9e8 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -131,6 +131,8 @@ def __init__( # max_download_threads # Number of threads for handling cloud fetch downloads. Defaults to 10 + logger.debug(f"ThriftBackend.__init__(server_hostname={server_hostname}, port={port}, http_path={http_path})") + port = port or 443 if kwargs.get("_connection_uri"): uri = kwargs.get("_connection_uri") @@ -390,6 +392,8 @@ def attempt_request(attempt): # TODO: don't use exception handling for GOS polling... 
+ logger.debug(f"ThriftBackend.attempt_request: HTTPError: {err}") + gos_name = TCLIServiceClient.GetOperationStatus.__name__ if method.__name__ == gos_name: delay_default = ( @@ -404,6 +408,7 @@ def attempt_request(attempt): else: raise err except OSError as err: + logger.debug(f"ThriftBackend.attempt_request: OSError: {err}") error = err error_message = str(err) # fmt: off @@ -434,6 +439,7 @@ def attempt_request(attempt): else: logger.warning(log_string) except Exception as err: + logger.debug(f"ThriftBackend.attempt_request: Exception: {err}") error = err retry_delay = extract_retry_delay(attempt) error_message = ThriftBackend._extract_error_message_from_headers( @@ -1074,6 +1080,7 @@ def fetch_results( return queue, resp.hasMoreRows def close_command(self, op_handle): + logger.debug(f"ThriftBackend.close_command(op_handle={op_handle})") req = ttypes.TCloseOperationReq(operationHandle=op_handle) resp = self.make_request(self._client.CloseOperation, req) return resp.status From 23374be9a7ed96ab0e2cd982285557bc45d74feb Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 15 May 2025 11:10:28 +0530 Subject: [PATCH 02/25] PECOBLR-86 Improve logging for debug level Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/thrift_backend.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 8b9dd9e8..0bcb18c7 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -894,6 +894,7 @@ def execute_command( ): assert session_handle is not None + logger.debug(f"ThriftBackend.execute_command(operation={operation}, session_handle={session_handle})") spark_arrow_types = ttypes.TSparkArrowTypes( timestampAsArrow=self._use_arrow_native_timestamps, decimalAsArrow=self._use_arrow_native_decimals, From 3971d3a3bcff1d0bda7e7caaf624820c87545670 Mon Sep 17 00:00:00 2001 From: saishreeeee Date: Thu, 15 May 2025 17:08:53 +0530 Subject: [PATCH 03/25] fixed format Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 4 +++- src/databricks/sql/thrift_backend.py | 15 ++++++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 46d710d9..08359d77 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -214,7 +214,9 @@ def read(self) -> Optional[OAuthToken]: # use_cloud_fetch # Enable use of cloud fetch to extract large query results in parallel via cloud storage - logger.debug(f"Connection.__init__(server_hostname={server_hostname}, http_path={http_path})") + logger.debug( + f"Connection.__init__(server_hostname={server_hostname}, http_path={http_path})" + ) if access_token: access_token_kv = {"access_token": access_token} diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 0bcb18c7..6e85392a 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -131,7 +131,9 @@ def __init__( # max_download_threads # Number of threads for handling cloud fetch downloads. Defaults to 10 - logger.debug(f"ThriftBackend.__init__(server_hostname={server_hostname}, port={port}, http_path={http_path})") + logger.debug( + f"ThriftBackend.__init__(server_hostname={server_hostname}, port={port}, http_path={http_path})" + ) port = port or 443 if kwargs.get("_connection_uri"): @@ -392,7 +394,7 @@ def attempt_request(attempt): # TODO: don't use exception handling for GOS polling... 
- logger.debug(f"ThriftBackend.attempt_request: HTTPError: {err}") + logger.error(f"ThriftBackend.attempt_request: HTTPError: {err}") gos_name = TCLIServiceClient.GetOperationStatus.__name__ if method.__name__ == gos_name: @@ -408,7 +410,7 @@ def attempt_request(attempt): else: raise err except OSError as err: - logger.debug(f"ThriftBackend.attempt_request: OSError: {err}") + logger.error(f"ThriftBackend.attempt_request: OSError: {err}") error = err error_message = str(err) # fmt: off @@ -439,7 +441,7 @@ def attempt_request(attempt): else: logger.warning(log_string) except Exception as err: - logger.debug(f"ThriftBackend.attempt_request: Exception: {err}") + logger.error(f"ThriftBackend.attempt_request: Exception: {err}") error = err retry_delay = extract_retry_delay(attempt) error_message = ThriftBackend._extract_error_message_from_headers( @@ -894,7 +896,10 @@ def execute_command( ): assert session_handle is not None - logger.debug(f"ThriftBackend.execute_command(operation={operation}, session_handle={session_handle})") + logger.debug( + f"ThriftBackend.execute_command(operation={operation}, session_handle={session_handle})" + ) + spark_arrow_types = ttypes.TSparkArrowTypes( timestampAsArrow=self._use_arrow_native_timestamps, decimalAsArrow=self._use_arrow_native_decimals, From 91e3f40bfbd3f4d4f5ed30fdc9e63fd16939ce89 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 15 May 2025 18:09:40 +0530 Subject: [PATCH 04/25] used lazy logging Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 8 ++++++-- src/databricks/sql/thrift_backend.py | 15 ++++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 08359d77..87f3bbb9 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -215,7 +215,9 @@ def read(self) -> Optional[OAuthToken]: # Enable use of cloud fetch to extract large query results in parallel via cloud storage logger.debug( - f"Connection.__init__(server_hostname={server_hostname}, http_path={http_path})" + "Connection.__init__(server_hostname=%s, http_path=%s)", + server_hostname, + http_path, ) if access_token: @@ -791,7 +793,9 @@ def execute( :returns self """ - logger.debug(f"Cursor.execute(operation={operation}, parameters={parameters})") + logger.debug( + "Cursor.execute(operation=%s, parameters=%s)", operation, parameters + ) param_approach = self._determine_parameter_approach(parameters) if param_approach == ParameterApproach.NONE: diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 6e85392a..f8861c67 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -132,7 +132,10 @@ def __init__( # Number of threads for handling cloud fetch downloads. Defaults to 10 logger.debug( - f"ThriftBackend.__init__(server_hostname={server_hostname}, port={port}, http_path={http_path})" + "ThriftBackend.__init__(server_hostname=%s, port=%s, http_path=%s)", + server_hostname, + port, + http_path, ) port = port or 443 @@ -394,7 +397,7 @@ def attempt_request(attempt): # TODO: don't use exception handling for GOS polling... 
- logger.error(f"ThriftBackend.attempt_request: HTTPError: {err}") + logger.error("ThriftBackend.attempt_request: HTTPError: %s", err) gos_name = TCLIServiceClient.GetOperationStatus.__name__ if method.__name__ == gos_name: @@ -410,7 +413,7 @@ def attempt_request(attempt): else: raise err except OSError as err: - logger.error(f"ThriftBackend.attempt_request: OSError: {err}") + logger.error("ThriftBackend.attempt_request: OSError: %s", err) error = err error_message = str(err) # fmt: off @@ -441,7 +444,7 @@ def attempt_request(attempt): else: logger.warning(log_string) except Exception as err: - logger.error(f"ThriftBackend.attempt_request: Exception: {err}") + logger.error("ThriftBackend.attempt_request: Exception: %s", err) error = err retry_delay = extract_retry_delay(attempt) error_message = ThriftBackend._extract_error_message_from_headers( @@ -897,7 +900,9 @@ def execute_command( assert session_handle is not None logger.debug( - f"ThriftBackend.execute_command(operation={operation}, session_handle={session_handle})" + "ThriftBackend.execute_command(operation=%s, session_handle=%s)", + operation, + session_handle, ) spark_arrow_types = ttypes.TSparkArrowTypes( From 6331fc1f098c7efa5f49e7a7d322ea70dc601295 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 15 May 2025 18:27:31 +0530 Subject: [PATCH 05/25] changed debug to error logs Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/thrift_backend.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index f8861c67..43f800dd 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -413,7 +413,6 @@ def attempt_request(attempt): else: raise err except OSError as err: - logger.error("ThriftBackend.attempt_request: OSError: %s", err) error = err error_message = str(err) # fmt: off From 63593a629c42ed0078a70b39aa7a4692f1c7fc27 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 22 May 2025 16:18:06 +0530 Subject: [PATCH 06/25] added classes required for telemetry Signed-off-by: Sai Shree Pradhan --- examples/query_execute.py | 2 +- examples/test_telemetry.py | 20 +++++++++ src/databricks/sql/client.py | 45 +++++++++++++++++++ .../telemetry/DriverConnectionParameters.py | 24 ++++++++++ .../sql/telemetry/DriverErrorInfo.py | 11 +++++ .../telemetry/DriverSystemConfiguration.py | 37 +++++++++++++++ .../sql/telemetry/DriverVolumeOperation.py | 14 ++++++ .../sql/telemetry/FrontendLogContext.py | 11 +++++ .../sql/telemetry/FrontendLogEntry.py | 11 +++++ src/databricks/sql/telemetry/HostDetails.py | 11 +++++ .../sql/telemetry/NoopTelemetryClient.py | 11 +++++ .../sql/telemetry/SqlExecutionEvent.py | 15 +++++++ .../sql/telemetry/TelemetryClient.py | 36 +++++++++++++++ .../sql/telemetry/TelemetryClientContext.py | 11 +++++ .../sql/telemetry/TelemetryEvent.py | 25 +++++++++++ .../sql/telemetry/TelemetryFrontendLog.py | 15 +++++++ .../sql/telemetry/TelemetryHelper.py | 32 +++++++++++++ .../sql/telemetry/TelemetryRequest.py | 13 ++++++ .../sql/telemetry/TelemetryResponse.py | 13 ++++++ .../sql/telemetry/enums/AuthFlow.py | 8 ++++ .../sql/telemetry/enums/AuthMech.py | 7 +++ .../telemetry/enums/DatabricksClientType.py | 6 +++ .../enums/DriverVolumeOperationType.py | 10 +++++ .../telemetry/enums/ExecutionResultFormat.py | 8 ++++ .../sql/telemetry/enums/StatementType.py | 9 ++++ 25 files changed, 404 insertions(+), 1 deletion(-) create mode 100644 examples/test_telemetry.py create mode 100644 
src/databricks/sql/telemetry/DriverConnectionParameters.py create mode 100644 src/databricks/sql/telemetry/DriverErrorInfo.py create mode 100644 src/databricks/sql/telemetry/DriverSystemConfiguration.py create mode 100644 src/databricks/sql/telemetry/DriverVolumeOperation.py create mode 100644 src/databricks/sql/telemetry/FrontendLogContext.py create mode 100644 src/databricks/sql/telemetry/FrontendLogEntry.py create mode 100644 src/databricks/sql/telemetry/HostDetails.py create mode 100644 src/databricks/sql/telemetry/NoopTelemetryClient.py create mode 100644 src/databricks/sql/telemetry/SqlExecutionEvent.py create mode 100644 src/databricks/sql/telemetry/TelemetryClient.py create mode 100644 src/databricks/sql/telemetry/TelemetryClientContext.py create mode 100644 src/databricks/sql/telemetry/TelemetryEvent.py create mode 100644 src/databricks/sql/telemetry/TelemetryFrontendLog.py create mode 100644 src/databricks/sql/telemetry/TelemetryHelper.py create mode 100644 src/databricks/sql/telemetry/TelemetryRequest.py create mode 100644 src/databricks/sql/telemetry/TelemetryResponse.py create mode 100644 src/databricks/sql/telemetry/enums/AuthFlow.py create mode 100644 src/databricks/sql/telemetry/enums/AuthMech.py create mode 100644 src/databricks/sql/telemetry/enums/DatabricksClientType.py create mode 100644 src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py create mode 100644 src/databricks/sql/telemetry/enums/ExecutionResultFormat.py create mode 100644 src/databricks/sql/telemetry/enums/StatementType.py diff --git a/examples/query_execute.py b/examples/query_execute.py index 38d2f17a..d9ed5d8e 100644 --- a/examples/query_execute.py +++ b/examples/query_execute.py @@ -8,7 +8,7 @@ ) as connection: with connection.cursor() as cursor: - cursor.execute("SELECT * FROM default.diamonds LIMIT 2") + cursor.execute("SELECT * FROM main.eng_lumberjack.staging_frontend_log_sql_driver_log limit 1") result = cursor.fetchall() for row in result: diff --git a/examples/test_telemetry.py b/examples/test_telemetry.py new file mode 100644 index 00000000..4b419eaf --- /dev/null +++ b/examples/test_telemetry.py @@ -0,0 +1,20 @@ +import os +import databricks.sql as sql + +# Create connection with telemetry enabled +conn = sql.connect( + server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"], + http_path=os.environ["DATABRICKS_HTTP_PATH"], + access_token=os.environ["DATABRICKS_TOKEN"], + enable_telemetry=True, # Enable telemetry + telemetry_batch_size=1 # Set batch size to 1 +) + +# Execute a simple query to generate telemetry +cursor = conn.cursor() +cursor.execute("SELECT * FROM main.eng_lumberjack.staging_frontend_log_sql_driver_log limit 1") +cursor.fetchall() + +# Close the connection +cursor.close() +conn.close() \ No newline at end of file diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 87f3bbb9..22b54df0 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1,5 +1,21 @@ import time from typing import Dict, Tuple, List, Optional, Any, Union, Sequence +import uuid +from databricks.sql.telemetry.TelemetryEvent import TelemetryEvent +from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration +from databricks.sql.telemetry.TelemetryClient import TelemetryClient +from databricks.sql.telemetry.NoopTelemetryClient import NoopTelemetryClient +from databricks.sql.telemetry.TelemetryFrontendLog import TelemetryFrontendLog +from databricks.sql.telemetry.FrontendLogContext import FrontendLogContext +from 
databricks.sql.telemetry.TelemetryClientContext import TelemetryClientContext +from databricks.sql.telemetry.FrontendLogEntry import FrontendLogEntry +from databricks.sql.auth.auth import AuthType +from databricks.sql.auth.authenticators import ( + DatabricksOAuthProvider, + ExternalAuthProvider, + AuthProvider, + AccessTokenAuthProvider, +) import pandas @@ -234,6 +250,32 @@ def read(self) -> Optional[OAuthToken]: server_hostname, **kwargs ) + self.server_telemetry_enabled = True + self.client_telemetry_enabled = kwargs.get("enable_telemetry", False) + self.telemetry_enabled = ( + self.client_telemetry_enabled and self.server_telemetry_enabled + ) + telemetry_batch_size = kwargs.get("telemetry_batch_size", 200) + + if self.telemetry_enabled: + self.telemetry_client = TelemetryClient( + host=self.host, + connection_uuid="test-connection-uuid", + auth_provider=auth_provider, + is_authenticated=( + isinstance(auth_provider, AccessTokenAuthProvider) + or isinstance(auth_provider, DatabricksOAuthProvider) + or isinstance(auth_provider, ExternalAuthProvider) + or ( + isinstance(auth_provider, AuthProvider) + and hasattr(auth_provider, "_header_factory") + ) + ), + batch_size=telemetry_batch_size, + ) + else: + self.telemetry_client = NoopTelemetryClient() + user_agent_entry = kwargs.get("user_agent_entry") if user_agent_entry is None: user_agent_entry = kwargs.get("_user_agent_entry") @@ -419,6 +461,9 @@ def _close(self, close_cursors=True) -> None: self.open = False + if hasattr(self, "telemetry_client"): + self.telemetry_client.close() + def commit(self): """No-op because Databricks does not support transactions""" pass diff --git a/src/databricks/sql/telemetry/DriverConnectionParameters.py b/src/databricks/sql/telemetry/DriverConnectionParameters.py new file mode 100644 index 00000000..0dc69512 --- /dev/null +++ b/src/databricks/sql/telemetry/DriverConnectionParameters.py @@ -0,0 +1,24 @@ +import json +from dataclasses import dataclass, asdict +from databricks.sql.telemetry.HostDetails import HostDetails +from databricks.sql.telemetry.enums.AuthMech import AuthMech +from databricks.sql.telemetry.enums.AuthFlow import AuthFlow +from databricks.sql.telemetry.enums.DatabricksClientType import DatabricksClientType + + +@dataclass +class DriverConnectionParameters: + http_path: str + driver_mode: DatabricksClientType + host_details: HostDetails + auth_mech: AuthMech + auth_flow: AuthFlow + auth_scope: str + discovery_url: str + allowed_volume_ingestion_paths: str + enable_complex_datatype_support: bool + azure_tenant_id: str + socket_timeout: int + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/DriverErrorInfo.py b/src/databricks/sql/telemetry/DriverErrorInfo.py new file mode 100644 index 00000000..83f52375 --- /dev/null +++ b/src/databricks/sql/telemetry/DriverErrorInfo.py @@ -0,0 +1,11 @@ +import json +from dataclasses import dataclass, asdict + + +@dataclass +class DriverErrorInfo: + error_name: str + stack_trace: str + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/DriverSystemConfiguration.py b/src/databricks/sql/telemetry/DriverSystemConfiguration.py new file mode 100644 index 00000000..60af0831 --- /dev/null +++ b/src/databricks/sql/telemetry/DriverSystemConfiguration.py @@ -0,0 +1,37 @@ +import json +from dataclasses import dataclass, asdict +import platform +import sys +import locale +from databricks.sql import __version__ + + +@dataclass +class DriverSystemConfiguration: + driver_version: 
str + os_name: str + os_version: str + os_arch: str + runtime_name: str + runtime_version: str + runtime_vendor: str + client_app_name: str + locale_name: str + driver_name: str + char_set_encoding: str + + def __init__(self): + self.driver_version = __version__ + self.os_name = platform.system() + self.os_version = platform.version() + self.os_arch = platform.machine() + self.runtime_name = platform.python_implementation() + self.runtime_version = platform.python_version() + self.runtime_vendor = sys.implementation.name + self.client_app_name = "databricks-sql-python" + self.locale_name = locale.getdefaultlocale()[0] + self.driver_name = "databricks-sql-python" + self.char_set_encoding = "UTF-8" + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/DriverVolumeOperation.py b/src/databricks/sql/telemetry/DriverVolumeOperation.py new file mode 100644 index 00000000..7a6e32e4 --- /dev/null +++ b/src/databricks/sql/telemetry/DriverVolumeOperation.py @@ -0,0 +1,14 @@ +import json +from dataclasses import dataclass, asdict +from databricks.sql.telemetry.enums.DriverVolumeOperationType import ( + DriverVolumeOperationType, +) + + +@dataclass +class DriverVolumeOperation: + volume_operation_type: DriverVolumeOperationType + volume_path: str + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/FrontendLogContext.py b/src/databricks/sql/telemetry/FrontendLogContext.py new file mode 100644 index 00000000..e6145e3e --- /dev/null +++ b/src/databricks/sql/telemetry/FrontendLogContext.py @@ -0,0 +1,11 @@ +import json +from dataclasses import dataclass, asdict +from databricks.sql.telemetry.TelemetryClientContext import TelemetryClientContext + + +@dataclass +class FrontendLogContext: + client_context: TelemetryClientContext + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/FrontendLogEntry.py b/src/databricks/sql/telemetry/FrontendLogEntry.py new file mode 100644 index 00000000..1351ec31 --- /dev/null +++ b/src/databricks/sql/telemetry/FrontendLogEntry.py @@ -0,0 +1,11 @@ +import json +from dataclasses import dataclass, asdict +from databricks.sql.telemetry.TelemetryEvent import TelemetryEvent + + +@dataclass +class FrontendLogEntry: + sql_driver_log: TelemetryEvent + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/HostDetails.py b/src/databricks/sql/telemetry/HostDetails.py new file mode 100644 index 00000000..3c288890 --- /dev/null +++ b/src/databricks/sql/telemetry/HostDetails.py @@ -0,0 +1,11 @@ +import json +from dataclasses import dataclass, asdict + + +@dataclass +class HostDetails: + host_url: str + port: int + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/NoopTelemetryClient.py b/src/databricks/sql/telemetry/NoopTelemetryClient.py new file mode 100644 index 00000000..d81f5bad --- /dev/null +++ b/src/databricks/sql/telemetry/NoopTelemetryClient.py @@ -0,0 +1,11 @@ +class NoopTelemetryClient: + # A no-operation telemetry client that implements the same interface but does nothing + + def export_event(self, event): + pass + + def flush(self): + pass + + def close(self): + pass diff --git a/src/databricks/sql/telemetry/SqlExecutionEvent.py b/src/databricks/sql/telemetry/SqlExecutionEvent.py new file mode 100644 index 00000000..9d2efae9 --- /dev/null +++ b/src/databricks/sql/telemetry/SqlExecutionEvent.py @@ -0,0 +1,15 @@ +import json +from dataclasses import 
dataclass, asdict +from databricks.sql.telemetry.enums.StatementType import StatementType +from databricks.sql.telemetry.enums.ExecutionResultFormat import ExecutionResultFormat + + +@dataclass +class SqlExecutionEvent: + statement_type: StatementType + is_compressed: bool + execution_result: ExecutionResultFormat + retry_count: int + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/TelemetryClient.py b/src/databricks/sql/telemetry/TelemetryClient.py new file mode 100644 index 00000000..22ee2ef6 --- /dev/null +++ b/src/databricks/sql/telemetry/TelemetryClient.py @@ -0,0 +1,36 @@ +import threading +import time +import json +import requests +from concurrent.futures import ThreadPoolExecutor + + +class TelemetryClient: + def __init__( + self, + host, + connection_uuid, + auth_provider=None, + is_authenticated=False, + batch_size=200, + ): + self.host = host + self.connection_uuid = connection_uuid + self.auth_provider = auth_provider + self.is_authenticated = is_authenticated + self.batch_size = batch_size + self.events_batch = [] + self.lock = threading.Lock() + self.executor = ThreadPoolExecutor( + max_workers=5 + ) # Thread pool for async operations + self.DriverConnectionParameters = None + + def export_event(self, event): + pass + + def flush(self): + pass + + def close(self): + pass diff --git a/src/databricks/sql/telemetry/TelemetryClientContext.py b/src/databricks/sql/telemetry/TelemetryClientContext.py new file mode 100644 index 00000000..f23e9289 --- /dev/null +++ b/src/databricks/sql/telemetry/TelemetryClientContext.py @@ -0,0 +1,11 @@ +from dataclasses import dataclass, asdict +import json + + +@dataclass +class TelemetryClientContext: + timestamp_millis: int + user_agent: str + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/TelemetryEvent.py b/src/databricks/sql/telemetry/TelemetryEvent.py new file mode 100644 index 00000000..fcbb2b77 --- /dev/null +++ b/src/databricks/sql/telemetry/TelemetryEvent.py @@ -0,0 +1,25 @@ +import json +from dataclasses import dataclass, asdict +from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration +from databricks.sql.telemetry.DriverConnectionParameters import ( + DriverConnectionParameters, +) +from databricks.sql.telemetry.DriverVolumeOperation import DriverVolumeOperation +from databricks.sql.telemetry.SqlExecutionEvent import SqlExecutionEvent +from databricks.sql.telemetry.DriverErrorInfo import DriverErrorInfo + + +@dataclass +class TelemetryEvent: + session_id: str + sql_statement_id: str + system_configuration: DriverSystemConfiguration + driver_connection_params: DriverConnectionParameters + auth_type: str + vol_operation: DriverVolumeOperation + sql_operation: SqlExecutionEvent + error_info: DriverErrorInfo + latency: int + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/TelemetryFrontendLog.py b/src/databricks/sql/telemetry/TelemetryFrontendLog.py new file mode 100644 index 00000000..abbc1120 --- /dev/null +++ b/src/databricks/sql/telemetry/TelemetryFrontendLog.py @@ -0,0 +1,15 @@ +import json +from dataclasses import dataclass, asdict +from databricks.sql.telemetry.FrontendLogContext import FrontendLogContext +from databricks.sql.telemetry.FrontendLogEntry import FrontendLogEntry + + +@dataclass +class TelemetryFrontendLog: + workspace_id: int + frontend_log_event_id: str + context: FrontendLogContext + entry: FrontendLogEntry + + def to_json(self): + return 
json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/TelemetryHelper.py b/src/databricks/sql/telemetry/TelemetryHelper.py new file mode 100644 index 00000000..e2e63585 --- /dev/null +++ b/src/databricks/sql/telemetry/TelemetryHelper.py @@ -0,0 +1,32 @@ +import platform +import sys +import uuid +import time +from typing import Optional + +from databricks.sql import __version__ +from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration + + +class TelemetryHelper: + + # Singleton instance of DriverSystemConfiguration + _DRIVER_SYSTEM_CONFIGURATION = None + + @classmethod + def getDriverSystemConfiguration(cls) -> DriverSystemConfiguration: + if cls._DRIVER_SYSTEM_CONFIGURATION is None: + cls._DRIVER_SYSTEM_CONFIGURATION = DriverSystemConfiguration( + driverName="Databricks SQL Python Connector", + driverVersion=__version__, + runtimeName=f"Python {sys.version.split()[0]}", + runtimeVendor=platform.python_implementation(), + runtimeVersion=platform.python_version(), + osName=platform.system(), + osVersion=platform.release(), + osArch=platform.machine(), + clientAppName=None, + localeName=f"{platform.system()}_{platform.release()}", + charSetEncoding=sys.getdefaultencoding(), + ) + return cls._DRIVER_SYSTEM_CONFIGURATION diff --git a/src/databricks/sql/telemetry/TelemetryRequest.py b/src/databricks/sql/telemetry/TelemetryRequest.py new file mode 100644 index 00000000..8142e118 --- /dev/null +++ b/src/databricks/sql/telemetry/TelemetryRequest.py @@ -0,0 +1,13 @@ +import json +from dataclasses import dataclass, asdict +from typing import List, Optional + + +@dataclass +class TelemetryRequest: + uploadTime: int + items: List[str] + protoLogs: Optional[List[str]] + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/TelemetryResponse.py b/src/databricks/sql/telemetry/TelemetryResponse.py new file mode 100644 index 00000000..3b14050d --- /dev/null +++ b/src/databricks/sql/telemetry/TelemetryResponse.py @@ -0,0 +1,13 @@ +import json +from dataclasses import dataclass, asdict +from typing import List, Optional + + +@dataclass +class TelemetryResponse: + errors: List[str] + numSuccess: int + numProtoSuccess: int + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/enums/AuthFlow.py b/src/databricks/sql/telemetry/enums/AuthFlow.py new file mode 100644 index 00000000..2afc35c7 --- /dev/null +++ b/src/databricks/sql/telemetry/enums/AuthFlow.py @@ -0,0 +1,8 @@ +from enum import Enum + + +class AuthFlow(Enum): + TOKEN_PASSTHROUGH = "token_passthrough" + CLIENT_CREDENTIALS = "client_credentials" + BROWSER_BASED_AUTHENTICATION = "browser_based_authentication" + AZURE_MANAGED_IDENTITIES = "azure_managed_identities" diff --git a/src/databricks/sql/telemetry/enums/AuthMech.py b/src/databricks/sql/telemetry/enums/AuthMech.py new file mode 100644 index 00000000..6425eea4 --- /dev/null +++ b/src/databricks/sql/telemetry/enums/AuthMech.py @@ -0,0 +1,7 @@ +from enum import Enum + + +class AuthMech(Enum): + OTHER = "other" + PAT = "pat" + OAUTH = "oauth" diff --git a/src/databricks/sql/telemetry/enums/DatabricksClientType.py b/src/databricks/sql/telemetry/enums/DatabricksClientType.py new file mode 100644 index 00000000..8e08c355 --- /dev/null +++ b/src/databricks/sql/telemetry/enums/DatabricksClientType.py @@ -0,0 +1,6 @@ +from enum import Enum + + +class DatabricksClientType(Enum): + SEA = "SEA" + THRIFT = "THRIFT" diff --git 
a/src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py b/src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py new file mode 100644 index 00000000..581e56c2 --- /dev/null +++ b/src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py @@ -0,0 +1,10 @@ +from enum import Enum + + +class DriverVolumeOperationType(Enum): + TYPE_UNSPECIFIED = "type_unspecified" + PUT = "put" + GET = "get" + DELETE = "delete" + LIST = "list" + QUERY = "query" diff --git a/src/databricks/sql/telemetry/enums/ExecutionResultFormat.py b/src/databricks/sql/telemetry/enums/ExecutionResultFormat.py new file mode 100644 index 00000000..23e18150 --- /dev/null +++ b/src/databricks/sql/telemetry/enums/ExecutionResultFormat.py @@ -0,0 +1,8 @@ +from enum import Enum + + +class ExecutionResultFormat(Enum): + FORMAT_UNSPECIFIED = "format_unspecified" + INLINE_ARROW = "inline_arrow" + EXTERNAL_LINKS = "external_links" + COLUMNAR_INLINE = "columnar_inline" diff --git a/src/databricks/sql/telemetry/enums/StatementType.py b/src/databricks/sql/telemetry/enums/StatementType.py new file mode 100644 index 00000000..cea86bab --- /dev/null +++ b/src/databricks/sql/telemetry/enums/StatementType.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class StatementType(Enum): + NONE = "none" + QUERY = "query" + SQL = "sql" + UPDATE = "update" + METADATA = "metadata" From bb69dc9a30a8a1099cfbef34b2b15bf8bf1353a4 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 22 May 2025 18:24:19 +0530 Subject: [PATCH 07/25] removed TelemetryHelper Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 36 ------------------- .../sql/telemetry/NoopTelemetryClient.py | 11 ------ .../sql/telemetry/TelemetryClient.py | 36 ------------------- .../sql/telemetry/TelemetryHelper.py | 32 ----------------- 4 files changed, 115 deletions(-) delete mode 100644 src/databricks/sql/telemetry/NoopTelemetryClient.py delete mode 100644 src/databricks/sql/telemetry/TelemetryClient.py delete mode 100644 src/databricks/sql/telemetry/TelemetryHelper.py diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 22b54df0..9eef14a2 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1,22 +1,5 @@ import time from typing import Dict, Tuple, List, Optional, Any, Union, Sequence -import uuid -from databricks.sql.telemetry.TelemetryEvent import TelemetryEvent -from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration -from databricks.sql.telemetry.TelemetryClient import TelemetryClient -from databricks.sql.telemetry.NoopTelemetryClient import NoopTelemetryClient -from databricks.sql.telemetry.TelemetryFrontendLog import TelemetryFrontendLog -from databricks.sql.telemetry.FrontendLogContext import FrontendLogContext -from databricks.sql.telemetry.TelemetryClientContext import TelemetryClientContext -from databricks.sql.telemetry.FrontendLogEntry import FrontendLogEntry -from databricks.sql.auth.auth import AuthType -from databricks.sql.auth.authenticators import ( - DatabricksOAuthProvider, - ExternalAuthProvider, - AuthProvider, - AccessTokenAuthProvider, -) - import pandas try: @@ -257,25 +240,6 @@ def read(self) -> Optional[OAuthToken]: ) telemetry_batch_size = kwargs.get("telemetry_batch_size", 200) - if self.telemetry_enabled: - self.telemetry_client = TelemetryClient( - host=self.host, - connection_uuid="test-connection-uuid", - auth_provider=auth_provider, - is_authenticated=( - isinstance(auth_provider, AccessTokenAuthProvider) - or 
isinstance(auth_provider, DatabricksOAuthProvider) - or isinstance(auth_provider, ExternalAuthProvider) - or ( - isinstance(auth_provider, AuthProvider) - and hasattr(auth_provider, "_header_factory") - ) - ), - batch_size=telemetry_batch_size, - ) - else: - self.telemetry_client = NoopTelemetryClient() - user_agent_entry = kwargs.get("user_agent_entry") if user_agent_entry is None: user_agent_entry = kwargs.get("_user_agent_entry") diff --git a/src/databricks/sql/telemetry/NoopTelemetryClient.py b/src/databricks/sql/telemetry/NoopTelemetryClient.py deleted file mode 100644 index d81f5bad..00000000 --- a/src/databricks/sql/telemetry/NoopTelemetryClient.py +++ /dev/null @@ -1,11 +0,0 @@ -class NoopTelemetryClient: - # A no-operation telemetry client that implements the same interface but does nothing - - def export_event(self, event): - pass - - def flush(self): - pass - - def close(self): - pass diff --git a/src/databricks/sql/telemetry/TelemetryClient.py b/src/databricks/sql/telemetry/TelemetryClient.py deleted file mode 100644 index 22ee2ef6..00000000 --- a/src/databricks/sql/telemetry/TelemetryClient.py +++ /dev/null @@ -1,36 +0,0 @@ -import threading -import time -import json -import requests -from concurrent.futures import ThreadPoolExecutor - - -class TelemetryClient: - def __init__( - self, - host, - connection_uuid, - auth_provider=None, - is_authenticated=False, - batch_size=200, - ): - self.host = host - self.connection_uuid = connection_uuid - self.auth_provider = auth_provider - self.is_authenticated = is_authenticated - self.batch_size = batch_size - self.events_batch = [] - self.lock = threading.Lock() - self.executor = ThreadPoolExecutor( - max_workers=5 - ) # Thread pool for async operations - self.DriverConnectionParameters = None - - def export_event(self, event): - pass - - def flush(self): - pass - - def close(self): - pass diff --git a/src/databricks/sql/telemetry/TelemetryHelper.py b/src/databricks/sql/telemetry/TelemetryHelper.py deleted file mode 100644 index e2e63585..00000000 --- a/src/databricks/sql/telemetry/TelemetryHelper.py +++ /dev/null @@ -1,32 +0,0 @@ -import platform -import sys -import uuid -import time -from typing import Optional - -from databricks.sql import __version__ -from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration - - -class TelemetryHelper: - - # Singleton instance of DriverSystemConfiguration - _DRIVER_SYSTEM_CONFIGURATION = None - - @classmethod - def getDriverSystemConfiguration(cls) -> DriverSystemConfiguration: - if cls._DRIVER_SYSTEM_CONFIGURATION is None: - cls._DRIVER_SYSTEM_CONFIGURATION = DriverSystemConfiguration( - driverName="Databricks SQL Python Connector", - driverVersion=__version__, - runtimeName=f"Python {sys.version.split()[0]}", - runtimeVendor=platform.python_implementation(), - runtimeVersion=platform.python_version(), - osName=platform.system(), - osVersion=platform.release(), - osArch=platform.machine(), - clientAppName=None, - localeName=f"{platform.system()}_{platform.release()}", - charSetEncoding=sys.getdefaultencoding(), - ) - return cls._DRIVER_SYSTEM_CONFIGURATION From 4cb0d70622b6599a28ad070c6021984ffb23f35a Mon Sep 17 00:00:00 2001 From: Shivam Raj <171748731+shivam2680@users.noreply.github.com> Date: Fri, 16 May 2025 11:32:01 +0530 Subject: [PATCH 08/25] [PECOBLR-361] convert column table to arrow if arrow present (#551) Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 15 ++++++++++++++- tests/e2e/test_driver.py | 7 +++++++ 2 files changed, 21 
insertions(+), 1 deletion(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 9eef14a2..763fd14c 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1433,9 +1433,22 @@ def fetchall_arrow(self) -> "pyarrow.Table": while not self.has_been_closed_server_side and self.has_more_rows: self._fill_results_buffer() partial_results = self.results.remaining_rows() - results = pyarrow.concat_tables([results, partial_results]) + if isinstance(results, ColumnTable) and isinstance( + partial_results, ColumnTable + ): + results = self.merge_columnar(results, partial_results) + else: + results = pyarrow.concat_tables([results, partial_results]) self._next_row_index += partial_results.num_rows + # If PyArrow is installed and we have a ColumnTable result, convert it to PyArrow Table + # Valid only for metadata commands result set + if isinstance(results, ColumnTable) and pyarrow: + data = { + name: col + for name, col in zip(results.column_names, results.column_table) + } + return pyarrow.Table.from_pydict(data) return results def fetchall_columnar(self): diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py index 8c0a4a5a..cfd56140 100644 --- a/tests/e2e/test_driver.py +++ b/tests/e2e/test_driver.py @@ -801,6 +801,13 @@ def test_decimal_not_returned_as_strings_arrow(self): decimal_type = arrow_df.field(0).type assert pyarrow.types.is_decimal(decimal_type) + @skipUnless(pysql_supports_arrow(), "arrow test needs arrow support") + def test_catalogs_returns_arrow_table(self): + with self.cursor() as cursor: + cursor.catalogs() + results = cursor.fetchall_arrow() + assert isinstance(results, pyarrow.Table) + def test_close_connection_closes_cursors(self): from databricks.sql.thrift_api.TCLIService import ttypes From d1efa032c86919c9486c4ca0e66025e0c1d6e6f0 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Wed, 21 May 2025 11:57:04 +0530 Subject: [PATCH 09/25] Update CODEOWNERS (#562) new codeowners Signed-off-by: Sai Shree Pradhan --- .github/CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 0d074c07..11d5aeb0 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -2,4 +2,4 @@ # the repo. Unless a later match takes precedence, these # users will be requested for review when someone opens a # pull request. -* @deeksha-db @samikshya-db @jprakash-db @yunbodeng-db @jackyhu-db @benc-db +* @deeksha-db @samikshya-db @jprakash-db @jackyhu-db @madhav-db @gopalldb @jayantsing-db @vikrantpuppala @shivam2680 From 2fc3cb653525b0dff89320b5964c5d727301ca11 Mon Sep 17 00:00:00 2001 From: Madhav Sainanee Date: Wed, 21 May 2025 14:28:10 +0530 Subject: [PATCH 10/25] Enhance Cursor close handling and context manager exception management to prevent server side resource leaks (#554) * Enhance Cursor close handling and context manager exception management * tests * fmt * Fix Cursor.close() to properly handle CursorAlreadyClosedError * Remove specific test message from Cursor.close() error handling * Improve error handling in connection and cursor context managers to ensure proper closure during exceptions, including KeyboardInterrupt. Add tests for nested cursor management and verify operation closure on server-side errors. 
* add * add Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 33 ++++++++- tests/e2e/test_driver.py | 100 ++++++++++++++++++++++++++- tests/unit/test_client.py | 129 +++++++++++++++++++++++++++++++++++ 3 files changed, 256 insertions(+), 6 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 763fd14c..bd505ac7 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -327,7 +327,13 @@ def __enter__(self) -> "Connection": return self def __exit__(self, exc_type, exc_value, traceback): - self.close() + try: + self.close() + except BaseException as e: + logger.warning(f"Exception during connection close in __exit__: {e}") + if exc_type is None: + raise + return False def __del__(self): if self.open: @@ -471,7 +477,14 @@ def __enter__(self) -> "Cursor": return self def __exit__(self, exc_type, exc_value, traceback): - self.close() + try: + logger.debug("Cursor context manager exiting, calling close()") + self.close() + except BaseException as e: + logger.warning(f"Exception during cursor close in __exit__: {e}") + if exc_type is None: + raise + return False def __iter__(self): if self.active_result_set: @@ -1181,7 +1194,21 @@ def cancel(self) -> None: def close(self) -> None: """Close cursor""" self.open = False - self.active_op_handle = None + + # Close active operation handle if it exists + if self.active_op_handle: + try: + self.thrift_backend.close_command(self.active_op_handle) + except RequestError as e: + if isinstance(e.args[1], CursorAlreadyClosedError): + logger.info("Operation was canceled by a prior request") + else: + logging.warning(f"Error closing operation handle: {e}") + except Exception as e: + logging.warning(f"Error closing operation handle: {e}") + finally: + self.active_op_handle = None + if self.active_result_set: self._close_and_clear_active_result_set() diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py index cfd56140..440d4efb 100644 --- a/tests/e2e/test_driver.py +++ b/tests/e2e/test_driver.py @@ -50,7 +50,7 @@ from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin -from databricks.sql.exc import SessionAlreadyClosedError +from databricks.sql.exc import SessionAlreadyClosedError, CursorAlreadyClosedError log = logging.getLogger(__name__) @@ -820,7 +820,6 @@ def test_close_connection_closes_cursors(self): ars = cursor.active_result_set # We must manually run this check because thrift_backend always forces `has_been_closed_server_side` to True - # Cursor op state should be open before connection is closed status_request = ttypes.TGetOperationStatusReq( operationHandle=ars.command_id, getProgressUpdate=False @@ -847,9 +846,104 @@ def test_closing_a_closed_connection_doesnt_fail(self, caplog): with self.connection() as conn: # First .close() call is explicit here conn.close() - assert "Session appears to have been closed already" in caplog.text + conn = None + try: + with pytest.raises(KeyboardInterrupt): + with self.connection() as c: + conn = c + raise KeyboardInterrupt("Simulated interrupt") + finally: + if conn is not None: + assert not conn.open, "Connection should be closed after KeyboardInterrupt" + + def test_cursor_close_properly_closes_operation(self): + """Test that Cursor.close() properly closes the active operation handle on the server.""" + with self.connection() as conn: + cursor = conn.cursor() + try: + cursor.execute("SELECT 1 AS test") + assert cursor.active_op_handle is not None + cursor.close() + assert cursor.active_op_handle is None + 
assert not cursor.open + finally: + if cursor.open: + cursor.close() + + conn = None + cursor = None + try: + with self.connection() as c: + conn = c + with pytest.raises(KeyboardInterrupt): + with conn.cursor() as cur: + cursor = cur + raise KeyboardInterrupt("Simulated interrupt") + finally: + if cursor is not None: + assert not cursor.open, "Cursor should be closed after KeyboardInterrupt" + + def test_nested_cursor_context_managers(self): + """Test that nested cursor context managers properly close operations on the server.""" + with self.connection() as conn: + with conn.cursor() as cursor1: + cursor1.execute("SELECT 1 AS test1") + assert cursor1.active_op_handle is not None + + with conn.cursor() as cursor2: + cursor2.execute("SELECT 2 AS test2") + assert cursor2.active_op_handle is not None + + # After inner context manager exit, cursor2 should be not open + assert not cursor2.open + assert cursor2.active_op_handle is None + + # After outer context manager exit, cursor1 should be not open + assert not cursor1.open + assert cursor1.active_op_handle is None + + def test_cursor_error_handling(self): + """Test that cursor close handles errors properly to prevent orphaned operations.""" + with self.connection() as conn: + cursor = conn.cursor() + + cursor.execute("SELECT 1 AS test") + + op_handle = cursor.active_op_handle + + assert op_handle is not None + + # Manually close the operation to simulate server-side closure + conn.thrift_backend.close_command(op_handle) + + cursor.close() + + assert not cursor.open + + def test_result_set_close(self): + """Test that ResultSet.close() properly closes operations on the server and handles state correctly.""" + with self.connection() as conn: + cursor = conn.cursor() + try: + cursor.execute("SELECT * FROM RANGE(10)") + + result_set = cursor.active_result_set + assert result_set is not None + + initial_op_state = result_set.op_state + + result_set.close() + + assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE + assert result_set.op_state != initial_op_state + + # Closing the result set again should be a no-op and not raise exceptions + result_set.close() + finally: + cursor.close() + # use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep # the 429/503 subsuites separate since they execute under different circumstances. 
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index c39aeb52..5271baa7 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -20,6 +20,7 @@ import databricks.sql import databricks.sql.client as client from databricks.sql import InterfaceError, DatabaseError, Error, NotSupportedError +from databricks.sql.exc import RequestError, CursorAlreadyClosedError from databricks.sql.types import Row from tests.unit.test_fetches import FetchTests @@ -283,6 +284,15 @@ def test_context_manager_closes_cursor(self): cursor.close = mock_close mock_close.assert_called_once_with() + cursor = client.Cursor(Mock(), Mock()) + cursor.close = Mock() + try: + with self.assertRaises(KeyboardInterrupt): + with cursor: + raise KeyboardInterrupt("Simulated interrupt") + finally: + cursor.close.assert_called() + @patch("%s.client.ThriftBackend" % PACKAGE_NAME) def test_context_manager_closes_connection(self, mock_client_class): instance = mock_client_class.return_value @@ -298,6 +308,15 @@ def test_context_manager_closes_connection(self, mock_client_class): close_session_id = instance.close_session.call_args[0][0].sessionId self.assertEqual(close_session_id, b"\x22") + connection = databricks.sql.connect(**self.DUMMY_CONNECTION_ARGS) + connection.close = Mock() + try: + with self.assertRaises(KeyboardInterrupt): + with connection: + raise KeyboardInterrupt("Simulated interrupt") + finally: + connection.close.assert_called() + def dict_product(self, dicts): """ Generate cartesion product of values in input dictionary, outputting a dictionary @@ -676,6 +695,116 @@ def test_access_current_query_id(self): cursor.close() self.assertIsNone(cursor.query_id) + def test_cursor_close_handles_exception(self): + """Test that Cursor.close() handles exceptions from close_command properly.""" + mock_backend = Mock() + mock_connection = Mock() + mock_op_handle = Mock() + + mock_backend.close_command.side_effect = Exception("Test error") + + cursor = client.Cursor(mock_connection, mock_backend) + cursor.active_op_handle = mock_op_handle + + cursor.close() + + mock_backend.close_command.assert_called_once_with(mock_op_handle) + + self.assertIsNone(cursor.active_op_handle) + + self.assertFalse(cursor.open) + + def test_cursor_context_manager_handles_exit_exception(self): + """Test that cursor's context manager handles exceptions during __exit__.""" + mock_backend = Mock() + mock_connection = Mock() + + cursor = client.Cursor(mock_connection, mock_backend) + original_close = cursor.close + cursor.close = Mock(side_effect=Exception("Test error during close")) + + try: + with cursor: + raise ValueError("Test error inside context") + except ValueError: + pass + + cursor.close.assert_called_once() + + def test_connection_close_handles_cursor_close_exception(self): + """Test that _close handles exceptions from cursor.close() properly.""" + cursors_closed = [] + + def mock_close_with_exception(): + cursors_closed.append(1) + raise Exception("Test error during close") + + cursor1 = Mock() + cursor1.close = mock_close_with_exception + + def mock_close_normal(): + cursors_closed.append(2) + + cursor2 = Mock() + cursor2.close = mock_close_normal + + mock_backend = Mock() + mock_session_handle = Mock() + + try: + for cursor in [cursor1, cursor2]: + try: + cursor.close() + except Exception: + pass + + mock_backend.close_session(mock_session_handle) + except Exception as e: + self.fail(f"Connection close should handle exceptions: {e}") + + self.assertEqual(cursors_closed, [1, 2], "Both cursors should have close 
called") + + def test_resultset_close_handles_cursor_already_closed_error(self): + """Test that ResultSet.close() handles CursorAlreadyClosedError properly.""" + result_set = client.ResultSet.__new__(client.ResultSet) + result_set.thrift_backend = Mock() + result_set.thrift_backend.CLOSED_OP_STATE = 'CLOSED' + result_set.connection = Mock() + result_set.connection.open = True + result_set.op_state = 'RUNNING' + result_set.has_been_closed_server_side = False + result_set.command_id = Mock() + + class MockRequestError(Exception): + def __init__(self): + self.args = ["Error message", CursorAlreadyClosedError()] + + result_set.thrift_backend.close_command.side_effect = MockRequestError() + + original_close = client.ResultSet.close + try: + try: + if ( + result_set.op_state != result_set.thrift_backend.CLOSED_OP_STATE + and not result_set.has_been_closed_server_side + and result_set.connection.open + ): + result_set.thrift_backend.close_command(result_set.command_id) + except MockRequestError as e: + if isinstance(e.args[1], CursorAlreadyClosedError): + pass + finally: + result_set.has_been_closed_server_side = True + result_set.op_state = result_set.thrift_backend.CLOSED_OP_STATE + + result_set.thrift_backend.close_command.assert_called_once_with(result_set.command_id) + + assert result_set.has_been_closed_server_side is True + + assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE + finally: + pass + if __name__ == "__main__": suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) From 73471e9b24289324e1b98699c9d731f290b5139b Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Fri, 23 May 2025 00:31:40 +0530 Subject: [PATCH 11/25] PECOBLR-86 improve logging on python driver (#556) * PECOBLR-86 Improve logging for debug level Signed-off-by: Sai Shree Pradhan * PECOBLR-86 Improve logging for debug level Signed-off-by: Sai Shree Pradhan * fixed format Signed-off-by: Sai Shree Pradhan * used lazy logging Signed-off-by: Sai Shree Pradhan * changed debug to error logs Signed-off-by: Sai Shree Pradhan * used lazy logging Signed-off-by: Sai Shree Pradhan --------- Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/thrift_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 43f800dd..e3dc38ad 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -1090,7 +1090,7 @@ def fetch_results( return queue, resp.hasMoreRows def close_command(self, op_handle): - logger.debug(f"ThriftBackend.close_command(op_handle={op_handle})") + logger.debug("ThriftBackend.close_command(op_handle=%s)", op_handle) req = ttypes.TCloseOperationReq(operationHandle=op_handle) resp = self.make_request(self._client.CloseOperation, req) return resp.status From 74c64637baac1887a301e6a8465b38a6fa9d856f Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Mon, 26 May 2025 10:20:45 +0530 Subject: [PATCH 12/25] Update github actions run conditions (#569) More conditions to run github actions Signed-off-by: Sai Shree Pradhan --- .github/workflows/code-quality-checks.yml | 4 ++++ .github/workflows/integration.yml | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/.github/workflows/code-quality-checks.yml b/.github/workflows/code-quality-checks.yml index 4889156b..462d2236 100644 --- a/.github/workflows/code-quality-checks.yml +++ b/.github/workflows/code-quality-checks.yml @@ -3,9 +3,13 @@ on: push: branches: - main + - sea-migration + - telemetry 
pull_request: branches: - main + - sea-migration + - telemetry jobs: run-unit-tests: runs-on: ubuntu-latest diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 87eaa79b..ccd3a580 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -4,6 +4,11 @@ on: paths-ignore: - "**.MD" - "**.md" + pull_request: + branches: + - main + - sea-migration + - telemetry jobs: run-e2e-tests: From cbc9ebf1e405b52e799f4d56b666ab1dcce9b8f0 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Mon, 26 May 2025 10:52:44 +0530 Subject: [PATCH 13/25] Added classes required for telemetry Signed-off-by: Sai Shree Pradhan --- .../telemetry/DriverConnectionParameters.py | 23 +++++++++++-- .../sql/telemetry/DriverErrorInfo.py | 12 +++++++ .../telemetry/DriverSystemConfiguration.py | 32 +++++++++---------- .../sql/telemetry/DriverVolumeOperation.py | 7 ++++ .../sql/telemetry/FrontendLogContext.py | 9 ++++++ src/databricks/sql/telemetry/HostDetails.py | 7 ++++ .../sql/telemetry/SqlExecutionEvent.py | 9 ++++++ .../sql/telemetry/TelemetryClientContext.py | 7 ++++ .../sql/telemetry/TelemetryEvent.py | 2 +- 9 files changed, 88 insertions(+), 20 deletions(-) diff --git a/src/databricks/sql/telemetry/DriverConnectionParameters.py b/src/databricks/sql/telemetry/DriverConnectionParameters.py index 0dc69512..544e3f4f 100644 --- a/src/databricks/sql/telemetry/DriverConnectionParameters.py +++ b/src/databricks/sql/telemetry/DriverConnectionParameters.py @@ -9,16 +9,33 @@ @dataclass class DriverConnectionParameters: http_path: str - driver_mode: DatabricksClientType - host_details: HostDetails + mode: DatabricksClientType + host_info: HostDetails auth_mech: AuthMech auth_flow: AuthFlow auth_scope: str discovery_url: str allowed_volume_ingestion_paths: str - enable_complex_datatype_support: bool azure_tenant_id: str socket_timeout: int def to_json(self): return json.dumps(asdict(self)) + + +# Part of TelemetryEvent +# DriverConnectionParameters connectionParams = new DriverConnectionParameters( +# httpPath = " /sql/1.0/endpoints/1234567890abcdef", +# driverMode = "THRIFT", +# hostDetails = new HostDetails( +# hostUrl = "https://my-workspace.cloud.databricks.com", +# port = 443 +# ), +# authMech = "OAUTH", +# authFlow = "AZURE_MANAGED_IDENTITIES", +# authScope = "sql", +# discoveryUrl = "https://example-url", +# allowedVolumeIngestionPaths = "[]", +# azureTenantId = "1234567890abcdef", +# socketTimeout = 10000 +# ) diff --git a/src/databricks/sql/telemetry/DriverErrorInfo.py b/src/databricks/sql/telemetry/DriverErrorInfo.py index 83f52375..f83c73d0 100644 --- a/src/databricks/sql/telemetry/DriverErrorInfo.py +++ b/src/databricks/sql/telemetry/DriverErrorInfo.py @@ -9,3 +9,15 @@ class DriverErrorInfo: def to_json(self): return json.dumps(asdict(self)) + + +# Required for ErrorLogs +# DriverErrorInfo errorInfo = new DriverErrorInfo( +# errorName="CONNECTION_ERROR", +# stackTrace="Connection failure while using the Databricks SQL Python connector. 
Failed to connect to server: https://my-workspace.cloud.databricks.com\n" + +# "databricks.sql.exc.OperationalError: Connection refused: connect\n" + +# "at databricks.sql.thrift_backend.ThriftBackend.make_request(ThriftBackend.py:329)\n" + +# "at databricks.sql.thrift_backend.ThriftBackend.attempt_request(ThriftBackend.py:366)\n" + +# "at databricks.sql.thrift_backend.ThriftBackend.open_session(ThriftBackend.py:575)\n" + +# "at databricks.sql.client.Connection.__init__(client.py:69)\n" + +# "at databricks.sql.client.connect(connection.py:123)") diff --git a/src/databricks/sql/telemetry/DriverSystemConfiguration.py b/src/databricks/sql/telemetry/DriverSystemConfiguration.py index 60af0831..b3af2132 100644 --- a/src/databricks/sql/telemetry/DriverSystemConfiguration.py +++ b/src/databricks/sql/telemetry/DriverSystemConfiguration.py @@ -1,8 +1,5 @@ import json from dataclasses import dataclass, asdict -import platform -import sys -import locale from databricks.sql import __version__ @@ -20,18 +17,21 @@ class DriverSystemConfiguration: driver_name: str char_set_encoding: str - def __init__(self): - self.driver_version = __version__ - self.os_name = platform.system() - self.os_version = platform.version() - self.os_arch = platform.machine() - self.runtime_name = platform.python_implementation() - self.runtime_version = platform.python_version() - self.runtime_vendor = sys.implementation.name - self.client_app_name = "databricks-sql-python" - self.locale_name = locale.getdefaultlocale()[0] - self.driver_name = "databricks-sql-python" - self.char_set_encoding = "UTF-8" - def to_json(self): return json.dumps(asdict(self)) + + +# Part of TelemetryEvent +# DriverSystemConfiguration systemConfig = new DriverSystemConfiguration( +# driver_version = "2.9.3", +# os_name = "Darwin", +# os_version = "24.4.0", +# os_arch = "arm64", +# runtime_name = "CPython", +# runtime_version = "3.13.3", +# runtime_vendor = "cpython", +# client_app_name = "databricks-sql-python", +# locale_name = "en_US", +# driver_name = "databricks-sql-python", +# char_set_encoding = "UTF-8" +# ) diff --git a/src/databricks/sql/telemetry/DriverVolumeOperation.py b/src/databricks/sql/telemetry/DriverVolumeOperation.py index 7a6e32e4..9cdaae43 100644 --- a/src/databricks/sql/telemetry/DriverVolumeOperation.py +++ b/src/databricks/sql/telemetry/DriverVolumeOperation.py @@ -12,3 +12,10 @@ class DriverVolumeOperation: def to_json(self): return json.dumps(asdict(self)) + + +# Part of TelemetryEvent +# DriverVolumeOperation volumeOperation = new DriverVolumeOperation( +# volumeOperationType = "LIST", +# volumePath = "/path/to/volume" +# ) diff --git a/src/databricks/sql/telemetry/FrontendLogContext.py b/src/databricks/sql/telemetry/FrontendLogContext.py index e6145e3e..55cefb31 100644 --- a/src/databricks/sql/telemetry/FrontendLogContext.py +++ b/src/databricks/sql/telemetry/FrontendLogContext.py @@ -9,3 +9,12 @@ class FrontendLogContext: def to_json(self): return json.dumps(asdict(self)) + + +# used in TelemetryFrontendLog +# FrontendLogContext frontendLogContext = new FrontendLogContext( +# clientContext = new TelemetryClientContext( +# timestampMillis = 1716489600000, +# userAgent = "databricks-sql-python-test" +# ) +# ) diff --git a/src/databricks/sql/telemetry/HostDetails.py b/src/databricks/sql/telemetry/HostDetails.py index 3c288890..e8cfb4c7 100644 --- a/src/databricks/sql/telemetry/HostDetails.py +++ b/src/databricks/sql/telemetry/HostDetails.py @@ -9,3 +9,10 @@ class HostDetails: def to_json(self): return json.dumps(asdict(self)) 
+ + +# Part of DriverConnectionParameters +# HostDetails hostDetails = new HostDetails( +# hostUrl = "https://my-workspace.cloud.databricks.com", +# port = 443 +# ) diff --git a/src/databricks/sql/telemetry/SqlExecutionEvent.py b/src/databricks/sql/telemetry/SqlExecutionEvent.py index 9d2efae9..ca33488c 100644 --- a/src/databricks/sql/telemetry/SqlExecutionEvent.py +++ b/src/databricks/sql/telemetry/SqlExecutionEvent.py @@ -13,3 +13,12 @@ class SqlExecutionEvent: def to_json(self): return json.dumps(asdict(self)) + + +# Part of TelemetryEvent +# SqlExecutionEvent sqlExecutionEvent = new SqlExecutionEvent( +# statementType = "QUERY", +# isCompressed = true, +# executionResult = "INLINE_ARROW", +# retryCount = 0 +# ) diff --git a/src/databricks/sql/telemetry/TelemetryClientContext.py b/src/databricks/sql/telemetry/TelemetryClientContext.py index f23e9289..896b347f 100644 --- a/src/databricks/sql/telemetry/TelemetryClientContext.py +++ b/src/databricks/sql/telemetry/TelemetryClientContext.py @@ -9,3 +9,10 @@ class TelemetryClientContext: def to_json(self): return json.dumps(asdict(self)) + + +# used in FrontendLogContext +# TelemetryClientContext clientContext = new TelemetryClientContext( +# timestampMillis = 1716489600000, +# userAgent = "databricks-sql-python-test" +# ) diff --git a/src/databricks/sql/telemetry/TelemetryEvent.py b/src/databricks/sql/telemetry/TelemetryEvent.py index fcbb2b77..19727049 100644 --- a/src/databricks/sql/telemetry/TelemetryEvent.py +++ b/src/databricks/sql/telemetry/TelemetryEvent.py @@ -19,7 +19,7 @@ class TelemetryEvent: vol_operation: DriverVolumeOperation sql_operation: SqlExecutionEvent error_info: DriverErrorInfo - latency: int + operation_latency_ms: int def to_json(self): return json.dumps(asdict(self)) From 9d10e1608a7721d087d0253f5cccdabfb68c09a0 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Mon, 26 May 2025 12:09:04 +0530 Subject: [PATCH 14/25] fixed example Signed-off-by: Sai Shree Pradhan --- examples/query_execute.py | 2 +- examples/test_telemetry.py | 20 -------------------- 2 files changed, 1 insertion(+), 21 deletions(-) delete mode 100644 examples/test_telemetry.py diff --git a/examples/query_execute.py b/examples/query_execute.py index d9ed5d8e..38d2f17a 100644 --- a/examples/query_execute.py +++ b/examples/query_execute.py @@ -8,7 +8,7 @@ ) as connection: with connection.cursor() as cursor: - cursor.execute("SELECT * FROM main.eng_lumberjack.staging_frontend_log_sql_driver_log limit 1") + cursor.execute("SELECT * FROM default.diamonds LIMIT 2") result = cursor.fetchall() for row in result: diff --git a/examples/test_telemetry.py b/examples/test_telemetry.py deleted file mode 100644 index 4b419eaf..00000000 --- a/examples/test_telemetry.py +++ /dev/null @@ -1,20 +0,0 @@ -import os -import databricks.sql as sql - -# Create connection with telemetry enabled -conn = sql.connect( - server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"], - http_path=os.environ["DATABRICKS_HTTP_PATH"], - access_token=os.environ["DATABRICKS_TOKEN"], - enable_telemetry=True, # Enable telemetry - telemetry_batch_size=1 # Set batch size to 1 -) - -# Execute a simple query to generate telemetry -cursor = conn.cursor() -cursor.execute("SELECT * FROM main.eng_lumberjack.staging_frontend_log_sql_driver_log limit 1") -cursor.fetchall() - -# Close the connection -cursor.close() -conn.close() \ No newline at end of file From 63023278bd8708b38b01d05c999bacd875f3c815 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Mon, 26 May 2025 18:47:32 +0530 Subject: 
[PATCH 15/25] changed to doc string Signed-off-by: Sai Shree Pradhan --- .../telemetry/DriverConnectionParameters.py | 32 +++++++++---------- .../sql/telemetry/DriverErrorInfo.py | 21 ++++++------ .../telemetry/DriverSystemConfiguration.py | 29 +++++++++-------- .../sql/telemetry/DriverVolumeOperation.py | 11 ++++--- .../sql/telemetry/FrontendLogContext.py | 15 +++++---- src/databricks/sql/telemetry/HostDetails.py | 11 ++++--- .../sql/telemetry/SqlExecutionEvent.py | 14 ++++---- .../sql/telemetry/TelemetryClientContext.py | 10 +++--- 8 files changed, 74 insertions(+), 69 deletions(-) diff --git a/src/databricks/sql/telemetry/DriverConnectionParameters.py b/src/databricks/sql/telemetry/DriverConnectionParameters.py index 544e3f4f..cb8c0a69 100644 --- a/src/databricks/sql/telemetry/DriverConnectionParameters.py +++ b/src/databricks/sql/telemetry/DriverConnectionParameters.py @@ -23,19 +23,19 @@ def to_json(self): return json.dumps(asdict(self)) -# Part of TelemetryEvent -# DriverConnectionParameters connectionParams = new DriverConnectionParameters( -# httpPath = " /sql/1.0/endpoints/1234567890abcdef", -# driverMode = "THRIFT", -# hostDetails = new HostDetails( -# hostUrl = "https://my-workspace.cloud.databricks.com", -# port = 443 -# ), -# authMech = "OAUTH", -# authFlow = "AZURE_MANAGED_IDENTITIES", -# authScope = "sql", -# discoveryUrl = "https://example-url", -# allowedVolumeIngestionPaths = "[]", -# azureTenantId = "1234567890abcdef", -# socketTimeout = 10000 -# ) +""" Part of TelemetryEvent +DriverConnectionParameters connectionParams = new DriverConnectionParameters( + httpPath = " /sql/1.0/endpoints/1234567890abcdef", + driverMode = "THRIFT", + hostDetails = new HostDetails( + hostUrl = "https://my-workspace.cloud.databricks.com", + port = 443 + ), + authMech = "OAUTH", + authFlow = "AZURE_MANAGED_IDENTITIES", + authScope = "sql", + discoveryUrl = "https://example-url", + allowedVolumeIngestionPaths = "[]", + azureTenantId = "1234567890abcdef", + socketTimeout = 10000 +)""" diff --git a/src/databricks/sql/telemetry/DriverErrorInfo.py b/src/databricks/sql/telemetry/DriverErrorInfo.py index f83c73d0..5ba3ff67 100644 --- a/src/databricks/sql/telemetry/DriverErrorInfo.py +++ b/src/databricks/sql/telemetry/DriverErrorInfo.py @@ -11,13 +11,14 @@ def to_json(self): return json.dumps(asdict(self)) -# Required for ErrorLogs -# DriverErrorInfo errorInfo = new DriverErrorInfo( -# errorName="CONNECTION_ERROR", -# stackTrace="Connection failure while using the Databricks SQL Python connector. Failed to connect to server: https://my-workspace.cloud.databricks.com\n" + -# "databricks.sql.exc.OperationalError: Connection refused: connect\n" + -# "at databricks.sql.thrift_backend.ThriftBackend.make_request(ThriftBackend.py:329)\n" + -# "at databricks.sql.thrift_backend.ThriftBackend.attempt_request(ThriftBackend.py:366)\n" + -# "at databricks.sql.thrift_backend.ThriftBackend.open_session(ThriftBackend.py:575)\n" + -# "at databricks.sql.client.Connection.__init__(client.py:69)\n" + -# "at databricks.sql.client.connect(connection.py:123)") +"""Required for ErrorLogs +DriverErrorInfo errorInfo = new DriverErrorInfo( + errorName="CONNECTION_ERROR", + stackTrace="Connection failure while using the Databricks SQL Python connector. 
Failed to connect to server: https://my-workspace.cloud.databricks.com\n" + + "databricks.sql.exc.OperationalError: Connection refused: connect\n" + + "at databricks.sql.thrift_backend.ThriftBackend.make_request(ThriftBackend.py:329)\n" + + "at databricks.sql.thrift_backend.ThriftBackend.attempt_request(ThriftBackend.py:366)\n" + + "at databricks.sql.thrift_backend.ThriftBackend.open_session(ThriftBackend.py:575)\n" + + "at databricks.sql.client.Connection.__init__(client.py:69)\n" + + "at databricks.sql.client.connect(connection.py:123)") +""" diff --git a/src/databricks/sql/telemetry/DriverSystemConfiguration.py b/src/databricks/sql/telemetry/DriverSystemConfiguration.py index b3af2132..45d41217 100644 --- a/src/databricks/sql/telemetry/DriverSystemConfiguration.py +++ b/src/databricks/sql/telemetry/DriverSystemConfiguration.py @@ -21,17 +21,18 @@ def to_json(self): return json.dumps(asdict(self)) -# Part of TelemetryEvent -# DriverSystemConfiguration systemConfig = new DriverSystemConfiguration( -# driver_version = "2.9.3", -# os_name = "Darwin", -# os_version = "24.4.0", -# os_arch = "arm64", -# runtime_name = "CPython", -# runtime_version = "3.13.3", -# runtime_vendor = "cpython", -# client_app_name = "databricks-sql-python", -# locale_name = "en_US", -# driver_name = "databricks-sql-python", -# char_set_encoding = "UTF-8" -# ) +"""Part of TelemetryEvent +DriverSystemConfiguration systemConfig = new DriverSystemConfiguration( + driver_version = "2.9.3", + os_name = "Darwin", + os_version = "24.4.0", + os_arch = "arm64", + runtime_name = "CPython", + runtime_version = "3.13.3", + runtime_vendor = "cpython", + client_app_name = "databricks-sql-python", + locale_name = "en_US", + driver_name = "databricks-sql-python", + char_set_encoding = "UTF-8" +) +""" diff --git a/src/databricks/sql/telemetry/DriverVolumeOperation.py b/src/databricks/sql/telemetry/DriverVolumeOperation.py index 9cdaae43..9db4e16e 100644 --- a/src/databricks/sql/telemetry/DriverVolumeOperation.py +++ b/src/databricks/sql/telemetry/DriverVolumeOperation.py @@ -14,8 +14,9 @@ def to_json(self): return json.dumps(asdict(self)) -# Part of TelemetryEvent -# DriverVolumeOperation volumeOperation = new DriverVolumeOperation( -# volumeOperationType = "LIST", -# volumePath = "/path/to/volume" -# ) +""" Part of TelemetryEvent +DriverVolumeOperation volumeOperation = new DriverVolumeOperation( + volumeOperationType = "LIST", + volumePath = "/path/to/volume" +) +""" diff --git a/src/databricks/sql/telemetry/FrontendLogContext.py b/src/databricks/sql/telemetry/FrontendLogContext.py index 55cefb31..8f50eb1b 100644 --- a/src/databricks/sql/telemetry/FrontendLogContext.py +++ b/src/databricks/sql/telemetry/FrontendLogContext.py @@ -11,10 +11,11 @@ def to_json(self): return json.dumps(asdict(self)) -# used in TelemetryFrontendLog -# FrontendLogContext frontendLogContext = new FrontendLogContext( -# clientContext = new TelemetryClientContext( -# timestampMillis = 1716489600000, -# userAgent = "databricks-sql-python-test" -# ) -# ) +"""used in TelemetryFrontendLog +FrontendLogContext frontendLogContext = new FrontendLogContext( + clientContext = new TelemetryClientContext( + timestampMillis = 1716489600000, + userAgent = "databricks-sql-python-test" + ) +) +""" diff --git a/src/databricks/sql/telemetry/HostDetails.py b/src/databricks/sql/telemetry/HostDetails.py index e8cfb4c7..8d537292 100644 --- a/src/databricks/sql/telemetry/HostDetails.py +++ b/src/databricks/sql/telemetry/HostDetails.py @@ -11,8 +11,9 @@ def to_json(self): return 
json.dumps(asdict(self)) -# Part of DriverConnectionParameters -# HostDetails hostDetails = new HostDetails( -# hostUrl = "https://my-workspace.cloud.databricks.com", -# port = 443 -# ) +""" Part of DriverConnectionParameters +HostDetails hostDetails = new HostDetails( + hostUrl = "https://my-workspace.cloud.databricks.com", + port = 443 +) +""" diff --git a/src/databricks/sql/telemetry/SqlExecutionEvent.py b/src/databricks/sql/telemetry/SqlExecutionEvent.py index ca33488c..a6ca3a1e 100644 --- a/src/databricks/sql/telemetry/SqlExecutionEvent.py +++ b/src/databricks/sql/telemetry/SqlExecutionEvent.py @@ -15,10 +15,10 @@ def to_json(self): return json.dumps(asdict(self)) -# Part of TelemetryEvent -# SqlExecutionEvent sqlExecutionEvent = new SqlExecutionEvent( -# statementType = "QUERY", -# isCompressed = true, -# executionResult = "INLINE_ARROW", -# retryCount = 0 -# ) +"""Part of TelemetryEvent +SqlExecutionEvent sqlExecutionEvent = new SqlExecutionEvent( + statementType = "QUERY", + isCompressed = true, + executionResult = "INLINE_ARROW", + retryCount = 0 +)""" diff --git a/src/databricks/sql/telemetry/TelemetryClientContext.py b/src/databricks/sql/telemetry/TelemetryClientContext.py index 896b347f..84870b73 100644 --- a/src/databricks/sql/telemetry/TelemetryClientContext.py +++ b/src/databricks/sql/telemetry/TelemetryClientContext.py @@ -11,8 +11,8 @@ def to_json(self): return json.dumps(asdict(self)) -# used in FrontendLogContext -# TelemetryClientContext clientContext = new TelemetryClientContext( -# timestampMillis = 1716489600000, -# userAgent = "databricks-sql-python-test" -# ) +"""used in FrontendLogContext +TelemetryClientContext clientContext = new TelemetryClientContext( + timestampMillis = 1716489600000, + userAgent = "databricks-sql-python-test" +)""" From e16fce5e60c2d3f1118e9af4383ed314ea5a7c1d Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Mon, 26 May 2025 18:50:33 +0530 Subject: [PATCH 16/25] removed self.telemetry close line Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index bd505ac7..60be6fcf 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -431,9 +431,6 @@ def _close(self, close_cursors=True) -> None: self.open = False - if hasattr(self, "telemetry_client"): - self.telemetry_client.close() - def commit(self): """No-op because Databricks does not support transactions""" pass From 7461d96982a30416fbaaf9b8bbd5a014cc5db7a1 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Tue, 27 May 2025 10:11:42 +0530 Subject: [PATCH 17/25] grouped classes Signed-off-by: Sai Shree Pradhan --- .../telemetry/DriverConnectionParameters.py | 41 ----- .../sql/telemetry/DriverErrorInfo.py | 24 --- .../telemetry/DriverSystemConfiguration.py | 38 ----- .../sql/telemetry/DriverVolumeOperation.py | 22 --- .../sql/telemetry/FrontendLogContext.py | 21 --- .../sql/telemetry/FrontendLogEntry.py | 11 -- src/databricks/sql/telemetry/HostDetails.py | 19 --- .../sql/telemetry/SqlExecutionEvent.py | 24 --- .../sql/telemetry/TelemetryClientContext.py | 18 -- .../sql/telemetry/TelemetryEvent.py | 25 --- .../sql/telemetry/TelemetryFrontendLog.py | 15 -- .../sql/telemetry/TelemetryResponse.py | 13 -- src/databricks/sql/telemetry/enums.py | 43 +++++ .../sql/telemetry/enums/AuthFlow.py | 8 - .../sql/telemetry/enums/AuthMech.py | 7 - .../telemetry/enums/DatabricksClientType.py | 6 - .../enums/DriverVolumeOperationType.py | 10 -- 
.../telemetry/enums/ExecutionResultFormat.py | 8 - .../sql/telemetry/enums/StatementType.py | 9 - ...equest.py => telemetry_endpoint_models.py} | 10 ++ .../sql/telemetry/telemetry_event.py | 159 ++++++++++++++++++ .../sql/telemetry/telemetry_frontend_log.py | 44 +++++ 22 files changed, 256 insertions(+), 319 deletions(-) delete mode 100644 src/databricks/sql/telemetry/DriverConnectionParameters.py delete mode 100644 src/databricks/sql/telemetry/DriverErrorInfo.py delete mode 100644 src/databricks/sql/telemetry/DriverSystemConfiguration.py delete mode 100644 src/databricks/sql/telemetry/DriverVolumeOperation.py delete mode 100644 src/databricks/sql/telemetry/FrontendLogContext.py delete mode 100644 src/databricks/sql/telemetry/FrontendLogEntry.py delete mode 100644 src/databricks/sql/telemetry/HostDetails.py delete mode 100644 src/databricks/sql/telemetry/SqlExecutionEvent.py delete mode 100644 src/databricks/sql/telemetry/TelemetryClientContext.py delete mode 100644 src/databricks/sql/telemetry/TelemetryEvent.py delete mode 100644 src/databricks/sql/telemetry/TelemetryFrontendLog.py delete mode 100644 src/databricks/sql/telemetry/TelemetryResponse.py create mode 100644 src/databricks/sql/telemetry/enums.py delete mode 100644 src/databricks/sql/telemetry/enums/AuthFlow.py delete mode 100644 src/databricks/sql/telemetry/enums/AuthMech.py delete mode 100644 src/databricks/sql/telemetry/enums/DatabricksClientType.py delete mode 100644 src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py delete mode 100644 src/databricks/sql/telemetry/enums/ExecutionResultFormat.py delete mode 100644 src/databricks/sql/telemetry/enums/StatementType.py rename src/databricks/sql/telemetry/{TelemetryRequest.py => telemetry_endpoint_models.py} (61%) create mode 100644 src/databricks/sql/telemetry/telemetry_event.py create mode 100644 src/databricks/sql/telemetry/telemetry_frontend_log.py diff --git a/src/databricks/sql/telemetry/DriverConnectionParameters.py b/src/databricks/sql/telemetry/DriverConnectionParameters.py deleted file mode 100644 index cb8c0a69..00000000 --- a/src/databricks/sql/telemetry/DriverConnectionParameters.py +++ /dev/null @@ -1,41 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from databricks.sql.telemetry.HostDetails import HostDetails -from databricks.sql.telemetry.enums.AuthMech import AuthMech -from databricks.sql.telemetry.enums.AuthFlow import AuthFlow -from databricks.sql.telemetry.enums.DatabricksClientType import DatabricksClientType - - -@dataclass -class DriverConnectionParameters: - http_path: str - mode: DatabricksClientType - host_info: HostDetails - auth_mech: AuthMech - auth_flow: AuthFlow - auth_scope: str - discovery_url: str - allowed_volume_ingestion_paths: str - azure_tenant_id: str - socket_timeout: int - - def to_json(self): - return json.dumps(asdict(self)) - - -""" Part of TelemetryEvent -DriverConnectionParameters connectionParams = new DriverConnectionParameters( - httpPath = " /sql/1.0/endpoints/1234567890abcdef", - driverMode = "THRIFT", - hostDetails = new HostDetails( - hostUrl = "https://my-workspace.cloud.databricks.com", - port = 443 - ), - authMech = "OAUTH", - authFlow = "AZURE_MANAGED_IDENTITIES", - authScope = "sql", - discoveryUrl = "https://example-url", - allowedVolumeIngestionPaths = "[]", - azureTenantId = "1234567890abcdef", - socketTimeout = 10000 -)""" diff --git a/src/databricks/sql/telemetry/DriverErrorInfo.py b/src/databricks/sql/telemetry/DriverErrorInfo.py deleted file mode 100644 index 5ba3ff67..00000000 --- 
a/src/databricks/sql/telemetry/DriverErrorInfo.py +++ /dev/null @@ -1,24 +0,0 @@ -import json -from dataclasses import dataclass, asdict - - -@dataclass -class DriverErrorInfo: - error_name: str - stack_trace: str - - def to_json(self): - return json.dumps(asdict(self)) - - -"""Required for ErrorLogs -DriverErrorInfo errorInfo = new DriverErrorInfo( - errorName="CONNECTION_ERROR", - stackTrace="Connection failure while using the Databricks SQL Python connector. Failed to connect to server: https://my-workspace.cloud.databricks.com\n" + - "databricks.sql.exc.OperationalError: Connection refused: connect\n" + - "at databricks.sql.thrift_backend.ThriftBackend.make_request(ThriftBackend.py:329)\n" + - "at databricks.sql.thrift_backend.ThriftBackend.attempt_request(ThriftBackend.py:366)\n" + - "at databricks.sql.thrift_backend.ThriftBackend.open_session(ThriftBackend.py:575)\n" + - "at databricks.sql.client.Connection.__init__(client.py:69)\n" + - "at databricks.sql.client.connect(connection.py:123)") -""" diff --git a/src/databricks/sql/telemetry/DriverSystemConfiguration.py b/src/databricks/sql/telemetry/DriverSystemConfiguration.py deleted file mode 100644 index 45d41217..00000000 --- a/src/databricks/sql/telemetry/DriverSystemConfiguration.py +++ /dev/null @@ -1,38 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from databricks.sql import __version__ - - -@dataclass -class DriverSystemConfiguration: - driver_version: str - os_name: str - os_version: str - os_arch: str - runtime_name: str - runtime_version: str - runtime_vendor: str - client_app_name: str - locale_name: str - driver_name: str - char_set_encoding: str - - def to_json(self): - return json.dumps(asdict(self)) - - -"""Part of TelemetryEvent -DriverSystemConfiguration systemConfig = new DriverSystemConfiguration( - driver_version = "2.9.3", - os_name = "Darwin", - os_version = "24.4.0", - os_arch = "arm64", - runtime_name = "CPython", - runtime_version = "3.13.3", - runtime_vendor = "cpython", - client_app_name = "databricks-sql-python", - locale_name = "en_US", - driver_name = "databricks-sql-python", - char_set_encoding = "UTF-8" -) -""" diff --git a/src/databricks/sql/telemetry/DriverVolumeOperation.py b/src/databricks/sql/telemetry/DriverVolumeOperation.py deleted file mode 100644 index 9db4e16e..00000000 --- a/src/databricks/sql/telemetry/DriverVolumeOperation.py +++ /dev/null @@ -1,22 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from databricks.sql.telemetry.enums.DriverVolumeOperationType import ( - DriverVolumeOperationType, -) - - -@dataclass -class DriverVolumeOperation: - volume_operation_type: DriverVolumeOperationType - volume_path: str - - def to_json(self): - return json.dumps(asdict(self)) - - -""" Part of TelemetryEvent -DriverVolumeOperation volumeOperation = new DriverVolumeOperation( - volumeOperationType = "LIST", - volumePath = "/path/to/volume" -) -""" diff --git a/src/databricks/sql/telemetry/FrontendLogContext.py b/src/databricks/sql/telemetry/FrontendLogContext.py deleted file mode 100644 index 8f50eb1b..00000000 --- a/src/databricks/sql/telemetry/FrontendLogContext.py +++ /dev/null @@ -1,21 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from databricks.sql.telemetry.TelemetryClientContext import TelemetryClientContext - - -@dataclass -class FrontendLogContext: - client_context: TelemetryClientContext - - def to_json(self): - return json.dumps(asdict(self)) - - -"""used in TelemetryFrontendLog -FrontendLogContext frontendLogContext = new 
FrontendLogContext( - clientContext = new TelemetryClientContext( - timestampMillis = 1716489600000, - userAgent = "databricks-sql-python-test" - ) -) -""" diff --git a/src/databricks/sql/telemetry/FrontendLogEntry.py b/src/databricks/sql/telemetry/FrontendLogEntry.py deleted file mode 100644 index 1351ec31..00000000 --- a/src/databricks/sql/telemetry/FrontendLogEntry.py +++ /dev/null @@ -1,11 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from databricks.sql.telemetry.TelemetryEvent import TelemetryEvent - - -@dataclass -class FrontendLogEntry: - sql_driver_log: TelemetryEvent - - def to_json(self): - return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/HostDetails.py b/src/databricks/sql/telemetry/HostDetails.py deleted file mode 100644 index 8d537292..00000000 --- a/src/databricks/sql/telemetry/HostDetails.py +++ /dev/null @@ -1,19 +0,0 @@ -import json -from dataclasses import dataclass, asdict - - -@dataclass -class HostDetails: - host_url: str - port: int - - def to_json(self): - return json.dumps(asdict(self)) - - -""" Part of DriverConnectionParameters -HostDetails hostDetails = new HostDetails( - hostUrl = "https://my-workspace.cloud.databricks.com", - port = 443 -) -""" diff --git a/src/databricks/sql/telemetry/SqlExecutionEvent.py b/src/databricks/sql/telemetry/SqlExecutionEvent.py deleted file mode 100644 index a6ca3a1e..00000000 --- a/src/databricks/sql/telemetry/SqlExecutionEvent.py +++ /dev/null @@ -1,24 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from databricks.sql.telemetry.enums.StatementType import StatementType -from databricks.sql.telemetry.enums.ExecutionResultFormat import ExecutionResultFormat - - -@dataclass -class SqlExecutionEvent: - statement_type: StatementType - is_compressed: bool - execution_result: ExecutionResultFormat - retry_count: int - - def to_json(self): - return json.dumps(asdict(self)) - - -"""Part of TelemetryEvent -SqlExecutionEvent sqlExecutionEvent = new SqlExecutionEvent( - statementType = "QUERY", - isCompressed = true, - executionResult = "INLINE_ARROW", - retryCount = 0 -)""" diff --git a/src/databricks/sql/telemetry/TelemetryClientContext.py b/src/databricks/sql/telemetry/TelemetryClientContext.py deleted file mode 100644 index 84870b73..00000000 --- a/src/databricks/sql/telemetry/TelemetryClientContext.py +++ /dev/null @@ -1,18 +0,0 @@ -from dataclasses import dataclass, asdict -import json - - -@dataclass -class TelemetryClientContext: - timestamp_millis: int - user_agent: str - - def to_json(self): - return json.dumps(asdict(self)) - - -"""used in FrontendLogContext -TelemetryClientContext clientContext = new TelemetryClientContext( - timestampMillis = 1716489600000, - userAgent = "databricks-sql-python-test" -)""" diff --git a/src/databricks/sql/telemetry/TelemetryEvent.py b/src/databricks/sql/telemetry/TelemetryEvent.py deleted file mode 100644 index 19727049..00000000 --- a/src/databricks/sql/telemetry/TelemetryEvent.py +++ /dev/null @@ -1,25 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from databricks.sql.telemetry.DriverSystemConfiguration import DriverSystemConfiguration -from databricks.sql.telemetry.DriverConnectionParameters import ( - DriverConnectionParameters, -) -from databricks.sql.telemetry.DriverVolumeOperation import DriverVolumeOperation -from databricks.sql.telemetry.SqlExecutionEvent import SqlExecutionEvent -from databricks.sql.telemetry.DriverErrorInfo import DriverErrorInfo - - -@dataclass -class TelemetryEvent: - session_id: str - 
sql_statement_id: str - system_configuration: DriverSystemConfiguration - driver_connection_params: DriverConnectionParameters - auth_type: str - vol_operation: DriverVolumeOperation - sql_operation: SqlExecutionEvent - error_info: DriverErrorInfo - operation_latency_ms: int - - def to_json(self): - return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/TelemetryFrontendLog.py b/src/databricks/sql/telemetry/TelemetryFrontendLog.py deleted file mode 100644 index abbc1120..00000000 --- a/src/databricks/sql/telemetry/TelemetryFrontendLog.py +++ /dev/null @@ -1,15 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from databricks.sql.telemetry.FrontendLogContext import FrontendLogContext -from databricks.sql.telemetry.FrontendLogEntry import FrontendLogEntry - - -@dataclass -class TelemetryFrontendLog: - workspace_id: int - frontend_log_event_id: str - context: FrontendLogContext - entry: FrontendLogEntry - - def to_json(self): - return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/TelemetryResponse.py b/src/databricks/sql/telemetry/TelemetryResponse.py deleted file mode 100644 index 3b14050d..00000000 --- a/src/databricks/sql/telemetry/TelemetryResponse.py +++ /dev/null @@ -1,13 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from typing import List, Optional - - -@dataclass -class TelemetryResponse: - errors: List[str] - numSuccess: int - numProtoSuccess: int - - def to_json(self): - return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/enums.py b/src/databricks/sql/telemetry/enums.py new file mode 100644 index 00000000..2d0107de --- /dev/null +++ b/src/databricks/sql/telemetry/enums.py @@ -0,0 +1,43 @@ +from enum import Enum + + +class AuthFlow(Enum): + TOKEN_PASSTHROUGH = "token_passthrough" + CLIENT_CREDENTIALS = "client_credentials" + BROWSER_BASED_AUTHENTICATION = "browser_based_authentication" + AZURE_MANAGED_IDENTITIES = "azure_managed_identities" + + +class AuthMech(Enum): + OTHER = "other" + PAT = "pat" + OAUTH = "oauth" + + +class DatabricksClientType(Enum): + SEA = "SEA" + THRIFT = "THRIFT" + + +class DriverVolumeOperationType(Enum): + TYPE_UNSPECIFIED = "type_unspecified" + PUT = "put" + GET = "get" + DELETE = "delete" + LIST = "list" + QUERY = "query" + + +class ExecutionResultFormat(Enum): + FORMAT_UNSPECIFIED = "format_unspecified" + INLINE_ARROW = "inline_arrow" + EXTERNAL_LINKS = "external_links" + COLUMNAR_INLINE = "columnar_inline" + + +class StatementType(Enum): + NONE = "none" + QUERY = "query" + SQL = "sql" + UPDATE = "update" + METADATA = "metadata" \ No newline at end of file diff --git a/src/databricks/sql/telemetry/enums/AuthFlow.py b/src/databricks/sql/telemetry/enums/AuthFlow.py deleted file mode 100644 index 2afc35c7..00000000 --- a/src/databricks/sql/telemetry/enums/AuthFlow.py +++ /dev/null @@ -1,8 +0,0 @@ -from enum import Enum - - -class AuthFlow(Enum): - TOKEN_PASSTHROUGH = "token_passthrough" - CLIENT_CREDENTIALS = "client_credentials" - BROWSER_BASED_AUTHENTICATION = "browser_based_authentication" - AZURE_MANAGED_IDENTITIES = "azure_managed_identities" diff --git a/src/databricks/sql/telemetry/enums/AuthMech.py b/src/databricks/sql/telemetry/enums/AuthMech.py deleted file mode 100644 index 6425eea4..00000000 --- a/src/databricks/sql/telemetry/enums/AuthMech.py +++ /dev/null @@ -1,7 +0,0 @@ -from enum import Enum - - -class AuthMech(Enum): - OTHER = "other" - PAT = "pat" - OAUTH = "oauth" diff --git a/src/databricks/sql/telemetry/enums/DatabricksClientType.py 
b/src/databricks/sql/telemetry/enums/DatabricksClientType.py deleted file mode 100644 index 8e08c355..00000000 --- a/src/databricks/sql/telemetry/enums/DatabricksClientType.py +++ /dev/null @@ -1,6 +0,0 @@ -from enum import Enum - - -class DatabricksClientType(Enum): - SEA = "SEA" - THRIFT = "THRIFT" diff --git a/src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py b/src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py deleted file mode 100644 index 581e56c2..00000000 --- a/src/databricks/sql/telemetry/enums/DriverVolumeOperationType.py +++ /dev/null @@ -1,10 +0,0 @@ -from enum import Enum - - -class DriverVolumeOperationType(Enum): - TYPE_UNSPECIFIED = "type_unspecified" - PUT = "put" - GET = "get" - DELETE = "delete" - LIST = "list" - QUERY = "query" diff --git a/src/databricks/sql/telemetry/enums/ExecutionResultFormat.py b/src/databricks/sql/telemetry/enums/ExecutionResultFormat.py deleted file mode 100644 index 23e18150..00000000 --- a/src/databricks/sql/telemetry/enums/ExecutionResultFormat.py +++ /dev/null @@ -1,8 +0,0 @@ -from enum import Enum - - -class ExecutionResultFormat(Enum): - FORMAT_UNSPECIFIED = "format_unspecified" - INLINE_ARROW = "inline_arrow" - EXTERNAL_LINKS = "external_links" - COLUMNAR_INLINE = "columnar_inline" diff --git a/src/databricks/sql/telemetry/enums/StatementType.py b/src/databricks/sql/telemetry/enums/StatementType.py deleted file mode 100644 index cea86bab..00000000 --- a/src/databricks/sql/telemetry/enums/StatementType.py +++ /dev/null @@ -1,9 +0,0 @@ -from enum import Enum - - -class StatementType(Enum): - NONE = "none" - QUERY = "query" - SQL = "sql" - UPDATE = "update" - METADATA = "metadata" diff --git a/src/databricks/sql/telemetry/TelemetryRequest.py b/src/databricks/sql/telemetry/telemetry_endpoint_models.py similarity index 61% rename from src/databricks/sql/telemetry/TelemetryRequest.py rename to src/databricks/sql/telemetry/telemetry_endpoint_models.py index 8142e118..1a8fb971 100644 --- a/src/databricks/sql/telemetry/TelemetryRequest.py +++ b/src/databricks/sql/telemetry/telemetry_endpoint_models.py @@ -11,3 +11,13 @@ class TelemetryRequest: def to_json(self): return json.dumps(asdict(self)) + + +@dataclass +class TelemetryResponse: + errors: List[str] + numSuccess: int + numProtoSuccess: int + + def to_json(self): + return json.dumps(asdict(self)) \ No newline at end of file diff --git a/src/databricks/sql/telemetry/telemetry_event.py b/src/databricks/sql/telemetry/telemetry_event.py new file mode 100644 index 00000000..6d791be5 --- /dev/null +++ b/src/databricks/sql/telemetry/telemetry_event.py @@ -0,0 +1,159 @@ +import json +from dataclasses import dataclass, asdict +from databricks.sql.telemetry.enums import AuthMech, AuthFlow, DatabricksClientType, DriverVolumeOperationType, StatementType, ExecutionResultFormat + + +@dataclass +class HostDetails: + host_url: str + port: int + + """ Part of DriverConnectionParameters + HostDetails hostDetails = new HostDetails( + hostUrl = "https://my-workspace.cloud.databricks.com", + port = 443 + ) + """ + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class DriverConnectionParameters: + http_path: str + mode: DatabricksClientType + host_info: HostDetails + auth_mech: AuthMech + auth_flow: AuthFlow + auth_scope: str + discovery_url: str + allowed_volume_ingestion_paths: str + azure_tenant_id: str + socket_timeout: int + + """ Part of TelemetryEvent + DriverConnectionParameters connectionParams = new DriverConnectionParameters( + httpPath = " 
/sql/1.0/endpoints/1234567890abcdef", + driverMode = "THRIFT", + hostDetails = new HostDetails( + hostUrl = "https://my-workspace.cloud.databricks.com", + port = 443 + ), + authMech = "OAUTH", + authFlow = "AZURE_MANAGED_IDENTITIES", + authScope = "sql", + discoveryUrl = "https://example-url", + allowedVolumeIngestionPaths = "[]", + azureTenantId = "1234567890abcdef", + socketTimeout = 10000 + )""" + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class DriverSystemConfiguration: + driver_version: str + os_name: str + os_version: str + os_arch: str + runtime_name: str + runtime_version: str + runtime_vendor: str + client_app_name: str + locale_name: str + driver_name: str + char_set_encoding: str + + """Part of TelemetryEvent + DriverSystemConfiguration systemConfig = new DriverSystemConfiguration( + driver_version = "2.9.3", + os_name = "Darwin", + os_version = "24.4.0", + os_arch = "arm64", + runtime_name = "CPython", + runtime_version = "3.13.3", + runtime_vendor = "cpython", + client_app_name = "databricks-sql-python", + locale_name = "en_US", + driver_name = "databricks-sql-python", + char_set_encoding = "UTF-8" + ) + """ + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class DriverVolumeOperation: + volume_operation_type: DriverVolumeOperationType + volume_path: str + + """ Part of TelemetryEvent + DriverVolumeOperation volumeOperation = new DriverVolumeOperation( + volumeOperationType = "LIST", + volumePath = "/path/to/volume" + ) + """ + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class DriverErrorInfo: + error_name: str + stack_trace: str + + """Required for ErrorLogs + DriverErrorInfo errorInfo = new DriverErrorInfo( + errorName="CONNECTION_ERROR", + stackTrace="Connection failure while using the Databricks SQL Python connector. 
Failed to connect to server: https://my-workspace.cloud.databricks.com\n" + + "databricks.sql.exc.OperationalError: Connection refused: connect\n" + + "at databricks.sql.thrift_backend.ThriftBackend.make_request(ThriftBackend.py:329)\n" + + "at databricks.sql.thrift_backend.ThriftBackend.attempt_request(ThriftBackend.py:366)\n" + + "at databricks.sql.thrift_backend.ThriftBackend.open_session(ThriftBackend.py:575)\n" + + "at databricks.sql.client.Connection.__init__(client.py:69)\n" + + "at databricks.sql.client.connect(connection.py:123)") + """ + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class SqlExecutionEvent: + statement_type: StatementType + is_compressed: bool + execution_result: ExecutionResultFormat + retry_count: int + + """Part of TelemetryEvent + SqlExecutionEvent sqlExecutionEvent = new SqlExecutionEvent( + statementType = "QUERY", + isCompressed = true, + executionResult = "INLINE_ARROW", + retryCount = 0 + )""" + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class TelemetryEvent: + session_id: str + sql_statement_id: str + system_configuration: DriverSystemConfiguration + driver_connection_params: DriverConnectionParameters + auth_type: str + vol_operation: DriverVolumeOperation + sql_operation: SqlExecutionEvent + error_info: DriverErrorInfo + operation_latency_ms: int + + def to_json(self): + return json.dumps(asdict(self)) \ No newline at end of file diff --git a/src/databricks/sql/telemetry/telemetry_frontend_log.py b/src/databricks/sql/telemetry/telemetry_frontend_log.py new file mode 100644 index 00000000..3b7d9fd4 --- /dev/null +++ b/src/databricks/sql/telemetry/telemetry_frontend_log.py @@ -0,0 +1,44 @@ +import json +from dataclasses import dataclass, asdict +from databricks.sql.telemetry.telemetry_event import TelemetryEvent + +@dataclass +class TelemetryClientContext: + timestamp_millis: int + user_agent: str + + """used in FrontendLogContext + TelemetryClientContext clientContext = new TelemetryClientContext( + timestampMillis = 1716489600000, + userAgent = "databricks-sql-python-test" + )""" + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class FrontendLogContext: + client_context: TelemetryClientContext + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class FrontendLogEntry: + sql_driver_log: TelemetryEvent + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class TelemetryFrontendLog: + workspace_id: int + frontend_log_event_id: str + context: FrontendLogContext + entry: FrontendLogEntry + + def to_json(self): + return json.dumps(asdict(self)) From 95e43e4b8ebcd1ff39d20899dc22db7b78fadc37 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Tue, 27 May 2025 10:13:54 +0530 Subject: [PATCH 18/25] formatting Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/telemetry/enums.py | 2 +- .../telemetry/telemetry_endpoint_models.py | 2 +- .../sql/telemetry/telemetry_event.py | 19 +++++++++++++------ .../sql/telemetry/telemetry_frontend_log.py | 5 +++-- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/databricks/sql/telemetry/enums.py b/src/databricks/sql/telemetry/enums.py index 2d0107de..cd7cd9a3 100644 --- a/src/databricks/sql/telemetry/enums.py +++ b/src/databricks/sql/telemetry/enums.py @@ -40,4 +40,4 @@ class StatementType(Enum): QUERY = "query" SQL = "sql" UPDATE = "update" - METADATA = "metadata" \ No newline at end of file + METADATA = "metadata" diff --git 
a/src/databricks/sql/telemetry/telemetry_endpoint_models.py b/src/databricks/sql/telemetry/telemetry_endpoint_models.py index 1a8fb971..1ad9c550 100644 --- a/src/databricks/sql/telemetry/telemetry_endpoint_models.py +++ b/src/databricks/sql/telemetry/telemetry_endpoint_models.py @@ -20,4 +20,4 @@ class TelemetryResponse: numProtoSuccess: int def to_json(self): - return json.dumps(asdict(self)) \ No newline at end of file + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/telemetry_event.py b/src/databricks/sql/telemetry/telemetry_event.py index 6d791be5..16878eef 100644 --- a/src/databricks/sql/telemetry/telemetry_event.py +++ b/src/databricks/sql/telemetry/telemetry_event.py @@ -1,6 +1,13 @@ import json from dataclasses import dataclass, asdict -from databricks.sql.telemetry.enums import AuthMech, AuthFlow, DatabricksClientType, DriverVolumeOperationType, StatementType, ExecutionResultFormat +from databricks.sql.telemetry.enums import ( + AuthMech, + AuthFlow, + DatabricksClientType, + DriverVolumeOperationType, + StatementType, + ExecutionResultFormat, +) @dataclass @@ -51,7 +58,7 @@ class DriverConnectionParameters: def to_json(self): return json.dumps(asdict(self)) - + @dataclass class DriverSystemConfiguration: @@ -85,7 +92,7 @@ class DriverSystemConfiguration: def to_json(self): return json.dumps(asdict(self)) - + @dataclass class DriverVolumeOperation: @@ -101,7 +108,7 @@ class DriverVolumeOperation: def to_json(self): return json.dumps(asdict(self)) - + @dataclass class DriverErrorInfo: @@ -141,7 +148,7 @@ class SqlExecutionEvent: def to_json(self): return json.dumps(asdict(self)) - + @dataclass class TelemetryEvent: @@ -156,4 +163,4 @@ class TelemetryEvent: operation_latency_ms: int def to_json(self): - return json.dumps(asdict(self)) \ No newline at end of file + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/telemetry_frontend_log.py b/src/databricks/sql/telemetry/telemetry_frontend_log.py index 3b7d9fd4..491bafaf 100644 --- a/src/databricks/sql/telemetry/telemetry_frontend_log.py +++ b/src/databricks/sql/telemetry/telemetry_frontend_log.py @@ -2,6 +2,7 @@ from dataclasses import dataclass, asdict from databricks.sql.telemetry.telemetry_event import TelemetryEvent + @dataclass class TelemetryClientContext: timestamp_millis: int @@ -15,7 +16,7 @@ class TelemetryClientContext: def to_json(self): return json.dumps(asdict(self)) - + @dataclass class FrontendLogContext: @@ -23,7 +24,7 @@ class FrontendLogContext: def to_json(self): return json.dumps(asdict(self)) - + @dataclass class FrontendLogEntry: From d72fb271d2c185ba1390a7057502b8763db96d97 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Tue, 27 May 2025 11:04:46 +0530 Subject: [PATCH 19/25] fixed doc string Signed-off-by: Sai Shree Pradhan --- .../sql/telemetry/telemetry_event.py | 101 +++++++++++------- .../sql/telemetry/telemetry_frontend_log.py | 12 ++- 2 files changed, 69 insertions(+), 44 deletions(-) diff --git a/src/databricks/sql/telemetry/telemetry_event.py b/src/databricks/sql/telemetry/telemetry_event.py index 16878eef..1a5744c7 100644 --- a/src/databricks/sql/telemetry/telemetry_event.py +++ b/src/databricks/sql/telemetry/telemetry_event.py @@ -12,34 +12,29 @@ @dataclass class HostDetails: - host_url: str - port: int + """ + Part of DriverConnectionParameters - """ Part of DriverConnectionParameters + Example: HostDetails hostDetails = new HostDetails( hostUrl = "https://my-workspace.cloud.databricks.com", port = 443 ) """ + host_url: str + port: int + def 
to_json(self): return json.dumps(asdict(self)) @dataclass class DriverConnectionParameters: - http_path: str - mode: DatabricksClientType - host_info: HostDetails - auth_mech: AuthMech - auth_flow: AuthFlow - auth_scope: str - discovery_url: str - allowed_volume_ingestion_paths: str - azure_tenant_id: str - socket_timeout: int + """ + Part of TelemetryEvent - """ Part of TelemetryEvent + Example: DriverConnectionParameters connectionParams = new DriverConnectionParameters( httpPath = " /sql/1.0/endpoints/1234567890abcdef", driverMode = "THRIFT", @@ -54,7 +49,19 @@ class DriverConnectionParameters: allowedVolumeIngestionPaths = "[]", azureTenantId = "1234567890abcdef", socketTimeout = 10000 - )""" + ) + """ + + http_path: str + mode: DatabricksClientType + host_info: HostDetails + auth_mech: AuthMech + auth_flow: AuthFlow + auth_scope: str + discovery_url: str + allowed_volume_ingestion_paths: str + azure_tenant_id: str + socket_timeout: int def to_json(self): return json.dumps(asdict(self)) @@ -62,19 +69,10 @@ def to_json(self): @dataclass class DriverSystemConfiguration: - driver_version: str - os_name: str - os_version: str - os_arch: str - runtime_name: str - runtime_version: str - runtime_vendor: str - client_app_name: str - locale_name: str - driver_name: str - char_set_encoding: str + """ + Part of TelemetryEvent - """Part of TelemetryEvent + Example: DriverSystemConfiguration systemConfig = new DriverSystemConfiguration( driver_version = "2.9.3", os_name = "Darwin", @@ -90,32 +88,47 @@ class DriverSystemConfiguration: ) """ + driver_version: str + os_name: str + os_version: str + os_arch: str + runtime_name: str + runtime_version: str + runtime_vendor: str + client_app_name: str + locale_name: str + driver_name: str + char_set_encoding: str + def to_json(self): return json.dumps(asdict(self)) @dataclass class DriverVolumeOperation: - volume_operation_type: DriverVolumeOperationType - volume_path: str + """ + Part of TelemetryEvent - """ Part of TelemetryEvent + Example: DriverVolumeOperation volumeOperation = new DriverVolumeOperation( volumeOperationType = "LIST", volumePath = "/path/to/volume" ) """ + volume_operation_type: DriverVolumeOperationType + volume_path: str + def to_json(self): return json.dumps(asdict(self)) @dataclass class DriverErrorInfo: - error_name: str - stack_trace: str + """ + Required for ErrorLogs - """Required for ErrorLogs + Example: DriverErrorInfo errorInfo = new DriverErrorInfo( errorName="CONNECTION_ERROR", stackTrace="Connection failure while using the Databricks SQL Python connector. 
Failed to connect to server: https://my-workspace.cloud.databricks.com\n" + @@ -124,27 +137,35 @@ class DriverErrorInfo: "at databricks.sql.thrift_backend.ThriftBackend.attempt_request(ThriftBackend.py:366)\n" + "at databricks.sql.thrift_backend.ThriftBackend.open_session(ThriftBackend.py:575)\n" + "at databricks.sql.client.Connection.__init__(client.py:69)\n" + - "at databricks.sql.client.connect(connection.py:123)") + "at databricks.sql.client.connect(connection.py:123)" + ) """ + error_name: str + stack_trace: str + def to_json(self): return json.dumps(asdict(self)) @dataclass class SqlExecutionEvent: - statement_type: StatementType - is_compressed: bool - execution_result: ExecutionResultFormat - retry_count: int + """ + Part of TelemetryEvent - """Part of TelemetryEvent + Example: SqlExecutionEvent sqlExecutionEvent = new SqlExecutionEvent( statementType = "QUERY", isCompressed = true, executionResult = "INLINE_ARROW", retryCount = 0 - )""" + ) + """ + + statement_type: StatementType + is_compressed: bool + execution_result: ExecutionResultFormat + retry_count: int def to_json(self): return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/telemetry_frontend_log.py b/src/databricks/sql/telemetry/telemetry_frontend_log.py index 491bafaf..e2ba498d 100644 --- a/src/databricks/sql/telemetry/telemetry_frontend_log.py +++ b/src/databricks/sql/telemetry/telemetry_frontend_log.py @@ -5,14 +5,18 @@ @dataclass class TelemetryClientContext: - timestamp_millis: int - user_agent: str + """ + Used in FrontendLogContext - """used in FrontendLogContext + Example: TelemetryClientContext clientContext = new TelemetryClientContext( timestampMillis = 1716489600000, userAgent = "databricks-sql-python-test" - )""" + ) + """ + + timestamp_millis: int + user_agent: str def to_json(self): return json.dumps(asdict(self)) From 28efabaaeb2dc69aa4a111ba1e569a206d784f6a Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Tue, 27 May 2025 11:07:30 +0530 Subject: [PATCH 20/25] fixed doc string Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/telemetry/telemetry_event.py | 12 ++++++------ .../sql/telemetry/telemetry_frontend_log.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/databricks/sql/telemetry/telemetry_event.py b/src/databricks/sql/telemetry/telemetry_event.py index 1a5744c7..4c51d1c6 100644 --- a/src/databricks/sql/telemetry/telemetry_event.py +++ b/src/databricks/sql/telemetry/telemetry_event.py @@ -13,7 +13,7 @@ @dataclass class HostDetails: """ - Part of DriverConnectionParameters + Part of DriverConnectionParameters. Example: HostDetails hostDetails = new HostDetails( @@ -32,7 +32,7 @@ def to_json(self): @dataclass class DriverConnectionParameters: """ - Part of TelemetryEvent + Part of TelemetryEvent. Example: DriverConnectionParameters connectionParams = new DriverConnectionParameters( @@ -70,7 +70,7 @@ def to_json(self): @dataclass class DriverSystemConfiguration: """ - Part of TelemetryEvent + Part of TelemetryEvent. Example: DriverSystemConfiguration systemConfig = new DriverSystemConfiguration( @@ -107,7 +107,7 @@ def to_json(self): @dataclass class DriverVolumeOperation: """ - Part of TelemetryEvent + Part of TelemetryEvent. Example: DriverVolumeOperation volumeOperation = new DriverVolumeOperation( @@ -126,7 +126,7 @@ def to_json(self): @dataclass class DriverErrorInfo: """ - Required for ErrorLogs + Required for ErrorLogs. 
Example: DriverErrorInfo errorInfo = new DriverErrorInfo( @@ -151,7 +151,7 @@ def to_json(self): @dataclass class SqlExecutionEvent: """ - Part of TelemetryEvent + Part of TelemetryEvent. Example: SqlExecutionEvent sqlExecutionEvent = new SqlExecutionEvent( diff --git a/src/databricks/sql/telemetry/telemetry_frontend_log.py b/src/databricks/sql/telemetry/telemetry_frontend_log.py index e2ba498d..87e30bd0 100644 --- a/src/databricks/sql/telemetry/telemetry_frontend_log.py +++ b/src/databricks/sql/telemetry/telemetry_frontend_log.py @@ -6,7 +6,7 @@ @dataclass class TelemetryClientContext: """ - Used in FrontendLogContext + Used in FrontendLogContext. Example: TelemetryClientContext clientContext = new TelemetryClientContext( From c8c08ddbdd252ff97f75cff88d991a4083b83dfc Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Wed, 28 May 2025 10:10:43 +0530 Subject: [PATCH 21/25] added more descriptive comments, put dataclasses in a sub-folder Signed-off-by: Sai Shree Pradhan --- .../models/telemetry_endpoint_models.py | 43 ++++ .../sql/telemetry/models/telemetry_event.py | 189 ++++++++++++++++++ .../models/telemetry_frontend_log.py | 76 +++++++ .../telemetry/telemetry_endpoint_models.py | 23 --- .../sql/telemetry/telemetry_event.py | 187 ----------------- .../sql/telemetry/telemetry_frontend_log.py | 49 ----- 6 files changed, 308 insertions(+), 259 deletions(-) create mode 100644 src/databricks/sql/telemetry/models/telemetry_endpoint_models.py create mode 100644 src/databricks/sql/telemetry/models/telemetry_event.py create mode 100644 src/databricks/sql/telemetry/models/telemetry_frontend_log.py delete mode 100644 src/databricks/sql/telemetry/telemetry_endpoint_models.py delete mode 100644 src/databricks/sql/telemetry/telemetry_event.py delete mode 100644 src/databricks/sql/telemetry/telemetry_frontend_log.py diff --git a/src/databricks/sql/telemetry/models/telemetry_endpoint_models.py b/src/databricks/sql/telemetry/models/telemetry_endpoint_models.py new file mode 100644 index 00000000..a940d933 --- /dev/null +++ b/src/databricks/sql/telemetry/models/telemetry_endpoint_models.py @@ -0,0 +1,43 @@ +import json +from dataclasses import dataclass, asdict +from typing import List, Optional + + +@dataclass +class TelemetryRequest: + """ + Represents a request to send telemetry data to the server side. + Contains the telemetry items to be uploaded and optional protocol buffer logs. + + Attributes: + uploadTime (int): Unix timestamp in milliseconds when the request is made + items (List[str]): List of telemetry event items to be uploaded + protoLogs (Optional[List[str]]): Optional list of protocol buffer formatted logs + """ + + uploadTime: int + items: List[str] + protoLogs: Optional[List[str]] + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class TelemetryResponse: + """ + Represents the response from the telemetry backend after processing a request. + Contains information about the success or failure of the telemetry upload. 
+ + Attributes: + errors (List[str]): List of error messages if any occurred during processing + numSuccess (int): Number of successfully processed telemetry items + numProtoSuccess (int): Number of successfully processed protocol buffer logs + """ + + errors: List[str] + numSuccess: int + numProtoSuccess: int + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/models/telemetry_event.py b/src/databricks/sql/telemetry/models/telemetry_event.py new file mode 100644 index 00000000..2b7b99ce --- /dev/null +++ b/src/databricks/sql/telemetry/models/telemetry_event.py @@ -0,0 +1,189 @@ +import json +from dataclasses import dataclass, asdict +from databricks.sql.telemetry.enums import ( + AuthMech, + AuthFlow, + DatabricksClientType, + DriverVolumeOperationType, + StatementType, + ExecutionResultFormat, +) +from typing import Optional + + +@dataclass +class HostDetails: + """ + Represents the host connection details for a Databricks workspace. + + Attributes: + host_url (str): The URL of the Databricks workspace (e.g., https://my-workspace.cloud.databricks.com) + port (int): The port number for the connection (typically 443 for HTTPS) + """ + + host_url: str + port: int + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class DriverConnectionParameters: + """ + Contains all connection parameters used to establish a connection to Databricks SQL. + This includes authentication details, host information, and connection settings. + + Attributes: + http_path (str): The HTTP path for the SQL endpoint + mode (DatabricksClientType): The type of client connection (e.g., THRIFT) + host_info (HostDetails): Details about the host connection + auth_mech (AuthMech): The authentication mechanism used + auth_flow (AuthFlow): The authentication flow type + auth_scope (str): The scope of authentication + discovery_url (str): URL for service discovery + allowed_volume_ingestion_paths (str): JSON string of allowed paths for volume operations + azure_tenant_id (str): Azure tenant ID for Azure authentication + socket_timeout (int): Connection timeout in milliseconds + """ + + http_path: str + mode: DatabricksClientType + host_info: HostDetails + auth_mech: AuthMech + auth_flow: AuthFlow + auth_scope: str + discovery_url: str + allowed_volume_ingestion_paths: str + azure_tenant_id: str + socket_timeout: int + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class DriverSystemConfiguration: + """ + Contains system-level configuration information about the client environment. + This includes details about the operating system, runtime, and driver version. 
+ + Attributes: + driver_version (str): Version of the Databricks SQL driver + os_name (str): Name of the operating system + os_version (str): Version of the operating system + os_arch (str): Architecture of the operating system + runtime_name (str): Name of the Python runtime (e.g., CPython) + runtime_version (str): Version of the Python runtime + runtime_vendor (str): Vendor of the Python runtime + client_app_name (str): Name of the client application + locale_name (str): System locale setting + driver_name (str): Name of the driver + char_set_encoding (str): Character set encoding used + """ + + driver_version: str + os_name: str + os_version: str + os_arch: str + runtime_name: str + runtime_version: str + runtime_vendor: str + client_app_name: str + locale_name: str + driver_name: str + char_set_encoding: str + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class DriverVolumeOperation: + """ + Represents a volume operation performed by the driver. + Used for tracking volume-related operations in telemetry. + + Attributes: + volume_operation_type (DriverVolumeOperationType): Type of volume operation (e.g., LIST) + volume_path (str): Path to the volume being operated on + """ + + volume_operation_type: DriverVolumeOperationType + volume_path: str + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class DriverErrorInfo: + """ + Contains detailed information about errors that occur during driver operations. + Used for error tracking and debugging in telemetry. + + Attributes: + error_name (str): Name/type of the error + stack_trace (str): Full stack trace of the error + """ + + error_name: str + stack_trace: str + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class SqlExecutionEvent: + """ + Represents a SQL query execution event. + Contains details about the query execution, including type, compression, and result format. + + Attributes: + statement_type (StatementType): Type of SQL statement + is_compressed (bool): Whether the result is compressed + execution_result (ExecutionResultFormat): Format of the execution result + retry_count (int): Number of retry attempts made + """ + + statement_type: StatementType + is_compressed: bool + execution_result: ExecutionResultFormat + retry_count: int + + def to_json(self): + return json.dumps(asdict(self)) + + +@dataclass +class TelemetryEvent: + """ + Main telemetry event class that aggregates all telemetry data. + Contains information about the session, system configuration, connection parameters, + and any operations or errors that occurred. 
+
+    Attributes:
+        session_id (str): Unique identifier for the session
+        system_configuration (DriverSystemConfiguration): System configuration details
+        driver_connection_params (DriverConnectionParameters): Connection parameters
+        sql_statement_id (Optional[str]): ID of the SQL statement if applicable
+        auth_type (Optional[str]): Type of authentication used
+        vol_operation (Optional[DriverVolumeOperation]): Volume operation details if applicable
+        sql_operation (Optional[SqlExecutionEvent]): SQL execution details if applicable
+        error_info (Optional[DriverErrorInfo]): Error information if an error occurred
+        operation_latency_ms (Optional[int]): Operation latency in milliseconds
+    """
+
+    session_id: str
+    system_configuration: DriverSystemConfiguration
+    driver_connection_params: DriverConnectionParameters
+    sql_statement_id: Optional[str] = None
+    auth_type: Optional[str] = None
+    vol_operation: Optional[DriverVolumeOperation] = None
+    sql_operation: Optional[SqlExecutionEvent] = None
+    error_info: Optional[DriverErrorInfo] = None
+    operation_latency_ms: Optional[int] = None
+
+    def to_json(self):
+        return json.dumps(asdict(self))
diff --git a/src/databricks/sql/telemetry/models/telemetry_frontend_log.py b/src/databricks/sql/telemetry/models/telemetry_frontend_log.py
new file mode 100644
index 00000000..5c080d66
--- /dev/null
+++ b/src/databricks/sql/telemetry/models/telemetry_frontend_log.py
@@ -0,0 +1,76 @@
+import json
+from dataclasses import dataclass, asdict
+from databricks.sql.telemetry.models.telemetry_event import TelemetryEvent
+
+
+@dataclass
+class TelemetryClientContext:
+    """
+    Contains client-side context information for telemetry events.
+    This includes timestamp and user agent information for tracking when and how the client is being used.
+
+    Attributes:
+        timestamp_millis (int): Unix timestamp in milliseconds when the event occurred
+        user_agent (str): Identifier for the client application making the request
+    """
+
+    timestamp_millis: int
+    user_agent: str
+
+    def to_json(self):
+        return json.dumps(asdict(self))
+
+
+@dataclass
+class FrontendLogContext:
+    """
+    Wrapper for client context information in frontend logs.
+    Provides additional context about the client environment for telemetry events.
+
+    Attributes:
+        client_context (TelemetryClientContext): Client-specific context information
+    """
+
+    client_context: TelemetryClientContext
+
+    def to_json(self):
+        return json.dumps(asdict(self))
+
+
+@dataclass
+class FrontendLogEntry:
+    """
+    Contains the actual telemetry event data in a frontend log.
+    Wraps the SQL driver log information for frontend processing.
+
+    Attributes:
+        sql_driver_log (TelemetryEvent): The telemetry event containing SQL driver information
+    """
+
+    sql_driver_log: TelemetryEvent
+
+    def to_json(self):
+        return json.dumps(asdict(self))
+
+
+@dataclass
+class TelemetryFrontendLog:
+    """
+    Main container for frontend telemetry data.
+    Aggregates workspace information, event ID, context, and the actual log entry.
+    Used for sending telemetry data to the server side.
+ + Attributes: + workspace_id (int): Unique identifier for the Databricks workspace + frontend_log_event_id (str): Unique identifier for this telemetry event + context (FrontendLogContext): Context information about the client + entry (FrontendLogEntry): The actual telemetry event data + """ + + workspace_id: int + frontend_log_event_id: str + context: FrontendLogContext + entry: FrontendLogEntry + + def to_json(self): + return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/telemetry_endpoint_models.py b/src/databricks/sql/telemetry/telemetry_endpoint_models.py deleted file mode 100644 index 1ad9c550..00000000 --- a/src/databricks/sql/telemetry/telemetry_endpoint_models.py +++ /dev/null @@ -1,23 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from typing import List, Optional - - -@dataclass -class TelemetryRequest: - uploadTime: int - items: List[str] - protoLogs: Optional[List[str]] - - def to_json(self): - return json.dumps(asdict(self)) - - -@dataclass -class TelemetryResponse: - errors: List[str] - numSuccess: int - numProtoSuccess: int - - def to_json(self): - return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/telemetry_event.py b/src/databricks/sql/telemetry/telemetry_event.py deleted file mode 100644 index 4c51d1c6..00000000 --- a/src/databricks/sql/telemetry/telemetry_event.py +++ /dev/null @@ -1,187 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from databricks.sql.telemetry.enums import ( - AuthMech, - AuthFlow, - DatabricksClientType, - DriverVolumeOperationType, - StatementType, - ExecutionResultFormat, -) - - -@dataclass -class HostDetails: - """ - Part of DriverConnectionParameters. - - Example: - HostDetails hostDetails = new HostDetails( - hostUrl = "https://my-workspace.cloud.databricks.com", - port = 443 - ) - """ - - host_url: str - port: int - - def to_json(self): - return json.dumps(asdict(self)) - - -@dataclass -class DriverConnectionParameters: - """ - Part of TelemetryEvent. - - Example: - DriverConnectionParameters connectionParams = new DriverConnectionParameters( - httpPath = " /sql/1.0/endpoints/1234567890abcdef", - driverMode = "THRIFT", - hostDetails = new HostDetails( - hostUrl = "https://my-workspace.cloud.databricks.com", - port = 443 - ), - authMech = "OAUTH", - authFlow = "AZURE_MANAGED_IDENTITIES", - authScope = "sql", - discoveryUrl = "https://example-url", - allowedVolumeIngestionPaths = "[]", - azureTenantId = "1234567890abcdef", - socketTimeout = 10000 - ) - """ - - http_path: str - mode: DatabricksClientType - host_info: HostDetails - auth_mech: AuthMech - auth_flow: AuthFlow - auth_scope: str - discovery_url: str - allowed_volume_ingestion_paths: str - azure_tenant_id: str - socket_timeout: int - - def to_json(self): - return json.dumps(asdict(self)) - - -@dataclass -class DriverSystemConfiguration: - """ - Part of TelemetryEvent. 
- - Example: - DriverSystemConfiguration systemConfig = new DriverSystemConfiguration( - driver_version = "2.9.3", - os_name = "Darwin", - os_version = "24.4.0", - os_arch = "arm64", - runtime_name = "CPython", - runtime_version = "3.13.3", - runtime_vendor = "cpython", - client_app_name = "databricks-sql-python", - locale_name = "en_US", - driver_name = "databricks-sql-python", - char_set_encoding = "UTF-8" - ) - """ - - driver_version: str - os_name: str - os_version: str - os_arch: str - runtime_name: str - runtime_version: str - runtime_vendor: str - client_app_name: str - locale_name: str - driver_name: str - char_set_encoding: str - - def to_json(self): - return json.dumps(asdict(self)) - - -@dataclass -class DriverVolumeOperation: - """ - Part of TelemetryEvent. - - Example: - DriverVolumeOperation volumeOperation = new DriverVolumeOperation( - volumeOperationType = "LIST", - volumePath = "/path/to/volume" - ) - """ - - volume_operation_type: DriverVolumeOperationType - volume_path: str - - def to_json(self): - return json.dumps(asdict(self)) - - -@dataclass -class DriverErrorInfo: - """ - Required for ErrorLogs. - - Example: - DriverErrorInfo errorInfo = new DriverErrorInfo( - errorName="CONNECTION_ERROR", - stackTrace="Connection failure while using the Databricks SQL Python connector. Failed to connect to server: https://my-workspace.cloud.databricks.com\n" + - "databricks.sql.exc.OperationalError: Connection refused: connect\n" + - "at databricks.sql.thrift_backend.ThriftBackend.make_request(ThriftBackend.py:329)\n" + - "at databricks.sql.thrift_backend.ThriftBackend.attempt_request(ThriftBackend.py:366)\n" + - "at databricks.sql.thrift_backend.ThriftBackend.open_session(ThriftBackend.py:575)\n" + - "at databricks.sql.client.Connection.__init__(client.py:69)\n" + - "at databricks.sql.client.connect(connection.py:123)" - ) - """ - - error_name: str - stack_trace: str - - def to_json(self): - return json.dumps(asdict(self)) - - -@dataclass -class SqlExecutionEvent: - """ - Part of TelemetryEvent. - - Example: - SqlExecutionEvent sqlExecutionEvent = new SqlExecutionEvent( - statementType = "QUERY", - isCompressed = true, - executionResult = "INLINE_ARROW", - retryCount = 0 - ) - """ - - statement_type: StatementType - is_compressed: bool - execution_result: ExecutionResultFormat - retry_count: int - - def to_json(self): - return json.dumps(asdict(self)) - - -@dataclass -class TelemetryEvent: - session_id: str - sql_statement_id: str - system_configuration: DriverSystemConfiguration - driver_connection_params: DriverConnectionParameters - auth_type: str - vol_operation: DriverVolumeOperation - sql_operation: SqlExecutionEvent - error_info: DriverErrorInfo - operation_latency_ms: int - - def to_json(self): - return json.dumps(asdict(self)) diff --git a/src/databricks/sql/telemetry/telemetry_frontend_log.py b/src/databricks/sql/telemetry/telemetry_frontend_log.py deleted file mode 100644 index 87e30bd0..00000000 --- a/src/databricks/sql/telemetry/telemetry_frontend_log.py +++ /dev/null @@ -1,49 +0,0 @@ -import json -from dataclasses import dataclass, asdict -from databricks.sql.telemetry.telemetry_event import TelemetryEvent - - -@dataclass -class TelemetryClientContext: - """ - Used in FrontendLogContext. 
- - Example: - TelemetryClientContext clientContext = new TelemetryClientContext( - timestampMillis = 1716489600000, - userAgent = "databricks-sql-python-test" - ) - """ - - timestamp_millis: int - user_agent: str - - def to_json(self): - return json.dumps(asdict(self)) - - -@dataclass -class FrontendLogContext: - client_context: TelemetryClientContext - - def to_json(self): - return json.dumps(asdict(self)) - - -@dataclass -class FrontendLogEntry: - sql_driver_log: TelemetryEvent - - def to_json(self): - return json.dumps(asdict(self)) - - -@dataclass -class TelemetryFrontendLog: - workspace_id: int - frontend_log_event_id: str - context: FrontendLogContext - entry: FrontendLogEntry - - def to_json(self): - return json.dumps(asdict(self)) From 74ea9b687b1236ab206fc38b426ec4bb52e3da47 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Wed, 28 May 2025 10:18:05 +0530 Subject: [PATCH 22/25] fixed default attributes ordering Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/telemetry/models/telemetry_event.py | 2 +- src/databricks/sql/telemetry/models/telemetry_frontend_log.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/databricks/sql/telemetry/models/telemetry_event.py b/src/databricks/sql/telemetry/models/telemetry_event.py index 2b7b99ce..ca51ee89 100644 --- a/src/databricks/sql/telemetry/models/telemetry_event.py +++ b/src/databricks/sql/telemetry/models/telemetry_event.py @@ -176,9 +176,9 @@ class TelemetryEvent: """ session_id: str - sql_statement_id: Optional[str] = None system_configuration: DriverSystemConfiguration driver_connection_params: DriverConnectionParameters + sql_statement_id: Optional[str] = None auth_type: Optional[str] = None vol_operation: Optional[DriverVolumeOperation] = None sql_operation: Optional[SqlExecutionEvent] = None diff --git a/src/databricks/sql/telemetry/models/telemetry_frontend_log.py b/src/databricks/sql/telemetry/models/telemetry_frontend_log.py index 5c080d66..aaf328b6 100644 --- a/src/databricks/sql/telemetry/models/telemetry_frontend_log.py +++ b/src/databricks/sql/telemetry/models/telemetry_frontend_log.py @@ -1,6 +1,7 @@ import json from dataclasses import dataclass, asdict from databricks.sql.telemetry.models.telemetry_event import TelemetryEvent +from typing import Optional @dataclass @@ -67,10 +68,10 @@ class TelemetryFrontendLog: entry (FrontendLogEntry): The actual telemetry event data """ - workspace_id: int frontend_log_event_id: str context: FrontendLogContext entry: FrontendLogEntry + workspace_id: Optional[int] = None def to_json(self): return json.dumps(asdict(self)) From ac7881fadbf7737564b298301b34382f359aaf9e Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Wed, 28 May 2025 14:32:37 +0530 Subject: [PATCH 23/25] changed file names Signed-off-by: Sai Shree Pradhan --- .../models/{telemetry_endpoint_models.py => endpoint_models.py} | 0 .../sql/telemetry/models/{telemetry_event.py => event.py} | 0 .../models/{telemetry_frontend_log.py => frontend_logs.py} | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename src/databricks/sql/telemetry/models/{telemetry_endpoint_models.py => endpoint_models.py} (100%) rename src/databricks/sql/telemetry/models/{telemetry_event.py => event.py} (100%) rename src/databricks/sql/telemetry/models/{telemetry_frontend_log.py => frontend_logs.py} (100%) diff --git a/src/databricks/sql/telemetry/models/telemetry_endpoint_models.py b/src/databricks/sql/telemetry/models/endpoint_models.py similarity index 100% rename from 
src/databricks/sql/telemetry/models/telemetry_endpoint_models.py rename to src/databricks/sql/telemetry/models/endpoint_models.py diff --git a/src/databricks/sql/telemetry/models/telemetry_event.py b/src/databricks/sql/telemetry/models/event.py similarity index 100% rename from src/databricks/sql/telemetry/models/telemetry_event.py rename to src/databricks/sql/telemetry/models/event.py diff --git a/src/databricks/sql/telemetry/models/telemetry_frontend_log.py b/src/databricks/sql/telemetry/models/frontend_logs.py similarity index 100% rename from src/databricks/sql/telemetry/models/telemetry_frontend_log.py rename to src/databricks/sql/telemetry/models/frontend_logs.py From 6219d38b94a51a38e952976d629fd14bb7efe2c8 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 29 May 2025 09:07:22 +0530 Subject: [PATCH 24/25] added enums to models folder Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/telemetry/{ => models}/enums.py | 0 src/databricks/sql/telemetry/models/event.py | 2 +- src/databricks/sql/telemetry/models/frontend_logs.py | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename src/databricks/sql/telemetry/{ => models}/enums.py (100%) diff --git a/src/databricks/sql/telemetry/enums.py b/src/databricks/sql/telemetry/models/enums.py similarity index 100% rename from src/databricks/sql/telemetry/enums.py rename to src/databricks/sql/telemetry/models/enums.py diff --git a/src/databricks/sql/telemetry/models/event.py b/src/databricks/sql/telemetry/models/event.py index ca51ee89..03ce5c5d 100644 --- a/src/databricks/sql/telemetry/models/event.py +++ b/src/databricks/sql/telemetry/models/event.py @@ -1,6 +1,6 @@ import json from dataclasses import dataclass, asdict -from databricks.sql.telemetry.enums import ( +from databricks.sql.telemetry.models.enums import ( AuthMech, AuthFlow, DatabricksClientType, diff --git a/src/databricks/sql/telemetry/models/frontend_logs.py b/src/databricks/sql/telemetry/models/frontend_logs.py index aaf328b6..953e39b3 100644 --- a/src/databricks/sql/telemetry/models/frontend_logs.py +++ b/src/databricks/sql/telemetry/models/frontend_logs.py @@ -1,6 +1,6 @@ import json from dataclasses import dataclass, asdict -from databricks.sql.telemetry.models.telemetry_event import TelemetryEvent +from databricks.sql.telemetry.models.event import TelemetryEvent from typing import Optional From 6305323d7541f2e5089edfc8aa78544e37a1a893 Mon Sep 17 00:00:00 2001 From: Sai Shree Pradhan Date: Thu, 29 May 2025 10:25:17 +0530 Subject: [PATCH 25/25] removed telemetry batch size Signed-off-by: Sai Shree Pradhan --- src/databricks/sql/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 60be6fcf..9446d87a 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -238,7 +238,6 @@ def read(self) -> Optional[OAuthToken]: self.telemetry_enabled = ( self.client_telemetry_enabled and self.server_telemetry_enabled ) - telemetry_batch_size = kwargs.get("telemetry_batch_size", 200) user_agent_entry = kwargs.get("user_agent_entry") if user_agent_entry is None:
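
Notes on the models in this series follow.

PATCH 22 above exists because of a Python dataclass constraint: once a field has a default value, every field declared after it must also have one, otherwise the generated __init__ would be invalid and the class fails to build. The original TelemetryEvent declared sql_statement_id with a default ahead of the required system_configuration and driver_connection_params fields, which raises at import time. A minimal standalone reproduction of the rule:

    from dataclasses import dataclass
    from typing import Optional

    try:

        @dataclass
        class Broken:
            session_id: str
            sql_statement_id: Optional[str] = None
            system_configuration: str  # required field after a defaulted one

    except TypeError as err:
        # e.g. "non-default argument 'system_configuration' follows default argument"
        print(err)

Moving every Optional/defaulted field below the required ones, as the patch does, is the standard fix.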
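
After the renames in PATCHes 23 and 24, all telemetry models live under databricks.sql.telemetry.models. Illustrative imports, assuming the connector is installed at this revision:

    from databricks.sql.telemetry.models.enums import AuthMech, AuthFlow
    from databricks.sql.telemetry.models.event import TelemetryEvent
    from databricks.sql.telemetry.models.frontend_logs import TelemetryFrontendLog
    from databricks.sql.telemetry.models.endpoint_models import TelemetryRequest, TelemetryResponse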
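
The endpoint models are thin wrappers over the telemetry upload wire format; their camelCase field names (uploadTime, protoLogs) match the service payload rather than Python naming convention. A usage sketch with placeholder values, assuming the import above succeeds:

    req = TelemetryRequest(
        uploadTime=1716489600000,
        items=[],
        protoLogs=["<serialized TelemetryFrontendLog JSON>"],
    )
    print(req.to_json())
    # {"uploadTime": 1716489600000, "items": [], "protoLogs": ["..."]}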
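
Every model in this series serializes the same way: dataclasses.asdict() recurses into nested dataclass fields, so a single json.dumps() call at the root emits the whole tree. A self-contained sketch of that pattern (the class names here are stand-ins, not the connector's API):

    import json
    from dataclasses import dataclass, asdict
    from typing import Optional

    @dataclass
    class ClientContext:  # stand-in for TelemetryClientContext
        timestamp_millis: int
        user_agent: str

    @dataclass
    class LogEntry:  # stand-in for FrontendLogEntry
        session_id: str
        operation_latency_ms: Optional[int] = None

    @dataclass
    class FrontendLog:  # stand-in for TelemetryFrontendLog
        frontend_log_event_id: str
        context: ClientContext
        entry: LogEntry
        workspace_id: Optional[int] = None

        def to_json(self):
            # asdict() converts the nested dataclasses to plain dicts recursively
            return json.dumps(asdict(self))

    log = FrontendLog(
        frontend_log_event_id="event-123",
        context=ClientContext(timestamp_millis=1716489600000, user_agent="databricks-sql-python-test"),
        entry=LogEntry(session_id="session-abc", operation_latency_ms=42),
    )
    print(log.to_json())
    # {"frontend_log_event_id": "event-123", "context": {"timestamp_millis": 1716489600000, ...}, ...}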
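
One caveat to that pattern: asdict() leaves Enum members as-is, and json.dumps() cannot serialize a plain Enum. Fields typed as StatementType, AuthMech, and the like therefore only serialize cleanly if those enums mix in str; their definitions are not shown in this diff, so this is an assumption, not an observed bug. If they are plain Enums, to_json() would need a default hook; a standalone sketch with a hypothetical StatementKind stand-in:

    import json
    from dataclasses import dataclass, asdict
    from enum import Enum

    class StatementKind(Enum):  # hypothetical stand-in, not the connector's StatementType
        QUERY = "QUERY"

    @dataclass
    class Event:
        statement_type: StatementKind

        def to_json(self):
            # unwrap Enum members to their values during serialization
            return json.dumps(asdict(self), default=lambda o: o.value if isinstance(o, Enum) else str(o))

    print(Event(StatementKind.QUERY).to_json())  # {"statement_type": "QUERY"}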