9
9
10
10
logger = logging .getLogger (__name__ )
11
11
12
+
12
13
class SEAHttpClient :
13
14
"""
14
15
HTTP client for Statement Execution API (SEA).
15
-
16
+
16
17
This client handles the HTTP communication with the SEA endpoints,
17
18
including authentication, request formatting, and response parsing.
18
19
"""
19
-
20
+
20
21
def __init__ (
21
22
self ,
22
23
server_hostname : str ,
@@ -25,11 +26,11 @@ def __init__(
25
26
http_headers : List [tuple ],
26
27
auth_provider : AuthProvider ,
27
28
ssl_options : SSLOptions ,
28
- ** kwargs
29
+ ** kwargs ,
29
30
):
30
31
"""
31
32
Initialize the SEA HTTP client.
32
-
33
+
33
34
Args:
34
35
server_hostname: Hostname of the Databricks server
35
36
port: Port number for the connection
@@ -44,74 +45,80 @@ def __init__(
44
45
self .http_path = http_path
45
46
self .auth_provider = auth_provider
46
47
self .ssl_options = ssl_options
47
-
48
+
48
49
# Base URL for API requests
49
50
self .base_url = f"https://{ server_hostname } :{ port } "
50
-
51
+
51
52
# Convert headers list to dictionary
52
53
self .headers = dict (http_headers )
53
54
self .headers .update ({"Content-Type" : "application/json" })
54
-
55
+
55
56
# Session retry configuration
56
57
self .max_retries = kwargs .get ("_retry_stop_after_attempts_count" , 30 )
57
-
58
+
58
59
# Create a session for connection pooling
59
60
self .session = requests .Session ()
60
-
61
+
61
62
# Configure SSL verification
62
63
if ssl_options .tls_verify :
63
64
self .session .verify = ssl_options .tls_trusted_ca_file or True
64
65
else :
65
66
self .session .verify = False
66
-
67
+
67
68
# Configure client certificates if provided
68
69
if ssl_options .tls_client_cert_file :
69
70
client_cert = ssl_options .tls_client_cert_file
70
71
client_key = ssl_options .tls_client_cert_key_file
71
72
client_key_password = ssl_options .tls_client_cert_key_password
72
-
73
+
73
74
if client_key :
74
75
self .session .cert = (client_cert , client_key )
75
76
else :
76
77
self .session .cert = client_cert
77
-
78
+
78
79
if client_key_password :
79
80
# Note: requests doesn't directly support key passwords
80
81
# This would require more complex handling with libraries like pyOpenSSL
81
- logger .warning ("Client key password provided but not supported by requests library" )
82
-
82
+ logger .warning (
83
+ "Client key password provided but not supported by requests library"
84
+ )
85
+
83
86
def _get_auth_headers (self ) -> Dict [str , str ]:
84
87
"""Get authentication headers from the auth provider."""
85
- headers = {}
88
+ headers : Dict [ str , str ] = {}
86
89
self .auth_provider .add_headers (headers )
87
90
return headers
88
-
89
- def _make_request (self , method : str , path : str , data : Optional [Dict [str , Any ]] = None ) -> Dict [str , Any ]:
91
+
92
+ def _make_request (
93
+ self , method : str , path : str , data : Optional [Dict [str , Any ]] = None
94
+ ) -> Dict [str , Any ]:
90
95
"""
91
96
Make an HTTP request to the SEA endpoint.
92
-
97
+
93
98
Args:
94
99
method: HTTP method (GET, POST, DELETE)
95
100
path: API endpoint path
96
101
data: Request payload data
97
-
102
+
98
103
Returns:
99
104
Dict[str, Any]: Response data parsed from JSON
100
-
105
+
101
106
Raises:
102
107
RequestError: If the request fails
103
108
"""
104
109
url = urljoin (self .base_url , path )
105
110
headers = {** self .headers , ** self ._get_auth_headers ()}
106
-
111
+
107
112
# Log request details (without sensitive information)
108
113
logger .debug (f"Making { method } request to { url } " )
109
114
logger .debug (f"Headers: { [k for k in headers .keys ()]} " )
110
115
if data :
111
116
# Don't log sensitive data like access tokens
112
- safe_data = {k : v for k , v in data .items () if k not in ["access_token" , "token" ]}
117
+ safe_data = {
118
+ k : v for k , v in data .items () if k not in ["access_token" , "token" ]
119
+ }
113
120
logger .debug (f"Request data: { safe_data } " )
114
-
121
+
115
122
try :
116
123
if method .upper () == "GET" :
117
124
response = self .session .get (url , headers = headers , params = data )
@@ -122,42 +129,52 @@ def _make_request(self, method: str, path: str, data: Optional[Dict[str, Any]] =
122
129
response = self .session .delete (url , headers = headers , params = data )
123
130
else :
124
131
raise ValueError (f"Unsupported HTTP method: { method } " )
125
-
132
+
126
133
# Check for HTTP errors
127
134
response .raise_for_status ()
128
-
135
+
129
136
# Log response details
130
137
logger .debug (f"Response status: { response .status_code } " )
131
138
logger .debug (f"Response headers: { dict (response .headers )} " )
132
-
139
+
133
140
# Parse JSON response
134
141
if response .content :
135
142
result = response .json ()
136
143
# Log response content (but limit it for large responses)
137
144
content_str = json .dumps (result )
138
145
if len (content_str ) > 1000 :
139
- logger .debug (f"Response content (truncated): { content_str [:1000 ]} ..." )
146
+ logger .debug (
147
+ f"Response content (truncated): { content_str [:1000 ]} ..."
148
+ )
140
149
else :
141
150
logger .debug (f"Response content: { content_str } " )
142
151
return result
143
152
return {}
144
-
153
+
145
154
except requests .exceptions .RequestException as e :
146
155
# Handle request errors
147
156
error_message = f"SEA HTTP request failed: { str (e )} "
148
157
logger .error (error_message )
149
-
158
+
150
159
# Extract error details from response if available
151
160
if hasattr (e , "response" ) and e .response is not None :
152
161
try :
153
162
error_details = e .response .json ()
154
- error_message = f"{ error_message } : { error_details .get ('message' , '' )} "
155
- logger .error (f"Response status: { e .response .status_code } , Error details: { error_details } " )
163
+ error_message = (
164
+ f"{ error_message } : { error_details .get ('message' , '' )} "
165
+ )
166
+ logger .error (
167
+ f"Response status: { e .response .status_code } , Error details: { error_details } "
168
+ )
156
169
except (ValueError , KeyError ):
157
170
# If we can't parse the JSON, just log the raw content
158
- logger .error (f"Response status: { e .response .status_code } , Raw content: { e .response .content } " )
171
+ content_str = e .response .content .decode ('utf-8' , errors = 'replace' ) if isinstance (e .response .content , bytes ) else str (e .response .content )
172
+ logger .error (
173
+ f"Response status: { e .response .status_code } , Raw content: { content_str } "
174
+ )
159
175
pass
160
-
176
+
161
177
# Re-raise as a RequestError
162
178
from databricks .sql .exc import RequestError
163
- raise RequestError (error_message , e )
179
+
180
+ raise RequestError (error_message , e )
0 commit comments