diff --git a/integrations/acquisition/covidcast/test_fill_is_latest_issue.py b/integrations/acquisition/covidcast/test_fill_is_latest_issue.py index b00e1215e..0820888bb 100644 --- a/integrations/acquisition/covidcast/test_fill_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_fill_is_latest_issue.py @@ -47,9 +47,13 @@ def tearDown(self): self.cur.close() self.cnx.close() - def test_fill_is_latest_issue(self): + def _test_fill_is_latest_issue(self, clbp, use_filter): """Update rows having a stale `direction` field and serve the results.""" + # NOTE: column order is: + # (id, source, signal, time_type, geo_type, time_value, geo_value, + # value_updated_timestamp, value, stderr, sample_size, direction_updated_timestamp, direction, issue, lag, is_latest_issue, is_wip) + self.cur.execute(''' insert into covidcast values (0, 'src', 'sig', 'day', 'state', 20200228, 'ca', @@ -65,12 +69,35 @@ def test_fill_is_latest_issue(self): (0, 'src', 'sig', 'day', 'state', 20200301, 'ca', 123, 5, 5, 5, 5, NULL, 20200302, 1, 1, False), (0, 'src', 'sig', 'day', 'state', 20200301, 'ca', + 123, 5, 9, 8, 7, NULL, 20200301, 0, 1, False), + (0, 'src', 'sig', 'day', 'state', 20200228, 'ny', + 123, 2, 5, 5, 5, NULL, 20200228, 0, 1, False), + (0, 'src', 'sig', 'day', 'state', 20200228, 'ny', + 123, 2, 0, 0, 0, NULL, 20200229, 1, 1, False), + (0, 'src', 'sig', 'day', 'state', 20200229, 'ny', + 123, 6, 0, 0, 0, NULL, 20200301, 1, 1, False), + (0, 'src', 'sig', 'day', 'state', 20200229, 'ny', + 123, 6, 9, 9, 9, NULL, 20200229, 0, 1, False), + (0, 'src', 'sig', 'day', 'state', 20200301, 'ny', + 123, 5, 0, 0, 0, NULL, 20200303, 2, 1, False), + (0, 'src', 'sig', 'day', 'state', 20200301, 'ny', + 123, 5, 5, 5, 5, NULL, 20200302, 1, 1, False), + (0, 'src', 'sig', 'day', 'state', 20200301, 'ny', 123, 5, 9, 8, 7, NULL, 20200301, 0, 1, False) ''') self.cnx.commit() + # NOTE: 'ny' values are identical to the 'ca' values, but with the `geo_value` changed + + if use_filter: + # ignores ny + fc = "`geo_value` = 'ca'" + else: + # wildcard ; does not filter + fc = "TRUE" + # fill is_latest_issue - main() + main(FILTER_CONDITION=fc, CLEAR_LATEST_BY_PARTITION=clbp) self.cur.execute('''select * from covidcast''') result = list(self.cur) @@ -88,7 +115,40 @@ def test_fill_is_latest_issue(self): (6, 'src', 'sig', 'day', 'state', 20200301, 'ca', 123, 5, 5, 5, 5, None, 20200302, 1, bytearray(b'0'), bytearray(b'0')), (7, 'src', 'sig', 'day', 'state', 20200301, 'ca', + 123, 5, 9, 8, 7, None, 20200301, 0, bytearray(b'0'), bytearray(b'0')), + (8, 'src', 'sig', 'day', 'state', 20200228, 'ny', + 123, 2, 5, 5, 5, None, 20200228, 0, bytearray(b'0'), bytearray(b'0')), + (9, 'src', 'sig', 'day', 'state', 20200228, 'ny', + 123, 2, 0, 0, 0, None, 20200229, 1, bytearray(b'1'), bytearray(b'0')), + (10, 'src', 'sig', 'day', 'state', 20200229, 'ny', + 123, 6, 0, 0, 0, None, 20200301, 1, bytearray(b'1'), bytearray(b'0')), + (11, 'src', 'sig', 'day', 'state', 20200229, 'ny', + 123, 6, 9, 9, 9, None, 20200229, 0, bytearray(b'0'), bytearray(b'0')), + (12, 'src', 'sig', 'day', 'state', 20200301, 'ny', + 123, 5, 0, 0, 0, None, 20200303, 2, bytearray(b'1'), bytearray(b'0')), + (13, 'src', 'sig', 'day', 'state', 20200301, 'ny', + 123, 5, 5, 5, 5, None, 20200302, 1, bytearray(b'0'), bytearray(b'0')), + (14, 'src', 'sig', 'day', 'state', 20200301, 'ny', 123, 5, 9, 8, 7, None, 20200301, 0, bytearray(b'0'), bytearray(b'0')) ] + if use_filter: + # revert ny is_latest values + for i in range(7, 14): + x = list(expected[i]) + x[-2] = bytearray(b'1') + expected[i] = tuple(x) + self.assertEqual(result, expected) + + def test_fill_is_latest_issue_by_partition(self): + self._test_fill_is_latest_issue(True, False) + + def test_fill_is_latest_issue_not_by_partition(self): + self._test_fill_is_latest_issue(False, False) + + def test_fill_is_latest_issue_by_partition_w_filter(self): + self._test_fill_is_latest_issue(True, True) + + def test_fill_is_latest_issue_not_by_partition_w_filter(self): + self._test_fill_is_latest_issue(False, True) diff --git a/src/acquisition/covidcast/fill_is_latest_issue.py b/src/acquisition/covidcast/fill_is_latest_issue.py index a6c0b358c..6973e76cb 100644 --- a/src/acquisition/covidcast/fill_is_latest_issue.py +++ b/src/acquisition/covidcast/fill_is_latest_issue.py @@ -11,12 +11,28 @@ # partition configuration -PARTITION_VARIABLE = 'geo_value' -PARTITION_SPLITS = ["'05101'", "'101'", "'13071'", "'15007'", "'17161'", "'19039'", "'20123'", "'21213'", "'24035'", - "'27005'", "'28115'", "'29510'", "'31161'", "'35100'", "'37117'", "'39081'", "'41013'", "'44140'", - "'47027'", "'48140'", "'48461'", "'51169'", "'55033'"] +###PARTITION_VARIABLE = 'geo_value' +###PARTITION_SPLITS = ["'05101'", "'101'", "'13071'", "'15007'", "'17161'", "'19039'", "'20123'", "'21213'", "'24035'", +### "'27005'", "'28115'", "'29510'", "'31161'", "'35100'", "'37117'", "'39081'", "'41013'", "'44140'", +### "'47027'", "'48140'", "'48461'", "'51169'", "'55033'"] -def main(): +PARTITION_VARIABLE = 'time_value' +PARTITION_SPLITS = [20200201 + i*100 for i in range(9)] # dates for the first of the month from feb - oct 2020 + +if sorted(PARTITION_SPLITS) != PARTITION_SPLITS: + raise Exception('PARTITION_SPLITS not properly ordered!') + +# filtering configuration +_FILTER_CONDITION = "TRUE" # this would indicate no filtering should be done +_FILTER_CONDITION = ( + "`time_type` = 'day'" # TODO: do we not care about issues on week-type data? + " AND `source` = 'jhu-csse'" # for fixing is_latest on JHU data +) + +_CLEAR_LATEST_BY_PARTITION = True + + +def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITION=_FILTER_CONDITION): u, p = secrets.db.epi connection = mysql.connector.connect( @@ -26,7 +42,7 @@ def main(): database='epidata') cursor = connection.cursor() - set_partition_to_one_query = ''' + set_latest_query = ''' UPDATE ( SELECT @@ -39,7 +55,6 @@ def main(): MAX(`issue`) AS `issue` FROM `covidcast` WHERE - `time_type` = 'day' AND %s GROUP BY `source`, @@ -54,27 +69,32 @@ def main(): SET `is_latest_issue`=1 ''' - set_to_zero_query = ''' + clear_latest_query = ''' UPDATE `covidcast` - SET `is_latest_issue` = 0; + SET `is_latest_issue` = 0 + WHERE %s; ''' commit = False try: - cursor.execute(set_to_zero_query) - for partition_index in range(24): - # constructing the partitoin condition from partition index + if not CLEAR_LATEST_BY_PARTITION: + cursor.execute(clear_latest_query % FILTER_CONDITION) + for partition_index in range(len(PARTITION_SPLITS)+1): + # constructing the partition condition from partition index ge_condition = 'TRUE' if partition_index == 0 else \ f'`{PARTITION_VARIABLE}` >= {PARTITION_SPLITS[partition_index - 1]}' l_condition = 'TRUE' if partition_index == len(PARTITION_SPLITS) else \ f'`{PARTITION_VARIABLE}` < {PARTITION_SPLITS[partition_index]}' - partition_condition = f'({ge_condition}) AND ({l_condition})' + partition_condition = f'({FILTER_CONDITION}) AND ({ge_condition}) AND ({l_condition})' - cursor.execute(set_partition_to_one_query % partition_condition) + if CLEAR_LATEST_BY_PARTITION: + cursor.execute(clear_latest_query % partition_condition) + cursor.execute(set_latest_query % partition_condition) commit = True except Exception as e: connection.rollback() + print("exception raised at partition %s (partition index #%s) of column `%s`" % (PARTITION_SPLITS[partition_index], partition_index, PARTITION_VARIABLE)) raise e finally: cursor.close() @@ -83,4 +103,4 @@ def main(): connection.close() if __name__=='__main__': - main() \ No newline at end of file + main()