
schema refinement, dbjobs speed increase, and added migration routine #922

Merged: 12 commits, Aug 19, 2022
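
The pattern repeated across every file below: loading rows into the covidcast schema used to be a two-step process, a bulk insert followed by an explicit 'dbjobs' step (dbjobs_runner from the command line, Database.run_dbjobs() from code) that moved the staged rows into the history and latest tables. These diffs delete only that second step, which suggests the bulk insert now performs that load itself. A minimal before/after sketch of the test pattern, using the self._db fixture and rows list that appear in the hunks:

    # before: bulk insert, then an explicit dbjobs pass over the staged rows
    self._db.insert_or_update_bulk(rows)
    self._db.run_dbjobs()
    self._db._connection.commit()

    # after: the single bulk-insert call is enough
    self._db.insert_or_update_bulk(rows)
    self._db._connection.commit()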
integrations/acquisition/covidcast/test_csv_uploading.py (10 changes: 0 additions & 10 deletions)
@@ -15,7 +15,6 @@
 from delphi_utils import Nans
 from delphi.epidata.client.delphi_epidata import Epidata
 from delphi.epidata.acquisition.covidcast.csv_to_database import main
-from delphi.epidata.acquisition.covidcast.dbjobs_runner import main as dbjobs_main
 import delphi.operations.secrets as secrets
 
 # py3tester coverage target (equivalent to `import *`)
@@ -123,7 +122,6 @@ def test_uploading(self):
 
         # upload CSVs
         main(args)
-        dbjobs_main()
         response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
         expected_values = pd.concat([values, pd.DataFrame({ "time_value": [20200419] * 3, "signal": [signal_name] * 3, "direction": [None] * 3})], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
@@ -152,7 +150,6 @@ def test_uploading(self):
 
         # upload CSVs
         main(args)
-        dbjobs_main()
         response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
         expected_values = pd.concat([values, pd.DataFrame({
@@ -187,7 +184,6 @@ def test_uploading(self):
 
         # upload CSVs
         main(args)
-        dbjobs_main()
         response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
         expected_response = {'result': -2, 'message': 'no results'}
@@ -213,7 +209,6 @@ def test_uploading(self):
 
         # upload CSVs
         main(args)
-        dbjobs_main()
         response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
         expected_values_df = pd.concat([values, pd.DataFrame({
@@ -247,7 +242,6 @@ def test_uploading(self):
 
         # upload CSVs
         main(args)
-        dbjobs_main()
         response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
         expected_values = pd.concat([values, pd.DataFrame({
@@ -283,7 +277,6 @@ def test_uploading(self):
 
         # upload CSVs
         main(args)
-        dbjobs_main()
         response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
         expected_values = pd.concat([values, pd.DataFrame({
@@ -315,7 +308,6 @@ def test_uploading(self):
 
         # upload CSVs
         main(args)
-        dbjobs_main()
         response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
         expected_response = {'result': -2, 'message': 'no results'}
@@ -332,7 +324,6 @@ def test_uploading(self):
             f.write('this,header,is,wrong\n')
 
         main(args)
-        dbjobs_main()
 
         path = data_dir + '/archive/failed/src-name/20200420_state_test.csv'
         self.assertIsNotNone(os.stat(path))
@@ -346,7 +337,6 @@ def test_uploading(self):
             f.write('file name is wrong\n')
 
         main(args)
-        dbjobs_main()
 
         path = data_dir + '/archive/failed/unknown/hello.csv'
         self.assertIsNotNone(os.stat(path))
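Net effect on test_csv_uploading.py: each scenario exercises the acquisition entry point alone, and the upload is immediately queryable. A condensed sketch of the surviving pattern, with values taken from the hunks above:

    # upload CSVs; no dbjobs_main() follow-up is needed anymore
    main(args)
    response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')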
integrations/acquisition/covidcast/test_db.py (2 changes: 0 additions & 2 deletions)
@@ -31,9 +31,7 @@ def _make_dummy_row(self):
         # cols: ^ timeval v se ssz ^issue ^lag
 
     def _insert_rows(self, rows):
-        # inserts rows into the database using the full acquisition process, including 'dbjobs' load into history & latest tables
         self._db.insert_or_update_bulk(rows)
-        self._db.run_dbjobs()
         ###db._connection.commit() # NOTE: this isnt needed here, but would be if using external access (like through client lib)
 
     def _find_matches_for_row(self, row):
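For reference, the helper in test_db.py now reduces to a single call. A sketch of what remains after the deletions, with the descriptive comment rewritten here since the deleted one referenced the removed dbjobs step:

    def _insert_rows(self, rows):
        # inserts rows into the database using the full acquisition process
        self._db.insert_or_update_bulk(rows)
        ###db._connection.commit() # NOTE: this isnt needed here, but would be if using external access (like through client lib)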
integrations/acquisition/covidcast/test_delete_batch.py (1 change: 0 additions & 1 deletion)
@@ -68,7 +68,6 @@ def _test_delete_batch(self, cc_deletions):
         ]
         rows.append(CovidcastRow('src', 'sig', 'day', 'geo', 0, "d_justone", 0,0,0,0,0,0, 1, 0))
         self._db.insert_or_update_bulk(rows)
-        self._db.run_dbjobs()
 
         # delete entries
         self._db.delete_batch(cc_deletions)
integrations/client/test_delphi_epidata.py (4 changes: 0 additions & 4 deletions)
@@ -90,7 +90,6 @@ def test_covidcast(self):
                 6.5, 2.2, 11.5, nmv, nmv, nmv, 20200416, 2),
         ]
         self._db.insert_or_update_bulk(rows)
-        self._db.run_dbjobs()
         self._db._connection.commit()
 
         with self.subTest(name='request two signals'):
@@ -355,7 +354,6 @@ def test_geo_value(self):
                 60, 61, 62, nmv, nmv, nmv, 20200414, 0),
         ]
         self._db.insert_or_update_bulk(rows)
-        self._db.run_dbjobs()
         self._db._connection.commit()
 
         def fetch(geo_value):
@@ -445,7 +443,6 @@ def test_covidcast_meta(self):
                 7.0, 2.0, 12.5, nmv, nmv, nmv, 20200416, 1),
         ]
         self._db.insert_or_update_bulk(rows)
-        self._db.run_dbjobs()
         self._db._connection.commit()
 
         # cache it
@@ -501,7 +498,6 @@ def test_async_epidata(self):
                 60, 61, 62, nmv, nmv, nmv, 20200414, 0)
         ]
         self._db.insert_or_update_bulk(rows)
-        self._db.run_dbjobs()
         self._db._connection.commit()
 
         test_output = Epidata.async_epidata([
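Note that the client tests keep the explicit commit after the bulk insert: the Epidata client queries through the HTTP API, which reads over a separate server-side connection (the same point made by the NOTE in test_db.py). A sketch of the surviving pattern; 'src' and 'sig' are placeholder source/signal names for illustration:

    self._db.insert_or_update_bulk(rows)
    self._db._connection.commit()  # required: the API reads via its own connection
    response = Epidata.covidcast('src', 'sig', 'day', 'state', 20200414, '*')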
integrations/server/test_covidcast_endpoints.py (1 change: 0 additions & 1 deletion)
@@ -126,7 +126,6 @@ def tearDown(self):
 
     def _insert_rows(self, rows: Iterable[CovidcastRow]):
         self._db.insert_or_update_bulk(rows)
-        self._db.run_dbjobs()
         self._db._connection.commit()
         return rows
 