Skip to content

Commit a1677e2

Browse files
committed
Cloud Fetch e2e tests
Signed-off-by: Matthew Kim <[email protected]>
1 parent 5379803 commit a1677e2

File tree

1 file changed

+27
-5
lines changed

1 file changed

+27
-5
lines changed

tests/e2e/driver_tests.py

+27-5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import itertools
12
from contextlib import contextmanager
23
from collections import OrderedDict
34
import datetime
@@ -47,6 +48,7 @@ def __init__(self, method_name):
4748
# If running in local mode, just use environment variables for params.
4849
self.arguments = os.environ if get_args_from_env else {}
4950
self.arraysize = 1000
51+
self.buffer_size_bytes = 10485760
5052

5153
def connection_params(self, arguments):
5254
params = {
@@ -79,7 +81,7 @@ def connection(self, extra_params=()):
7981
@contextmanager
8082
def cursor(self, extra_params=()):
8183
with self.connection(extra_params) as conn:
82-
cursor = conn.cursor(arraysize=self.arraysize)
84+
cursor = conn.cursor(arraysize=self.arraysize, buffer_size_bytes=self.buffer_size_bytes)
8385
try:
8486
yield cursor
8587
finally:
@@ -603,14 +605,34 @@ def test_close_connection_closes_cursors(self):
603605
assert op_status_at_server.operationState != ttypes.TOperationState.CLOSED_STATE
604606

605607
conn.close()
606-
608+
607609
# When connection closes, any cursor operations should no longer exist at the server
608610
with self.assertRaises(thrift.Thrift.TApplicationException) as cm:
609611
op_status_at_server = ars.thrift_backend._client.GetOperationStatus(status_request)
610612
if hasattr(cm, "exception"):
611613
assert "RESOURCE_DOES_NOT_EXIST" in cm.exception.message
612614

613-
615+
@skipUnless(pysql_supports_arrow(), 'needs arrow support')
616+
def test_cloud_fetch(self):
617+
# This test can take several minutes to run
618+
limits = [100000, 600000]
619+
threads = [10, 25]
620+
self.buffer_size_bytes = 104857600
621+
self.arraysize = 100000
622+
base_query = "SELECT * FROM store_sales "
623+
for num_limit, num_threads, lz4_compression in itertools.product(limits, threads, [True, False]):
624+
with self.subTest(num_limit=num_limit, num_threads=num_threads, lz4_compression=lz4_compression):
625+
cf_result, noop_result = None, None
626+
query = base_query + "LIMIT " + str(num_limit)
627+
with self.cursor({"use_cloud_fetch": True, "max_download_threads": num_threads}) as cursor:
628+
cursor.execute(query)
629+
cf_result = cursor.fetchall()
630+
with self.cursor({}) as cursor:
631+
cursor.execute(query)
632+
noop_result = cursor.fetchall()
633+
assert len(cf_result) == len(noop_result)
634+
for i in range(len(cf_result)):
635+
assert cf_result[i] == noop_result[i]
614636

615637

616638
# use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
@@ -792,7 +814,7 @@ def perform_remove():
792814

793815
# Clean up after ourselves
794816
perform_remove()
795-
817+
796818
def test_staging_ingestion_fails_to_modify_another_staging_user(self):
797819
"""The server should only allow modification of the staging_ingestion_user's files
798820
"""
@@ -908,7 +930,7 @@ def generate_file_and_path_and_queries():
908930

909931
cursor.execute(put_query1)
910932
cursor.execute(put_query2)
911-
933+
912934
with pytest.raises(Error, match="Local file operations are restricted to paths within the configured staging_allowed_local_path"):
913935
cursor.execute(put_query3)
914936

0 commit comments

Comments
 (0)