Skip to content

Commit ac91a04

Browse files
authored
Merge pull request #258 from melange396/is_latest_fix
updates to address JHU is_latest (issue #329)
2 parents cf1cc58 + c8cf69e commit ac91a04

File tree

2 files changed

+97
-17
lines changed

2 files changed

+97
-17
lines changed

integrations/acquisition/covidcast/test_fill_is_latest_issue.py

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,13 @@ def tearDown(self):
4747
self.cur.close()
4848
self.cnx.close()
4949

50-
def test_fill_is_latest_issue(self):
50+
def _test_fill_is_latest_issue(self, clbp, use_filter):
5151
"""Run `main()` and verify the `is_latest_issue` flag of every row (`clbp`: clear by partition; `use_filter`: restrict the fill to 'ca')."""
5252

53+
# NOTE: column order is:
54+
# (id, source, signal, time_type, geo_type, time_value, geo_value,
55+
# value_updated_timestamp, value, stderr, sample_size, direction_updated_timestamp, direction, issue, lag, is_latest_issue, is_wip)
56+
5357
self.cur.execute('''
5458
insert into covidcast values
5559
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
@@ -65,12 +69,35 @@ def test_fill_is_latest_issue(self):
6569
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
6670
123, 5, 5, 5, 5, NULL, 20200302, 1, 1, False),
6771
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
72+
123, 5, 9, 8, 7, NULL, 20200301, 0, 1, False),
73+
(0, 'src', 'sig', 'day', 'state', 20200228, 'ny',
74+
123, 2, 5, 5, 5, NULL, 20200228, 0, 1, False),
75+
(0, 'src', 'sig', 'day', 'state', 20200228, 'ny',
76+
123, 2, 0, 0, 0, NULL, 20200229, 1, 1, False),
77+
(0, 'src', 'sig', 'day', 'state', 20200229, 'ny',
78+
123, 6, 0, 0, 0, NULL, 20200301, 1, 1, False),
79+
(0, 'src', 'sig', 'day', 'state', 20200229, 'ny',
80+
123, 6, 9, 9, 9, NULL, 20200229, 0, 1, False),
81+
(0, 'src', 'sig', 'day', 'state', 20200301, 'ny',
82+
123, 5, 0, 0, 0, NULL, 20200303, 2, 1, False),
83+
(0, 'src', 'sig', 'day', 'state', 20200301, 'ny',
84+
123, 5, 5, 5, 5, NULL, 20200302, 1, 1, False),
85+
(0, 'src', 'sig', 'day', 'state', 20200301, 'ny',
6886
123, 5, 9, 8, 7, NULL, 20200301, 0, 1, False)
6987
''')
7088
self.cnx.commit()
7189

90+
# NOTE: 'ny' values are identical to the 'ca' values, but with the `geo_value` changed
91+
92+
if use_filter:
93+
# ignores ny
94+
fc = "`geo_value` = 'ca'"
95+
else:
96+
# wildcard ; does not filter
97+
fc = "TRUE"
98+
7299
# fill is_latest_issue
73-
main()
100+
main(FILTER_CONDITION=fc, CLEAR_LATEST_BY_PARTITION=clbp)
74101

75102
self.cur.execute('''select * from covidcast''')
76103
result = list(self.cur)
@@ -88,7 +115,40 @@ def test_fill_is_latest_issue(self):
88115
(6, 'src', 'sig', 'day', 'state', 20200301, 'ca',
89116
123, 5, 5, 5, 5, None, 20200302, 1, bytearray(b'0'), bytearray(b'0')),
90117
(7, 'src', 'sig', 'day', 'state', 20200301, 'ca',
118+
123, 5, 9, 8, 7, None, 20200301, 0, bytearray(b'0'), bytearray(b'0')),
119+
(8, 'src', 'sig', 'day', 'state', 20200228, 'ny',
120+
123, 2, 5, 5, 5, None, 20200228, 0, bytearray(b'0'), bytearray(b'0')),
121+
(9, 'src', 'sig', 'day', 'state', 20200228, 'ny',
122+
123, 2, 0, 0, 0, None, 20200229, 1, bytearray(b'1'), bytearray(b'0')),
123+
(10, 'src', 'sig', 'day', 'state', 20200229, 'ny',
124+
123, 6, 0, 0, 0, None, 20200301, 1, bytearray(b'1'), bytearray(b'0')),
125+
(11, 'src', 'sig', 'day', 'state', 20200229, 'ny',
126+
123, 6, 9, 9, 9, None, 20200229, 0, bytearray(b'0'), bytearray(b'0')),
127+
(12, 'src', 'sig', 'day', 'state', 20200301, 'ny',
128+
123, 5, 0, 0, 0, None, 20200303, 2, bytearray(b'1'), bytearray(b'0')),
129+
(13, 'src', 'sig', 'day', 'state', 20200301, 'ny',
130+
123, 5, 5, 5, 5, None, 20200302, 1, bytearray(b'0'), bytearray(b'0')),
131+
(14, 'src', 'sig', 'day', 'state', 20200301, 'ny',
91132
123, 5, 9, 8, 7, None, 20200301, 0, bytearray(b'0'), bytearray(b'0'))
92133
]
93134

