6
6
import unittest
7
7
8
8
import pandas as pd
9
- from pandas .core .frame import DataFrame
10
- from sqlalchemy import text
11
- import mysql .connector
9
+ from sqlalchemy import create_engine
12
10
13
11
# from flask.testing import FlaskClient
14
12
from delphi_utils import Nans
13
+ from delphi .epidata .server .main import app
15
14
from delphi .epidata .server ._pandas import as_pandas
16
- from delphi .epidata .server ._query import limit_query
15
+ from delphi .epidata .server ._query import QueryBuilder
17
16
18
17
# py3tester coverage target
19
18
__test_target__ = "delphi.epidata.server._query"
@@ -111,27 +110,26 @@ class UnitTests(unittest.TestCase):
111
110
112
111
def setUp (self ):
113
112
"""Perform per-test setup."""
113
+ app .config ["TESTING" ] = True
114
+ app .config ["WTF_CSRF_ENABLED" ] = False
115
+ app .config ["DEBUG" ] = False
114
116
115
117
# connect to the `epidata` database and clear the `covidcast` table
116
- cnx = mysql .connector .connect (user = "user" , password = "pass" , host = "delphi_database_epidata" , database = "epidata" )
117
- cur = cnx .cursor ()
118
- cur .execute ("truncate table covidcast" )
119
- cur .execute ('update covidcast_meta_cache set timestamp = 0, epidata = ""' )
120
- cnx .commit ()
121
- cur .close ()
118
+ engine = create_engine ('mysql://user:pass@delphi_database_epidata/epidata' )
119
+ cnx = engine .connect ()
120
+ cnx .execute ("truncate table covidcast" )
121
+ cnx .execute ('update covidcast_meta_cache set timestamp = 0, epidata = ""' )
122
122
123
123
# make connection and cursor available to test cases
124
124
self .cnx = cnx
125
- self .cur = cnx .cursor ()
126
125
127
126
def tearDown (self ):
128
127
"""Perform per-test teardown."""
129
- self .cur .close ()
130
128
self .cnx .close ()
131
129
132
130
def _insert_rows (self , rows : Iterable [CovidcastRow ]):
133
131
sql = ",\n " .join ((str (r ) for r in rows ))
134
- self .cur .execute (
132
+ self .cnx .execute (
135
133
f"""
136
134
INSERT INTO
137
135
`covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
@@ -143,7 +141,6 @@ def _insert_rows(self, rows: Iterable[CovidcastRow]):
143
141
{ sql }
144
142
"""
145
143
)
146
- self .cnx .commit ()
147
144
return rows
148
145
149
146
def _rows_to_df (self , rows : Iterable [CovidcastRow ]) -> pd .DataFrame :
@@ -160,17 +157,12 @@ def test_as_pandas(self):
160
157
rows = [CovidcastRow (time_value = 20200401 + i , value = float (i )) for i in range (10 )]
161
158
self ._insert_rows (rows )
162
159
163
- with self .subTest ("simple" ):
164
- query = """select * from `covidcast`"""
165
- params = {}
166
- parse_dates = None
167
- engine = self .cnx
168
- df = pd .read_sql_query (str (query ), engine , params = params , parse_dates = parse_dates )
169
- df = df .astype ({"is_latest_issue" : bool , "is_wip" : bool })
160
+ with app .test_request_context ('/correlation' ):
161
+ q = QueryBuilder ("covidcast" , "t" )
162
+
163
+ df = as_pandas (str (q ), params = {}, db_engine = self .cnx , parse_dates = None ).astype ({"is_latest_issue" : bool , "is_wip" : bool })
170
164
expected_df = self ._rows_to_df (rows )
171
165
pd .testing .assert_frame_equal (df , expected_df )
172
- query = limit_query (query , 5 )
173
- df = pd .read_sql_query (str (query ), engine , params = params , parse_dates = parse_dates )
174
- df = df .astype ({"is_latest_issue" : bool , "is_wip" : bool })
166
+ df = as_pandas (str (q ), params = {}, db_engine = self .cnx , parse_dates = None , limit_rows = 5 ).astype ({"is_latest_issue" : bool , "is_wip" : bool })
175
167
expected_df = self ._rows_to_df (rows [:5 ])
176
168
pd .testing .assert_frame_equal (df , expected_df )
0 commit comments