Skip to content

Cloud Fetch e2e tests #154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 7, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion tests/e2e/test_driver.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
from contextlib import contextmanager
from collections import OrderedDict
import datetime
Expand Down Expand Up @@ -52,6 +53,7 @@ def __init__(self, method_name):
# If running in local mode, just use environment variables for params.
self.arguments = os.environ if get_args_from_env else {}
self.arraysize = 1000
self.buffer_size_bytes = 10485760

def connection_params(self, arguments):
params = {
Expand Down Expand Up @@ -84,7 +86,7 @@ def connection(self, extra_params=()):
@contextmanager
def cursor(self, extra_params=()):
with self.connection(extra_params) as conn:
cursor = conn.cursor(arraysize=self.arraysize)
cursor = conn.cursor(arraysize=self.arraysize, buffer_size_bytes=self.buffer_size_bytes)
try:
yield cursor
finally:
Expand Down Expand Up @@ -633,6 +635,27 @@ def test_closing_a_closed_connection_doesnt_fail(self):

self.assertTrue(expected_message_was_found, "Did not find expected log messages")

@skipUnless(pysql_supports_arrow(), 'needs arrow support')
def test_cloud_fetch(self):
# This test can take several minutes to run
limits = [100000, 300000]
threads = [10, 25]
self.buffer_size_bytes = 104857600
self.arraysize = 100000
base_query = "SELECT * FROM store_sales WHERE ss_sold_date_sk = 2452234 "
for num_limit, num_threads, lz4_compression in itertools.product(limits, threads, [True, False]):
with self.subTest(num_limit=num_limit, num_threads=num_threads, lz4_compression=lz4_compression):
cf_result, noop_result = None, None
query = base_query + "LIMIT " + str(num_limit)
with self.cursor({"use_cloud_fetch": True, "max_download_threads": num_threads}) as cursor:
cursor.execute(query)
cf_result = cursor.fetchall()
with self.cursor({}) as cursor:
cursor.execute(query)
noop_result = cursor.fetchall()
assert len(cf_result) == len(noop_result)
for i in range(len(cf_result)):
assert cf_result[i] == noop_result[i]

# use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
# the 429/503 subsuites separate since they execute under different circumstances.
Expand Down