Skip to content

Commit bf86b29

Browse files
committed
piecewise metadata computation
tests changed as I pulled the serialization down a layer
1 parent 2b183d7 commit bf86b29

File tree

4 files changed

+41
-24
lines changed

4 files changed

+41
-24
lines changed

src/acquisition/covidcast/covidcast_meta_cache_updater.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
# standard library
44
import argparse
5-
import json
65
import sys
76

87
# first party
@@ -43,12 +42,9 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
4342
print('unable to cache epidata')
4443
return False
4544

46-
# serialize the data
47-
epidata_json = json.dumps(metadata)
48-
4945
# update the cache
5046
try:
51-
database.update_covidcast_meta_cache(epidata_json)
47+
database.update_covidcast_meta_cache(metadata)
5248
print('successfully cached epidata')
5349
finally:
5450
# no catch block so that an exception above will cause the program to

src/acquisition/covidcast/database.py

+37-15
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44
"""
55

66
# third party
7+
import json
78
import mysql.connector
89
import numpy as np
10+
from math import ceil
911

1012
# first party
1113
import delphi.operations.secrets as secrets
1214

13-
from math import ceil
1415

1516

1617
class CovidcastRow():
@@ -543,7 +544,13 @@ def update_timeseries_direction_updated_timestamp(
543544
def get_covidcast_meta(self):
544545
"""Compute and return metadata on all non-WIP COVIDcast signals."""
545546

546-
sql = '''
547+
meta = []
548+
549+
sql = 'SELECT `source`, `signal` FROM covidcast GROUP BY `source`, `signal` ORDER BY `source` ASC, `signal` ASC;'
550+
self._cursor.execute(sql)
551+
for source, signal in [ss for ss in self._cursor]: #NOTE: this obfuscation protects the integrity of the cursor; using the cursor as a generator will cause contention w/ subsequent queries
552+
553+
sql = '''
547554
SELECT
548555
t.`source` AS `data_source`,
549556
t.`signal`,
@@ -574,39 +581,38 @@ def get_covidcast_meta(self):
574581
`geo_value`
575582
FROM
576583
`covidcast`
584+
WHERE
585+
`source` = %s AND
586+
`signal` = %s
577587
GROUP BY
578588
`time_value`,
579589
`time_type`,
580590
`geo_type`,
581-
`source`,
582-
`signal`,
583591
`geo_value`
584592
) x
585593
ON
586594
x.`max_issue` = t.`issue` AND
587595
x.`time_type` = t.`time_type` AND
588596
x.`time_value` = t.`time_value` AND
589-
x.`source` = t.`source` AND
590-
x.`signal` = t.`signal` AND
591597
x.`geo_type` = t.`geo_type` AND
592598
x.`geo_value` = t.`geo_value`
593599
WHERE
594-
NOT t.`is_wip`
600+
NOT t.`is_wip` AND
601+
t.`source` = %s AND
602+
t.`signal` = %s
595603
GROUP BY
596-
t.`source`,
597-
t.`signal`,
598604
t.`time_type`,
599605
t.`geo_type`
600606
ORDER BY
601-
t.`source` ASC,
602-
t.`signal` ASC,
603607
t.`time_type` ASC,
604608
t.`geo_type` ASC
605-
'''
606-
self._cursor.execute(sql)
607-
return list(dict(zip(self._cursor.column_names,x)) for x in self._cursor)
609+
'''
610+
self._cursor.execute(sql, (source, signal, source, signal))
611+
meta.extend(list(dict(zip(self._cursor.column_names,x)) for x in self._cursor))
608612

609-
def update_covidcast_meta_cache(self, epidata_json):
613+
return meta
614+
615+
def update_covidcast_meta_cache(self, metadata):
610616
"""Updates the `covidcast_meta_cache` table."""
611617

612618
sql = '''
@@ -616,5 +622,21 @@ def update_covidcast_meta_cache(self, epidata_json):
616622
`timestamp` = UNIX_TIMESTAMP(NOW()),
617623
`epidata` = %s
618624
'''
625+
epidata_json = json.dumps(metadata)
619626

620627
self._cursor.execute(sql, (epidata_json,))
628+
629+
def retrieve_covidcast_meta_cache(self):
    """Return the most recently cached COVIDcast metadata, keyed for lookup.

    Reads the `epidata` JSON document with the greatest `timestamp` from the
    `covidcast_meta_cache` table, decodes it, and indexes every entry by its
    `(data_source, signal, time_type, geo_type)` tuple.

    Returns:
        A dict mapping `(data_source, signal, time_type, geo_type)` to the
        decoded metadata entry. An empty dict is returned when the table has
        no row or the stored `epidata` is NULL, rather than failing with a
        `TypeError` on the missing value.
    """

    sql = '''
        SELECT `epidata`
        FROM `covidcast_meta_cache`
        ORDER BY `timestamp` DESC
        LIMIT 1;
    '''
    self._cursor.execute(sql)
    row = self._cursor.fetchone()

    # `fetchone()` yields None when the query matched no rows, e.g. before
    # the cache has ever been populated; subscripting it would raise.
    if row is None or row[0] is None:
        return {}

    return {
        (
            entry['data_source'],
            entry['signal'],
            entry['time_type'],
            entry['geo_type'],
        ): entry
        for entry in json.loads(row[0])
    }

tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
# standard library
44
import argparse
5-
import json
65
import unittest
76
from unittest.mock import MagicMock
87

@@ -49,7 +48,7 @@ def test_main_successful(self):
4948

5049
self.assertTrue(mock_database.update_covidcast_meta_cache.called)
5150
actual_args = mock_database.update_covidcast_meta_cache.call_args[0]
52-
expected_args = (json.dumps(api_response['epidata']),)
51+
expected_args = (api_response['epidata'],)
5352
self.assertEqual(actual_args, expected_args)
5453

5554
self.assertTrue(mock_database.disconnect.called)

tests/acquisition/covidcast/test_database.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ def test_update_covidcast_meta_cache_query(self):
269269
NOTE: Actual behavior is tested by integration test.
270270
"""
271271

272-
args = ('epidata_json',)
272+
args = ('epidata_json_str',)
273273
mock_connector = MagicMock()
274274
database = Database()
275275
database.connect(connector_impl=mock_connector)
@@ -281,7 +281,7 @@ def test_update_covidcast_meta_cache_query(self):
281281
self.assertTrue(cursor.execute.called)
282282

283283
sql, args = cursor.execute.call_args[0]
284-
expected_args = ('epidata_json',)
284+
expected_args = ('"epidata_json_str"',)
285285
self.assertEqual(args, expected_args)
286286

287287
sql = sql.lower()

0 commit comments

Comments
 (0)