20
20
from decimal import Decimal
21
21
from typing import Any , List , Optional # NOQA for mypy types
22
22
23
- import copy
24
23
import uuid
25
24
import datetime
26
25
import math
@@ -295,58 +294,41 @@ def warnings(self):
295
294
return self ._query .warnings
296
295
return None
297
296
297
+ def _new_request_with_session_from (self , request ):
298
+ """
299
+ Returns a new request with the `ClientSession` set to the one from the
300
+ given request.
301
+ """
302
+ request = self .connection ._create_request ()
303
+ request ._client_session = request ._client_session
304
+ return request
305
+
298
306
def setinputsizes (self , sizes ):
299
307
raise trino .exceptions .NotSupportedError
300
308
301
309
def setoutputsize (self , size , column ):
302
310
raise trino .exceptions .NotSupportedError
303
311
304
- def _prepare_statement (self , operation , statement_name ):
312
+ def _prepare_statement (self , statement , name ):
305
313
"""
306
- Prepends the given `operation` with "PREPARE <statement_name> FROM" and
307
- executes as a prepare statement.
308
-
309
- :param operation: sql to be executed.
310
- :param statement_name: name that will be assigned to the prepare
311
- statement.
312
-
313
- :raises trino.exceptions.FailedToObtainAddedPrepareHeader: Error raised
314
- when unable to find the 'X-Trino-Added-Prepare' for the PREPARE
315
- statement request.
314
+ Registers a prepared statement for the provided `operation` with the
315
+ `name` assigned to it.
316
316
317
- :return: string representing the value of the 'X-Trino-Added-Prepare'
318
- header .
317
+ :param statement: sql to be executed.
318
+ :param name: name that will be assigned to the prepared statement .
319
319
"""
320
- sql = 'PREPARE {statement_name} FROM {operation}' .format (
321
- statement_name = statement_name ,
322
- operation = operation
323
- )
324
-
325
- # Send prepare statement. Copy the _request object to avoid polluting the
326
- # one that is going to be used to execute the actual operation.
327
- query = trino .client .TrinoQuery (copy .deepcopy (self ._request ), sql = sql ,
320
+ sql = f"PREPARE { name } FROM { statement } "
321
+ # TODO: Evaluate whether we can avoid the piggybacking on current request
322
+ query = trino .client .TrinoQuery (self ._new_request_with_session_from (self ._request ), sql = sql ,
328
323
experimental_python_types = self ._experimental_pyton_types )
329
- result = query .execute ()
330
-
331
- # Iterate until the 'X-Trino-Added-Prepare' header is found or
332
- # until there are no more results
333
- for _ in result :
334
- response_headers = result .response_headers
335
-
336
- if constants .HEADER_ADDED_PREPARE in response_headers :
337
- return response_headers [constants .HEADER_ADDED_PREPARE ]
324
+ query .execute ()
338
325
339
- raise trino .exceptions .FailedToObtainAddedPrepareHeader
340
-
341
- def _get_added_prepare_statement_trino_query (
326
+ def _execute_prepared_statement (
342
327
self ,
343
328
statement_name ,
344
329
params
345
330
):
346
331
sql = 'EXECUTE ' + statement_name + ' USING ' + ',' .join (map (self ._format_prepared_param , params ))
347
-
348
- # No need to deepcopy _request here because this is the actual request
349
- # operation
350
332
return trino .client .TrinoQuery (self ._request , sql = sql , experimental_python_types = self ._experimental_pyton_types )
351
333
352
334
def _format_prepared_param (self , param ):
@@ -422,28 +404,12 @@ def _format_prepared_param(self, param):
422
404
423
405
raise trino .exceptions .NotSupportedError ("Query parameter of type '%s' is not supported." % type (param ))
424
406
425
- def _deallocate_prepare_statement (self , added_prepare_header , statement_name ):
407
+ def _deallocate_prepared_statement (self , statement_name ):
426
408
sql = 'DEALLOCATE PREPARE ' + statement_name
427
-
428
- # Send deallocate statement. Copy the _request object to avoid poluting the
429
- # one that is going to be used to execute the actual operation.
430
- query = trino .client .TrinoQuery (copy .deepcopy (self ._request ), sql = sql ,
409
+ # TODO: Evaluate whether we can avoid the piggybacking on current request
410
+ query = trino .client .TrinoQuery (self ._new_request_with_session_from (self ._request ), sql = sql ,
431
411
experimental_python_types = self ._experimental_pyton_types )
432
- result = query .execute (
433
- additional_http_headers = {
434
- constants .HEADER_PREPARED_STATEMENT : added_prepare_header
435
- }
436
- )
437
-
438
- # Iterate until the 'X-Trino-Deallocated-Prepare' header is found or
439
- # until there are no more results
440
- for _ in result :
441
- response_headers = result .response_headers
442
-
443
- if constants .HEADER_DEALLOCATED_PREPARE in response_headers :
444
- return response_headers [constants .HEADER_DEALLOCATED_PREPARE ]
445
-
446
- raise trino .exceptions .FailedToObtainDeallocatedPrepareHeader
412
+ query .execute ()
447
413
448
414
def _generate_unique_statement_name (self ):
449
415
return 'st_' + uuid .uuid4 ().hex .replace ('-' , '' )
@@ -456,27 +422,21 @@ def execute(self, operation, params=None):
456
422
)
457
423
458
424
statement_name = self ._generate_unique_statement_name ()
459
- # Send prepare statement
460
- added_prepare_header = self ._prepare_statement (
461
- operation , statement_name
462
- )
425
+ self ._prepare_statement (operation , statement_name )
463
426
464
427
try :
465
428
# Send execute statement and assign the return value to `results`
466
429
# as it will be returned by the function
467
- self ._query = self ._get_added_prepare_statement_trino_query (
430
+ self ._query = self ._execute_prepared_statement (
468
431
statement_name , params
469
432
)
470
- result = self ._query .execute (
471
- additional_http_headers = {
472
- constants .HEADER_PREPARED_STATEMENT : added_prepare_header
473
- }
474
- )
433
+ result = self ._query .execute ()
475
434
finally :
476
435
# Send deallocate statement
477
436
# At this point the query can be deallocated since it has already
478
437
# been executed
479
- self ._deallocate_prepare_statement (added_prepare_header , statement_name )
438
+ # TODO: Consider caching prepared statements if requested by caller
439
+ self ._deallocate_prepared_statement (statement_name )
480
440
481
441
else :
482
442
self ._query = trino .client .TrinoQuery (self ._request , sql = operation ,
0 commit comments