|
| 1 | +import itertools |
1 | 2 | from contextlib import contextmanager
|
2 | 3 | from collections import OrderedDict
|
3 | 4 | import datetime
|
@@ -47,6 +48,7 @@ def __init__(self, method_name):
|
47 | 48 | # If running in local mode, just use environment variables for params.
|
48 | 49 | self.arguments = os.environ if get_args_from_env else {}
|
49 | 50 | self.arraysize = 1000
|
| 51 | + self.buffer_size_bytes = 10485760 |
50 | 52 |
|
51 | 53 | def connection_params(self, arguments):
|
52 | 54 | params = {
|
@@ -79,7 +81,7 @@ def connection(self, extra_params=()):
|
79 | 81 | @contextmanager
|
80 | 82 | def cursor(self, extra_params=()):
|
81 | 83 | with self.connection(extra_params) as conn:
|
82 |
| - cursor = conn.cursor(arraysize=self.arraysize) |
| 84 | + cursor = conn.cursor(arraysize=self.arraysize, buffer_size_bytes=self.buffer_size_bytes) |
83 | 85 | try:
|
84 | 86 | yield cursor
|
85 | 87 | finally:
|
@@ -610,7 +612,27 @@ def test_close_connection_closes_cursors(self):
|
610 | 612 | if hasattr(cm, "exception"):
|
611 | 613 | assert "RESOURCE_DOES_NOT_EXIST" in cm.exception.message
|
612 | 614 |
|
613 |
| - |
| 615 | + @skipUnless(pysql_supports_arrow(), 'needs arrow support') |
| 616 | + def test_cloud_fetch(self): |
| 617 | + # This test can take several minutes to run |
| 618 | + limits = [100000, 600000] |
| 619 | + threads = [10, 25] |
| 620 | + self.buffer_size_bytes = 104857600 |
| 621 | + self.arraysize = 100000 |
| 622 | + base_query = "SELECT * FROM store_sales " |
| 623 | + for num_limit, num_threads, lz4_compression in itertools.product(limits, threads, [True, False]): |
| 624 | + with self.subTest(num_limit=num_limit, num_threads=num_threads, lz4_compression=lz4_compression): |
| 625 | + cf_result, noop_result = None, None |
| 626 | + query = base_query + "LIMIT " + str(num_limit) |
| 627 | + with self.cursor({"use_cloud_fetch": True, "max_download_threads": num_threads}) as cursor: |
| 628 | + cursor.execute(query) |
| 629 | + cf_result = cursor.fetchall() |
| 630 | + with self.cursor({}) as cursor: |
| 631 | + cursor.execute(query) |
| 632 | + noop_result = cursor.fetchall() |
| 633 | + assert len(cf_result) == len(noop_result) |
| 634 | + for i in range(len(cf_result)): |
| 635 | + assert cf_result[i] == noop_result[i] |
614 | 636 |
|
615 | 637 |
|
616 | 638 | # use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
|
|
0 commit comments