|
| 1 | +import itertools |
1 | 2 | from contextlib import contextmanager
|
2 | 3 | from collections import OrderedDict
|
3 | 4 | import datetime
|
@@ -47,6 +48,7 @@ def __init__(self, method_name):
|
47 | 48 | # If running in local mode, just use environment variables for params.
|
48 | 49 | self.arguments = os.environ if get_args_from_env else {}
|
49 | 50 | self.arraysize = 1000
|
| 51 | + self.buffer_size_bytes = 10485760 |
50 | 52 |
|
51 | 53 | def connection_params(self, arguments):
|
52 | 54 | params = {
|
@@ -79,7 +81,7 @@ def connection(self, extra_params=()):
|
79 | 81 | @contextmanager
|
80 | 82 | def cursor(self, extra_params=()):
|
81 | 83 | with self.connection(extra_params) as conn:
|
82 |
| - cursor = conn.cursor(arraysize=self.arraysize) |
| 84 | + cursor = conn.cursor(arraysize=self.arraysize, buffer_size_bytes=self.buffer_size_bytes) |
83 | 85 | try:
|
84 | 86 | yield cursor
|
85 | 87 | finally:
|
@@ -603,14 +605,34 @@ def test_close_connection_closes_cursors(self):
|
603 | 605 | assert op_status_at_server.operationState != ttypes.TOperationState.CLOSED_STATE
|
604 | 606 |
|
605 | 607 | conn.close()
|
606 |
| - |
| 608 | + |
607 | 609 | # When connection closes, any cursor operations should no longer exist at the server
|
608 | 610 | with self.assertRaises(thrift.Thrift.TApplicationException) as cm:
|
609 | 611 | op_status_at_server = ars.thrift_backend._client.GetOperationStatus(status_request)
|
610 | 612 | if hasattr(cm, "exception"):
|
611 | 613 | assert "RESOURCE_DOES_NOT_EXIST" in cm.exception.message
|
612 | 614 |
|
613 |
| - |
| 615 | + @skipUnless(pysql_supports_arrow(), 'needs arrow support') |
| 616 | + def test_cloud_fetch(self): |
| 617 | + # This test can take several minutes to run |
| 618 | + limits = [100000, 500000] |
| 619 | + threads = [10, 25] |
| 620 | + self.buffer_size_bytes = 104857600 |
| 621 | + self.arraysize = 100000 |
| 622 | + base_query = "SELECT * FROM store_sales " |
| 623 | + for num_limit, num_threads, lz4_compression in itertools.product(limits, threads, [True, False]): |
| 624 | + with self.subTest(num_limit=num_limit, num_threads=num_threads, lz4_compression=lz4_compression): |
| 625 | + cf_result, noop_result = None, None |
| 626 | + query = base_query + "LIMIT " + str(num_limit) |
| 627 | + with self.cursor({"use_cloud_fetch": True, "max_download_threads": num_threads}) as cursor: |
| 628 | + cursor.execute(query) |
| 629 | + cf_result = cursor.fetchall() |
| 630 | + with self.cursor({}) as cursor: |
| 631 | + cursor.execute(query) |
| 632 | + noop_result = cursor.fetchall() |
| 633 | + assert len(cf_result) == len(noop_result) |
| 634 | + for i in range(len(cf_result)): |
| 635 | + assert cf_result[i] == noop_result[i] |
614 | 636 |
|
615 | 637 |
|
616 | 638 | # use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
|
@@ -792,7 +814,7 @@ def perform_remove():
|
792 | 814 |
|
793 | 815 | # Clean up after ourselves
|
794 | 816 | perform_remove()
|
795 |
| - |
| 817 | + |
796 | 818 | def test_staging_ingestion_fails_to_modify_another_staging_user(self):
|
797 | 819 | """The server should only allow modification of the staging_ingestion_user's files
|
798 | 820 | """
|
@@ -908,7 +930,7 @@ def generate_file_and_path_and_queries():
|
908 | 930 |
|
909 | 931 | cursor.execute(put_query1)
|
910 | 932 | cursor.execute(put_query2)
|
911 |
| - |
| 933 | + |
912 | 934 | with pytest.raises(Error, match="Local file operations are restricted to paths within the configured staging_allowed_local_path"):
|
913 | 935 | cursor.execute(put_query3)
|
914 | 936 |
|
|
0 commit comments