Separate Session related functionality from Connection class #571
@@ -0,0 +1,160 @@
import logging
from typing import Dict, Tuple, List, Optional, Any

from databricks.sql.thrift_api.TCLIService import ttypes
from databricks.sql.types import SSLOptions
from databricks.sql.auth.auth import get_python_sql_connector_auth_provider
from databricks.sql.exc import SessionAlreadyClosedError, DatabaseError, RequestError
from databricks.sql import __version__
from databricks.sql import USER_AGENT_NAME
from databricks.sql.thrift_backend import ThriftBackend

logger = logging.getLogger(__name__)


class Session:
    def __init__(
        self,
        server_hostname: str,
        http_path: str,
        http_headers: Optional[List[Tuple[str, str]]] = None,
        session_configuration: Optional[Dict[str, Any]] = None,
        catalog: Optional[str] = None,
        schema: Optional[str] = None,
        _use_arrow_native_complex_types: Optional[bool] = True,
        **kwargs,
Comment on lines +17 to +25
nit
I agree, that makes sense. Shouldn't we name it
I believe Session is an internal abstraction and these params originate from Connection, so it is better to name it ConnectionParams. Additionally, I think this change might break a lot of things. Let's do it completely separately (let's log a JIRA ticket for now and take it up later).
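The grouping proposed in this thread could look roughly like the dataclass below. This is a minimal sketch only: `ConnectionParams` is the name floated in the comment above, but the class itself, the `extra` field, and the placeholder host and path values are assumptions made for illustration and are not part of the connector.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class ConnectionParams:
    """Hypothetical container for the arguments Connection passes to Session."""

    server_hostname: str
    http_path: str
    http_headers: Optional[List[Tuple[str, str]]] = None
    session_configuration: Optional[Dict[str, Any]] = None
    catalog: Optional[str] = None
    schema: Optional[str] = None
    use_arrow_native_complex_types: bool = True
    # Remaining keyword arguments (for example "_port"), kept as an opaque mapping.
    extra: Dict[str, Any] = field(default_factory=dict)


params = ConnectionParams(
    server_hostname="example.cloud.databricks.com",  # placeholder host
    http_path="/sql/1.0/warehouses/abc123",  # placeholder path
    extra={"_port": 443},
)
```

A single object like this would let the signature shrink to one parameter, at the cost of touching every call site, which is presumably why the thread defers it to a separate change.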
    ) -> None:
        """
        Create a session to a Databricks SQL endpoint or a Databricks cluster.

        This class handles all session-related behavior and communication with the backend.
        """
        self.is_open = False
        self.host = server_hostname
        self.port = kwargs.get("_port", 443)

        self.session_configuration = session_configuration
        self.catalog = catalog
        self.schema = schema

        auth_provider = get_python_sql_connector_auth_provider(
            server_hostname, **kwargs
        )

        user_agent_entry = kwargs.get("user_agent_entry")
        if user_agent_entry is None:
            user_agent_entry = kwargs.get("_user_agent_entry")
            if user_agent_entry is not None:
                logger.warning(
                    "[WARN] Parameter '_user_agent_entry' is deprecated; use 'user_agent_entry' instead. "
                    "This parameter will be removed in the upcoming releases."
                )

        if user_agent_entry:
            useragent_header = "{}/{} ({})".format(
                USER_AGENT_NAME, __version__, user_agent_entry
            )
        else:
            useragent_header = "{}/{}".format(USER_AGENT_NAME, __version__)

        base_headers = [("User-Agent", useragent_header)]

        self._ssl_options = SSLOptions(
            # Double negation is generally a bad thing, but we have to keep backward compatibility
            tls_verify=not kwargs.get(
                "_tls_no_verify", False
            ),  # by default - verify cert and host
            tls_verify_hostname=kwargs.get("_tls_verify_hostname", True),
            tls_trusted_ca_file=kwargs.get("_tls_trusted_ca_file"),
            tls_client_cert_file=kwargs.get("_tls_client_cert_file"),
            tls_client_cert_key_file=kwargs.get("_tls_client_cert_key_file"),
            tls_client_cert_key_password=kwargs.get("_tls_client_cert_key_password"),
        )

        self.thrift_backend = ThriftBackend(
            self.host,
            self.port,
            http_path,
            (http_headers or []) + base_headers,
            auth_provider,
            ssl_options=self._ssl_options,
            _use_arrow_native_complex_types=_use_arrow_native_complex_types,
            **kwargs,
        )

        self._handle = None
        self.protocol_version = None

    def open(self) -> None:
        self._open_session_resp = self.thrift_backend.open_session(
            self.session_configuration, self.catalog, self.schema
        )
        self._handle = self._open_session_resp.sessionHandle
        self.protocol_version = self.get_protocol_version(self._open_session_resp)
        self.is_open = True
        logger.info("Successfully opened session " + str(self.get_id_hex()))

    @staticmethod
    def get_protocol_version(openSessionResp):
        """
        Since the sessionHandle will sometimes have a serverProtocolVersion, it takes
        precedence over the serverProtocolVersion defined in the OpenSessionResponse.
        """
Comment on lines +101 to +102
Suggested change
I think there is a blank line after a multi-line docstring. @jprakash-db, do we follow any Python coding guidelines, such as PEP 257 for docstrings (https://peps.python.org/pep-0257/)?
At Databricks we follow https://databricks.atlassian.net/wiki/spaces/UN/pages/3334538555/Python+Guidelines+go+py, but this could be different for the OSS repo.
We use the black formatter, which follows the PEP-257 style.
Got it. Is there a linter in the CI, or do we have to run the linter manually?
Currently we have
        if (
            openSessionResp.sessionHandle
            and hasattr(openSessionResp.sessionHandle, "serverProtocolVersion")
            and openSessionResp.sessionHandle.serverProtocolVersion
        ):
            return openSessionResp.sessionHandle.serverProtocolVersion
        return openSessionResp.serverProtocolVersion

    @staticmethod
    def server_parameterized_queries_enabled(protocolVersion):
        if (
            protocolVersion
            and protocolVersion >= ttypes.TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8
        ):
            return True
        else:
            return False

    def get_handle(self):
        return self._handle

    def get_id(self):
        handle = self.get_handle()
        if handle is None:
            return None
        return self.thrift_backend.handle_to_id(handle)

    def get_id_hex(self):
        handle = self.get_handle()
        if handle is None:
            return None
        return self.thrift_backend.handle_to_hex_id(handle)

    def close(self) -> None:
        """Close the underlying session."""
        logger.info(f"Closing session {self.get_id_hex()}")
        if not self.is_open:
            logger.debug("Session appears to have been closed already")
            return

        try:
            self.thrift_backend.close_session(self.get_handle())
        except RequestError as e:
            if isinstance(e.args[1], SessionAlreadyClosedError):
                logger.info("Session was closed by a prior request")
        except DatabaseError as e:
            if "Invalid SessionHandle" in str(e):
                logger.warning(
                    f"Attempted to close session that was already closed: {e}"
                )
            else:
                logger.warning(
                    f"Attempt to close session raised an exception at the server: {e}"
                )
        except Exception as e:
            logger.error(f"Attempt to close session raised a local exception: {e}")

        self.is_open = False
When opening the session, the flag is set at the end, which makes sense. For the close-session call, should we be eager to unset the flag at the very beginning?
What if the session close fails? Shouldn't the session remain open?
If the client code has called on the Session class to close the session, then the client assumes that the method will close the session. I think unsetting the flag right away makes more sense then. However, an interesting question is whether we use this flag internally in the Session class to make unsetting meaningful (i.e., when the flag is false, do we return null or throw an exception when getting the session handle?)
Currently there does not seem to be such a dependency, but I'm still not clear on this. If the close() call raises an exception, isn't the client expected to retry?
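One point that bears on this thread: as written in the diff, `close()` catches every `Exception` and then sets `is_open = False`, so the flag ends up cleared even when the backend call fails. The sketch below reproduces only that flag handling; `FailingBackend`, `SketchSession`, and the `close_attempts` counter are hypothetical stand-ins invented for illustration, not connector classes.

```python
import logging

logger = logging.getLogger(__name__)


class FailingBackend:
    """Hypothetical stand-in whose close_session always fails."""

    def close_session(self, handle):
        raise RuntimeError("simulated network failure")


class SketchSession:
    """Mirrors only the flag handling of the close() method in the diff."""

    def __init__(self, backend):
        self.backend = backend
        self.is_open = True
        self.close_attempts = 0  # counts calls that reached the backend

    def close(self):
        if not self.is_open:
            return
        self.close_attempts += 1
        try:
            self.backend.close_session(handle=None)
        except Exception as e:
            # The error is logged, not re-raised, as in the diff.
            logger.error(f"Attempt to close session raised a local exception: {e}")
        self.is_open = False


session = SketchSession(FailingBackend())
session.close()  # backend fails, but no exception reaches the caller
session.close()  # returns early because the flag is already False
```

Under these assumptions the caller never sees the failure and a second `close()` is a no-op, so unsetting the flag first or last would behave the same for ordinary exceptions. A retry would only be possible if `close()` re-raised.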
nit: should we group the property and staticmethod definitions together? @jprakash-db, are there any coding/lint guidelines that the OSS Python driver follows?
There is no specific standard in Python, because marking something as private has no real meaning; anything can be accessed at any time. There are some general conventions, but nothing concrete.
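Since the static methods came up here, the precedence rule documented in `get_protocol_version` can be exercised without a server. The following is a minimal sketch: the function body copies the logic from the diff, while the response objects are stand-ins built with `types.SimpleNamespace` and the integer versions are arbitrary placeholders, not real `TProtocolVersion` values.

```python
from types import SimpleNamespace


def get_protocol_version(open_session_resp):
    """Same precedence rule as Session.get_protocol_version in the diff."""
    handle = open_session_resp.sessionHandle
    if (
        handle
        and hasattr(handle, "serverProtocolVersion")
        and handle.serverProtocolVersion
    ):
        return handle.serverProtocolVersion
    return open_session_resp.serverProtocolVersion


# Stand-in response where the session handle carries its own version.
with_handle_version = SimpleNamespace(
    sessionHandle=SimpleNamespace(serverProtocolVersion=8),
    serverProtocolVersion=5,
)
# Stand-in response where the handle has no version set.
without_handle_version = SimpleNamespace(
    sessionHandle=SimpleNamespace(serverProtocolVersion=None),
    serverProtocolVersion=5,
)

handle_wins = get_protocol_version(with_handle_version)
falls_back = get_protocol_version(without_handle_version)
```

In the first case the handle's version is returned; in the second the function falls back to the version on the response itself.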