@@ -14,8 +14,6 @@
 
 import typing
 
-from google.cloud import bigquery
-from google.cloud.bigquery_storage import types as bqstorage_types
 import pandas
 import pandas.testing
 import pytest
@@ -41,7 +39,16 @@ def _assert_bq_execution_location(
     if expected_location is None:
         expected_location = session._location
 
-    assert typing.cast(bigquery.QueryJob, df.query_job).location == expected_location
+    query_job = df.query_job
+    assert query_job is not None
+    assert query_job.location == expected_location
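+    # The job also records a destination table; its dataset must be in the same location.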
+    destination = query_job.destination
+    assert destination is not None
+    destination_dataset = session.bqclient.get_dataset(
+        f"{destination.project}.{destination.dataset_id}"
+    )
+    assert destination_dataset.location == expected_location
 
     # Ensure operation involving BQ client succeeds
     result = (
@@ -52,38 +59,29 @@ def _assert_bq_execution_location(
         .head()
     )
 
-    assert (
-        typing.cast(bigquery.QueryJob, result.query_job).location == expected_location
+    # Use allow_large_results=True to force a job to be created.
+    result_pd = result.to_pandas(allow_large_results=True)
+
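+    # Check the job that to_pandas() created, and where it wrote its results.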
+    query_job = result.query_job
+    assert query_job is not None
+    assert query_job.location == expected_location
+    destination = query_job.destination
+    assert destination is not None
+    destination_dataset = session.bqclient.get_dataset(
+        f"{destination.project}.{destination.dataset_id}"
     )
+    assert destination_dataset.location == expected_location
 
     expected_result = pandas.DataFrame(
         {"number": [444, 222]}, index=pandas.Index(["aaa", "bbb"], name="name")
     )
     pandas.testing.assert_frame_equal(
-        expected_result, result.to_pandas(), check_dtype=False, check_index_type=False
-    )
-
-    # Ensure BQ Storage Read client operation succeeds
-    table = result.query_job.destination
-    requested_session = bqstorage_types.ReadSession(  # type: ignore[attr-defined]
-        table=f"projects/{table.project}/datasets/{table.dataset_id}/tables/{table.table_id}",
-        data_format=bqstorage_types.DataFormat.ARROW,  # type: ignore[attr-defined]
-    )
-    read_session = session.bqstoragereadclient.create_read_session(
-        parent=f"projects/{table.project}",
-        read_session=requested_session,
-        max_stream_count=1,
+        expected_result,
+        result_pd,
+        check_dtype=False,
+        check_index_type=False,
     )
-    reader = session.bqstoragereadclient.read_rows(read_session.streams[0].name)
-    frames = []
-    for message in reader.rows().pages:
-        frames.append(message.to_dataframe())
-    read_dataframe = pandas.concat(frames)
-    # normalize before comparing since we lost some of the bigframes column
-    # naming abstractions in the direct read of the destination table
-    read_dataframe = read_dataframe.set_index("name")
-    read_dataframe.columns = result.columns
-    pandas.testing.assert_frame_equal(expected_result, read_dataframe)
 
 
 def test_bq_location_default():
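
Note: the pattern this change asserts (the query job runs in the expected
location, and its destination table lives in a dataset in that same location)
can be reproduced outside bigframes with the plain google-cloud-bigquery
client. A minimal sketch follows; the project ID and the "US" location are
placeholder assumptions, not values from this change.

from google.cloud import bigquery

# Placeholder project; substitute a real project with BigQuery enabled.
client = bigquery.Client(project="my-project")

# Disable the query cache so the job reliably writes a destination table.
job_config = bigquery.QueryJobConfig(use_query_cache=False)
job = client.query("SELECT 1 AS x", location="US", job_config=job_config)
job.result()  # block until the job completes

# The job reports where it executed...
assert job.location == "US"

# ...and its destination table belongs to a dataset whose location must match.
destination = job.destination
dataset = client.get_dataset(f"{destination.project}.{destination.dataset_id}")
assert dataset.location == "US"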