diff --git a/google_symptoms/delphi_google_symptoms/date_utils.py b/google_symptoms/delphi_google_symptoms/date_utils.py index c5f855e94..8b19d128e 100644 --- a/google_symptoms/delphi_google_symptoms/date_utils.py +++ b/google_symptoms/delphi_google_symptoms/date_utils.py @@ -17,47 +17,36 @@ def generate_patch_dates(params: Dict) -> Dict[date, Tuple[date]]: Parameters ---------- - issue_date - end_date - params + params: dictionary parsed from params.json Returns ------- - dict of date and tuple of date + dict(date: dict(export date range settings)) """ issue_date = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d") end_date = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d") - num_export_days = _generate_num_export_days(params) + num_export_days = params["validation"]["common"].get("span_length", 14) patch_dates = dict() while issue_date <= end_date: - # negate the subtraction done within generate_query_dates - expected_start_dt = issue_date - timedelta(days=num_export_days - PAD_DAYS + 1) - daterange = generate_query_dates(expected_start_dt, issue_date, num_export_days, True) - patch_dates[issue_date] = tuple(daterange) - issue_date += timedelta(days=1) - return patch_dates + global_max_expected_lag = get_max_lag(params) + export_end_date = issue_date - timedelta(days=global_max_expected_lag + 1) + export_start_date = issue_date - timedelta(days=num_export_days + global_max_expected_lag + 1) + patch_dates[issue_date] = { + "export_start_date": export_start_date, + "export_end_date": export_end_date, + "num_export_days": num_export_days, + } -def _generate_num_export_days(params: Dict) -> int: - """ - Generate dates for exporting with possible lag. + issue_date += timedelta(days=1) - Parameters - ---------- - params: dictionary parsed from params.json + return patch_dates - Returns - ------- - number of export days - """ - # Calculate number of days based on what's missing from the API and - # what the validator expects. +def get_max_lag(params: Dict) -> int: + """Determine reporting lag for data source""" max_expected_lag = lag_converter(params["validation"]["common"].get("max_expected_lag", {"all": 4})) - global_max_expected_lag = max(list(max_expected_lag.values())) - num_export_days = params["validation"]["common"].get("span_length", 14) + global_max_expected_lag - return num_export_days - + return max(list(max_expected_lag.values())) def generate_num_export_days(params: Dict, logger) -> [int]: """ @@ -85,7 +74,7 @@ def generate_num_export_days(params: Dict, logger) -> [int]: # Fetch metadata to check how recent each signal is covidcast.use_api_key(params["indicator"]["api_credentials"]) metadata = covidcast.metadata() - # Filter to only those we currently want to produce, ignore any old or deprecated signals + # Filter to only those signals we currently want to produce for `google-symptoms` gs_metadata = metadata[(metadata.data_source == "google-symptoms") & (metadata.signal.isin(sensor_names))] num_export_days = params["indicator"]["num_export_days"] @@ -98,7 +87,8 @@ def generate_num_export_days(params: Dict, logger) -> [int]: num_export_days = (export_end_date - FULL_BKFILL_START_DATE).days + 1 else: latest_date_diff = (datetime.today() - to_datetime(min(gs_metadata.max_time))).days + 1 - expected_date_diff = _generate_num_export_days(params) + global_max_expected_lag = get_max_lag(params) + expected_date_diff = params["validation"]["common"].get("span_length", 14) + global_max_expected_lag if latest_date_diff > expected_date_diff: logger.info(f"Missing dates from: {to_datetime(min(gs_metadata.max_time)).date()}") @@ -138,3 +128,4 @@ def generate_query_dates( retrieve_dates = [start_date - timedelta(days=PAD_DAYS - 1), export_end_date] return retrieve_dates + diff --git a/google_symptoms/delphi_google_symptoms/patch.py b/google_symptoms/delphi_google_symptoms/patch.py index 5e422bbe5..520973a89 100755 --- a/google_symptoms/delphi_google_symptoms/patch.py +++ b/google_symptoms/delphi_google_symptoms/patch.py @@ -67,17 +67,24 @@ def patch(params): patch_dates = generate_patch_dates(params) while issue_date <= end_issue: - daterange = patch_dates[issue_date] logger.info(f"""Running issue {issue_date.strftime("%Y-%m-%d")}""") + + # Output dir setup current_issue_yyyymmdd = issue_date.strftime("%Y%m%d") current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/google-symptom""" makedirs(f"{current_issue_dir}", exist_ok=True) + params["common"]["export_dir"] = f"""{current_issue_dir}""" + params["indicator"]["custom_run"] = True + + date_settings = patch_dates[issue_date] + + params["indicator"]["export_start_date"] = date_settings["export_start_date"].strftime("%Y-%m-%d") + params["indicator"]["export_end_date"] = date_settings["export_end_date"].strftime("%Y-%m-%d") + params["indicator"]["num_export_days"] = date_settings["num_export_days"] - params["indicator"]["export_start_date"] = daterange[0].strftime("%Y-%m-%d") - params["indicator"]["export_end_date"] = daterange[1].strftime("%Y-%m-%d") - params["patch"]["patch_flag"] = True run_module(params, logger) + issue_date += timedelta(days=1) diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index 2b74ca10d..4ad7a8442 100644 --- a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -9,6 +9,7 @@ from google.oauth2 import service_account from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, SYMPTOM_SETS +from .date_utils import generate_query_dates # Create map of BigQuery symptom column names to desired column names. colname_map = {"symptom_" + @@ -214,7 +215,7 @@ def initialize_credentials(credentials): pandas_gbq.context.project = credentials.project_id -def pull_gs_data(credentials, export_date_range): +def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days, custom_run_flag): """Pull latest dataset for each geo level and combine. PS: No information for PR @@ -237,7 +238,7 @@ def pull_gs_data(credentials, export_date_range): dict: {"county": pd.DataFrame, "state": pd.DataFrame} """ # Fetch and format dates we want to attempt to retrieve - + export_date_range = generate_query_dates(export_start_date, export_end_date, num_export_days, custom_run_flag) retrieve_dates = format_dates_for_query(export_date_range) initialize_credentials(credentials) diff --git a/google_symptoms/delphi_google_symptoms/run.py b/google_symptoms/delphi_google_symptoms/run.py index 31e9c4509..ed5a47b23 100644 --- a/google_symptoms/delphi_google_symptoms/run.py +++ b/google_symptoms/delphi_google_symptoms/run.py @@ -12,7 +12,7 @@ from delphi_utils import create_export_csv, get_structured_logger from .constants import COMBINED_METRIC, GEO_RESOLUTIONS, SMOOTHERS, SMOOTHERS_MAP -from .date_utils import generate_num_export_days, generate_query_dates +from .date_utils import generate_num_export_days from .geo import geo_map from .pull import pull_gs_data @@ -58,10 +58,9 @@ def run_module(params, logger=None): custom_run_flag = ( False if not params["indicator"].get("custom_run", False) else params["indicator"].get("custom_run", False) ) - export_date_range = generate_query_dates(export_start_date, export_end_date, num_export_days, custom_run_flag) # Pull GS data - dfs = pull_gs_data(params["indicator"]["bigquery_credentials"], export_date_range) + dfs = pull_gs_data(params["indicator"]["bigquery_credentials"], export_start_date, export_end_date, num_export_days, custom_run_flag) for geo_res in GEO_RESOLUTIONS: if geo_res == "state": df_pull = dfs["state"] diff --git a/google_symptoms/tests/test_date_utils.py b/google_symptoms/tests/test_date_utils.py index dcd104cc3..3d358c1e6 100644 --- a/google_symptoms/tests/test_date_utils.py +++ b/google_symptoms/tests/test_date_utils.py @@ -4,9 +4,13 @@ from freezegun import freeze_time from conftest import TEST_DIR, NEW_DATE +import covidcast + from delphi_utils.validator.utils import lag_converter from delphi_google_symptoms.constants import FULL_BKFILL_START_DATE from delphi_google_symptoms.date_utils import generate_query_dates, generate_num_export_days, generate_patch_dates + + class TestDateUtils: @freeze_time("2021-01-05") @@ -36,42 +40,65 @@ def test_generate_query_dates_custom(self): assert set(output) == set(expected) def test_generate_export_dates(self, params, logger, monkeypatch): - import covidcast metadata_df = pd.read_csv(f"{TEST_DIR}/test_data/covid_metadata.csv") monkeypatch.setattr(covidcast, "metadata", lambda: metadata_df) - num_export_days = generate_num_export_days(params, logger) + num_export_days = generate_num_export_days(params, logger) expected_num_export_days = params["indicator"]["num_export_days"] - assert num_export_days == expected_num_export_days def test_generate_export_dates_normal(self, params_w_no_date, logger, monkeypatch): - import covidcast metadata_df = pd.read_csv(f"{TEST_DIR}/test_data/covid_metadata.csv") monkeypatch.setattr(covidcast, "metadata", lambda: metadata_df) + num_export_days = generate_num_export_days(params_w_no_date, logger) - max_expected_lag = lag_converter(params_w_no_date["validation"]["common"].get("max_expected_lag", {"all": 4})) + max_expected_lag = lag_converter(params_w_no_date["validation"]["common"]["max_expected_lag"]) global_max_expected_lag = max(list(max_expected_lag.values())) - expected_num_export_days = params_w_no_date["validation"]["common"].get("span_length", 14) + global_max_expected_lag + expected_num_export_days = params_w_no_date["validation"]["common"]["span_length"] + global_max_expected_lag assert num_export_days == expected_num_export_days def test_generate_export_date_missing(self, params_w_no_date, logger, monkeypatch): - import covidcast metadata_df = pd.read_csv(f"{TEST_DIR}/test_data/covid_metadata_missing.csv") monkeypatch.setattr(covidcast, "metadata", lambda: metadata_df) + num_export_days = generate_num_export_days(params_w_no_date, logger) expected_num_export_days = (date.today() - FULL_BKFILL_START_DATE.date()).days + 1 assert num_export_days == expected_num_export_days - def test_generate_patch_dates(self, params_w_patch, logger, monkeypatch): - import covidcast - metadata_df = pd.read_csv(f"{TEST_DIR}/test_data/covid_metadata_missing.csv") - monkeypatch.setattr(covidcast, "metadata", lambda: metadata_df) - max_expected_lag = lag_converter(params_w_patch["validation"]["common"].get("max_expected_lag", {"all": 4})) + def generate_expected_start_end_dates(self, params_, issue_date): + # Actual dates reported on issue dates June 27-29, 2024, by the old + # version of the google-symptoms indicator + # (https://github.com/cmu-delphi/covidcast-indicators/tree/b338a0962bf3a63f70a83f0b719516f914b098e2). + # The patch module should be able to recreate these dates. + dates_dict = { + "2024-06-27": [ '2024-06-02', '2024-06-03', '2024-06-04', '2024-06-05', '2024-06-06', '2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22'], + "2024-06-28": ['2024-06-03', '2024-06-04', '2024-06-05', '2024-06-06', '2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22', '2024-06-23'], + "2024-06-29": ['2024-06-04', '2024-06-05', '2024-06-06','2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22', '2024-06-23', '2024-06-24'], + } + + dates_dict = { + datetime.strptime(key, "%Y-%m-%d"): [ + datetime.strptime(listvalue, "%Y-%m-%d") for listvalue in value + ] for key, value in dates_dict.items() + } + + dates = dates_dict[issue_date] + + # Raw signals add 6 extra dates of padding for later calculating + # smoothed signals. Since this test is checking an early step in the + # process, before padding has happened, we can drop the first 6 + # dates. + return { + "export_start_date": min(dates[6:21]), + "export_end_date": max(dates[6:21]) + } + + def test_generate_patch_dates(self, params_w_patch, logger): + max_expected_lag = lag_converter(params_w_patch["validation"]["common"]["max_expected_lag"]) global_max_expected_lag = max(list(max_expected_lag.values())) - expected_num_export_days = params_w_patch["validation"]["common"].get("span_length", 14) + global_max_expected_lag + num_export_days = params_w_patch["validation"]["common"]["span_length"] issue_date = datetime.strptime(params_w_patch["patch"]["start_issue"], "%Y-%m-%d") end_issue = datetime.strptime(params_w_patch["patch"]["end_issue"], "%Y-%m-%d") @@ -79,14 +106,11 @@ def test_generate_patch_dates(self, params_w_patch, logger, monkeypatch): patch_date_dict = generate_patch_dates(params_w_patch) while issue_date <= end_issue: - expected_daterange = generate_query_dates( - FULL_BKFILL_START_DATE, - issue_date, - expected_num_export_days, - False - ) # in the patch script the date generated by generate_patch_dates becomes the export_start_date and export_end_date - export_start_date, export_end_date = patch_date_dict[issue_date] - actual_daterange = generate_query_dates(export_start_date, export_end_date, expected_num_export_days, True) - assert set(actual_daterange) == set(expected_daterange) - issue_date += timedelta(days=1) \ No newline at end of file + patch_settings = patch_date_dict[issue_date] + expected_dict = self.generate_expected_start_end_dates(params_w_patch, issue_date) + expected_dict["num_export_days"] = num_export_days # unmodified + + assert patch_settings == expected_dict + + issue_date += timedelta(days=1) diff --git a/google_symptoms/tests/test_patch.py b/google_symptoms/tests/test_patch.py index 19d333593..6b60db9a5 100644 --- a/google_symptoms/tests/test_patch.py +++ b/google_symptoms/tests/test_patch.py @@ -25,27 +25,29 @@ def parse_csv_file(self, file_list: List[str]) -> Tuple[List[datetime]]: return sorted(smoothed_list), sorted(raw_list) def generate_expected_dates(self, params_, smoother, issue_date): - max_expected_lag = lag_converter(params_["validation"]["common"].get("max_expected_lag", {"all": 4})) - global_max_expected_lag = max(list(max_expected_lag.values())) - - if params_["indicator"].get("num_export_days"): - num_export_days = params_["indicator"]["num_export_days"] + # Actual dates reported on issue dates June 27-29, 2024, by the old + # version of the google-symptoms indicator + # (https://github.com/cmu-delphi/covidcast-indicators/tree/b338a0962bf3a63f70a83f0b719516f914b098e2). + # The patch module should be able to recreate these dates. + dates_dict = { + "2024-06-27": [ '2024-06-02', '2024-06-03', '2024-06-04', '2024-06-05', '2024-06-06', '2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22'], + "2024-06-28": ['2024-06-03', '2024-06-04', '2024-06-05', '2024-06-06', '2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22', '2024-06-23'], + "2024-06-29": ['2024-06-04', '2024-06-05', '2024-06-06','2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22', '2024-06-23', '2024-06-24'], + } + + dates_dict = { + datetime.strptime(key, "%Y-%m-%d"): [ + datetime.strptime(listvalue, "%Y-%m-%d") for listvalue in value + ] for key, value in dates_dict.items() + } + + dates = dates_dict[issue_date] + + if smoother == "raw": + return dates else: - num_export_days = params_["validation"]["common"].get("span_length", 14) + global_max_expected_lag - - # mimic date generate as if the issue date was "today" - query_start_date, query_end_date = generate_query_dates( - FULL_BKFILL_START_DATE, - issue_date, - num_export_days, - False - ) - # the smoother in line 82-88 filters out prev seven days - export_start_date = query_start_date + timedelta(days=6) if smoother == "smoothed" else query_start_date - export_end_date = query_end_date - timedelta(days=global_max_expected_lag) - num_export_days = (export_end_date - export_start_date).days + 1 - - return sorted([export_start_date + timedelta(days=x) for x in range(num_export_days)]) + # Smoothed signals drop the first 6 dates. + return dates[6:21] def mocked_patch(self, params_): with mock_patch("delphi_google_symptoms.patch.read_params", return_value=params_), \ @@ -58,9 +60,7 @@ def side_effect(*args, **kwargs): df = state_data_gap pattern = re.compile(r'\d{4}-\d{2}-\d{2}') start_date, end_date = re.findall(pattern, args[0]) - end_date_w_lag = (datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days=4)).strftime("%Y-%m-%d") - return df[(df["date"] >= start_date) & \ - (df["date"] <= end_date_w_lag)] + return df[(df["date"] >= start_date) & (df["date"] <= end_date)] else: return pd.DataFrame() @@ -80,11 +80,15 @@ def side_effect(*args, **kwargs): assert smoothed_dates == expected_smoothed_dates assert raw_dates == expected_raw_dates + shutil.rmtree(issue_dir) + start_date += timedelta(days=1) + def test_patch_default(self, params_w_patch): params_w_patch["indicator"]["num_export_days"] = None self.mocked_patch(params_w_patch) + def test_patch_date_set(self, params_w_patch): self.mocked_patch(params_w_patch) diff --git a/google_symptoms/tests/test_pull.py b/google_symptoms/tests/test_pull.py index b0553d800..baaf9934c 100644 --- a/google_symptoms/tests/test_pull.py +++ b/google_symptoms/tests/test_pull.py @@ -44,7 +44,8 @@ def test_good_file(self, mock_credentials, mock_read_gbq): "20201230", "%Y%m%d") end_date = datetime.combine(date.today(), datetime.min.time()) - dfs = pull_gs_data("", [start_date, end_date]) + dfs = pull_gs_data("", datetime.strptime( + "20201230", "%Y%m%d"), datetime.combine(date.today(), datetime.min.time()), 0, False) for level in ["county", "state"]: df = dfs[level]