135+
if use_filter:
136+
# the filtered run never touches the 'ny' rows, so they keep the is_latest_issue=1 they were inserted with
137+
for i in range(7, 14):
138+
x = list(expected[i])
139+
x[-2] = bytearray(b'1')
140+
expected[i] = tuple(x)
141+
94142
self.assertEqual(result, expected)
143+
144+
def test_fill_is_latest_issue_by_partition(self):
145+
self._test_fill_is_latest_issue(True, False)
146+
147+
def test_fill_is_latest_issue_not_by_partition(self):
148+
self._test_fill_is_latest_issue(False, False)
149+
150+
def test_fill_is_latest_issue_by_partition_w_filter(self):
151+
self._test_fill_is_latest_issue(True, True)
152+
153+
def test_fill_is_latest_issue_not_by_partition_w_filter(self):
154+
self._test_fill_is_latest_issue(False, True)

src/acquisition/covidcast/fill_is_latest_issue.py

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,28 @@
1111

1212

1313
# partition configuration
14-
PARTITION_VARIABLE = 'geo_value'
15-
PARTITION_SPLITS = ["'05101'", "'101'", "'13071'", "'15007'", "'17161'", "'19039'", "'20123'", "'21213'", "'24035'",
16-
"'27005'", "'28115'", "'29510'", "'31161'", "'35100'", "'37117'", "'39081'", "'41013'", "'44140'",
17-
"'47027'", "'48140'", "'48461'", "'51169'", "'55033'"]
14+
###PARTITION_VARIABLE = 'geo_value'
15+
###PARTITION_SPLITS = ["'05101'", "'101'", "'13071'", "'15007'", "'17161'", "'19039'", "'20123'", "'21213'", "'24035'",
16+
### "'27005'", "'28115'", "'29510'", "'31161'", "'35100'", "'37117'", "'39081'", "'41013'", "'44140'",
17+
### "'47027'", "'48140'", "'48461'", "'51169'", "'55033'"]
1818

19-
def main():
19+
PARTITION_VARIABLE = 'time_value'
20+
PARTITION_SPLITS = [20200201 + i*100 for i in range(9)] # dates for the first of the month from feb - oct 2020
21+
22+
if sorted(PARTITION_SPLITS) != PARTITION_SPLITS:
23+
raise Exception('PARTITION_SPLITS not properly ordered!')
24+
25+
# filtering configuration
26+
_FILTER_CONDITION = "TRUE" # matches every row, i.e. no filtering; overridden by the assignment below
27+
_FILTER_CONDITION = (
28+
"`time_type` = 'day'" # TODO: do we not care about issues on week-type data?
29+
" AND `source` = 'jhu-csse'" # for fixing is_latest on JHU data
30+
)
31+
32+
_CLEAR_LATEST_BY_PARTITION = True
33+
34+
35+
def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITION=_FILTER_CONDITION):
2036

2137
u, p = secrets.db.epi
2238
connection = mysql.connector.connect(
@@ -26,7 +42,7 @@ def main():
2642
database='epidata')
2743
cursor = connection.cursor()
2844

29-
set_partition_to_one_query = '''
45+
set_latest_query = '''
3046
UPDATE
3147
(
3248
SELECT
@@ -39,7 +55,6 @@ def main():
3955
MAX(`issue`) AS `issue`
4056
FROM `covidcast`
4157
WHERE
42-
`time_type` = 'day' AND
4358
%s
4459
GROUP BY
4560
`source`,
@@ -54,27 +69,32 @@ def main():
5469
SET `is_latest_issue`=1
5570
'''
5671

57-
set_to_zero_query = '''
72+
clear_latest_query = '''
5873
UPDATE `covidcast`
59-
SET `is_latest_issue` = 0;
74+
SET `is_latest_issue` = 0
75+
WHERE %s;
6076
'''
6177

6278
commit = False
6379
try:
64-
cursor.execute(set_to_zero_query)
65-
for partition_index in range(24):
66-
# constructing the partitoin condition from partition index
80+
if not CLEAR_LATEST_BY_PARTITION:
81+
cursor.execute(clear_latest_query % FILTER_CONDITION)
82+
for partition_index in range(len(PARTITION_SPLITS)+1):
83+
# constructing the partition condition from partition index
6784
ge_condition = 'TRUE' if partition_index == 0 else \
6885
f'`{PARTITION_VARIABLE}` >= {PARTITION_SPLITS[partition_index - 1]}'
6986
l_condition = 'TRUE' if partition_index == len(PARTITION_SPLITS) else \
7087
f'`{PARTITION_VARIABLE}` < {PARTITION_SPLITS[partition_index]}'
71-
partition_condition = f'({ge_condition}) AND ({l_condition})'
88+
partition_condition = f'({FILTER_CONDITION}) AND ({ge_condition}) AND ({l_condition})'
7289

73-
cursor.execute(set_partition_to_one_query % partition_condition)
90+
if CLEAR_LATEST_BY_PARTITION:
91+
cursor.execute(clear_latest_query % partition_condition)
92+
cursor.execute(set_latest_query % partition_condition)
7493

7594
commit = True
7695
except Exception as e:
7796
connection.rollback()
97+
print("exception raised at partition %s (partition index #%s) of column `%s`" % (PARTITION_SPLITS[partition_index], partition_index, PARTITION_VARIABLE))
7898
raise e
7999
finally:
80100
cursor.close()
@@ -83,4 +103,4 @@ def main():
83103
connection.close()
84104

85105
if __name__=='__main__':
86-
main()
106+
main()

0 commit comments

Comments
 (0)