Commit 4168db5

Merge pull request #1999 from cmu-delphi/1996-patch-google-symptoms
1996 patch google symptoms
2 parents 936a4d5 + 528a1e3 · commit 4168db5

15 files changed: 9,100 additions & 195 deletions

google_symptoms/delphi_google_symptoms/constants.py

Lines changed: 6 additions & 1 deletion
@@ -1,5 +1,6 @@
 """Registry for constants."""
-from datetime import timedelta
+
+from datetime import datetime, timedelta
 
 from delphi_utils import Smoother
 
@@ -108,3 +109,7 @@
 'Wyoming': 'wy'}
 
 DC_FIPS = "11001"
+
+FULL_BKFILL_START_DATE = datetime.strptime("2020-02-20", "%Y-%m-%d")
+
+PAD_DAYS = 7
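
The two new constants feed the date utilities added later in this commit: FULL_BKFILL_START_DATE anchors a full-history backfill and PAD_DAYS sets the smoothing pad on query windows. A minimal sketch of the intended arithmetic (editor's illustration, not part of the diff; the end date is hypothetical):

from datetime import datetime, timedelta

FULL_BKFILL_START_DATE = datetime.strptime("2020-02-20", "%Y-%m-%d")
PAD_DAYS = 7

export_end_date = datetime(2024, 4, 15)  # hypothetical
# Full-history backfill length, as computed in generate_num_export_days below:
num_export_days = (export_end_date - FULL_BKFILL_START_DATE).days + 1  # 1517
# Smoothing pad applied to the query window, as in generate_query_dates below:
query_start = export_end_date - timedelta(days=PAD_DAYS - 1)  # 2024-04-09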
google_symptoms/delphi_google_symptoms/date_utils.py

Lines changed: 132 additions & 0 deletions
@@ -0,0 +1,132 @@
+"""utility functions for date parsing."""
+
+from datetime import date, datetime, timedelta
+from itertools import product
+from typing import Dict, List, Union
+
+import covidcast
+from delphi_utils.validator.utils import lag_converter
+from pandas import to_datetime
+
+from .constants import COMBINED_METRIC, FULL_BKFILL_START_DATE, PAD_DAYS, SMOOTHERS
+
+
+def generate_patch_dates(params: Dict) -> Dict[date, Dict[str, Union[date, int]]]:
+    """
+    Generate date range for chunking backfilled dates.
+
+    Parameters
+    ----------
+    params: dictionary parsed from params.json
+
+    Returns
+    -------
+    dict(date: dict(export date range settings))
+    """
+    issue_date = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
+    end_date = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")
+    num_export_days = params["validation"]["common"].get("span_length", 14)
+
+    patch_dates = dict()
+    while issue_date <= end_date:
+        global_max_expected_lag = get_max_lag(params)
+        export_end_date = issue_date - timedelta(days=global_max_expected_lag + 1)
+        export_start_date = issue_date - timedelta(days=num_export_days + global_max_expected_lag + 1)
+
+        patch_dates[issue_date] = {
+            "export_start_date": export_start_date,
+            "export_end_date": export_end_date,
+            "num_export_days": num_export_days,
+        }
+
+        issue_date += timedelta(days=1)
+
+    return patch_dates
+
+
+def get_max_lag(params: Dict) -> int:
+    """Determine reporting lag for data source."""
+    max_expected_lag = lag_converter(params["validation"]["common"].get("max_expected_lag", {"all": 4}))
+    return max(list(max_expected_lag.values()))
+
+
+def generate_num_export_days(params: Dict, logger) -> [int]:
+    """
+    Generate dates for exporting based on current available data.
+
+    Parameters
+
+    ----------
+    params: dictionary parsed from params.json
+
+    Returns
+    -------
+    num_export_days: int
+    """
+    # If end_date not specified, use current date.
+    export_end_date = datetime.strptime(
+        params["indicator"].get("export_end_date", datetime.strftime(date.today(), "%Y-%m-%d")), "%Y-%m-%d"
+    )
+
+    # Generate a list of signals we expect to produce
+    sensor_names = set(
+        "_".join([metric, smoother, "search"]) for metric, smoother in product(COMBINED_METRIC, SMOOTHERS)
+    )
+
+    # Fetch metadata to check how recent each signal is
+    covidcast.use_api_key(params["indicator"]["api_credentials"])
+    metadata = covidcast.metadata()
+    # Filter to only those signals we currently want to produce for `google-symptoms`
+    gs_metadata = metadata[(metadata.data_source == "google-symptoms") & (metadata.signal.isin(sensor_names))]
+
+    num_export_days = params["indicator"]["num_export_days"]
+    custom_run = False if not params["common"].get("custom_run") else params["common"].get("custom_run", False)
+
+    if num_export_days is None and not custom_run:
+        if sensor_names.difference(set(gs_metadata.signal)):
+            # If any signal not in metadata yet, we need to backfill its full history.
+            logger.warning("Signals missing in the epidata; backfilling full history")
+            num_export_days = (export_end_date - FULL_BKFILL_START_DATE).days + 1
+        else:
+            latest_date_diff = (datetime.today() - to_datetime(min(gs_metadata.max_time))).days + 1
+            global_max_expected_lag = get_max_lag(params)
+            expected_date_diff = params["validation"]["common"].get("span_length", 14) + global_max_expected_lag
+
+            if latest_date_diff > expected_date_diff:
+                logger.info(f"Missing dates from: {to_datetime(min(gs_metadata.max_time)).date()}")
+
+            num_export_days = expected_date_diff
+
+    return num_export_days
+
+
+def generate_query_dates(
+    export_start_date: date, export_end_date: date, num_export_days: int, custom_run_flag: bool
+) -> List[date]:
+    """Produce date range to retrieve data for.
+
+    Calculate start of date range as a static offset from the end date.
+    Pad date range by an additional `PAD_DAYS` days before the earliest date to
+    produce data for calculating smoothed estimates.
+
+    Parameters
+    ----------
+    export_start_date: date
+        first date to retrieve data for
+    export_end_date: date
+        last date to retrieve data for
+    num_export_days: int
+        number of days before end date to export
+    custom_run_flag: bool
+        flag to indicate if the date should be taken from export or calculated based on if it's a patch or regular run
+
+    Returns
+    -------
+    List[date, date]
+    """
+    start_date = export_start_date
+    if not custom_run_flag:
+        start_date = export_end_date - timedelta(days=num_export_days)
+    retrieve_dates = [start_date - timedelta(days=PAD_DAYS - 1), export_end_date]
+
+    return retrieve_dates
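
To make the patch-window arithmetic concrete: with the defaults above (span_length 14, max_expected_lag {"all": 4}), each issue date maps to a 14-day export window ending global_max_expected_lag + 1 days before the issue date. A worked sketch (editor's illustration, not part of the commit):

from datetime import datetime, timedelta

issue_date = datetime(2024, 4, 20)  # hypothetical issue date
global_max_expected_lag = 4         # max of {"all": 4}
num_export_days = 14                # validation span_length default

export_end_date = issue_date - timedelta(days=global_max_expected_lag + 1)
export_start_date = issue_date - timedelta(days=num_export_days + global_max_expected_lag + 1)

print(export_end_date.date())    # 2024-04-15
print(export_start_date.date())  # 2024-04-01

When a signal is missing from the API metadata entirely, generate_num_export_days instead backfills the full history from FULL_BKFILL_START_DATE (2020-02-20).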
google_symptoms/delphi_google_symptoms/patch.py

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+"""
+This module is used for patching data in the delphi_google_symptom package.
+
+To use this module, you need to specify the range of issue dates in params.json, like so:
+
+{
+  "common": {
+    ...
+    "custom_run": true
+  },
+  "validation": {
+    ...
+  },
+  "patch": {
+    "patch_dir": ".../covidcast-indicators/google-symptoms/AprilPatch",
+    "start_issue": "2024-04-20",
+    "end_issue": "2024-04-21"
+  }
+}
+
+It will generate data for that range of issue dates, and store them in batch issue format:
+[params patch_dir]/issue_[issue-date]/google-symptoms/xxx.csv
+"""
+
+from datetime import datetime, timedelta
+from os import makedirs
+
+from delphi_utils import get_structured_logger, read_params
+
+from .date_utils import generate_patch_dates
+from .run import run_module
+
+
+def patch(params):
+    """
+    Run the google symptoms indicator for a range of issue dates.
+
+    Parameters
+    ----------
+    params
+        Dictionary containing indicator configuration. Expected to have the following structure:
+        - "common":
+            - "export_dir": str, directory to write output
+            - "log_exceptions" (optional): bool, whether to log exceptions to file
+            - "log_filename" (optional): str, name of file to write logs
+        - "indicator":
+            - "export_start_date": str, YYYY-MM-DD format, date from which to export data
+            - "num_export_days": int, number of days before end date (today) to export
+            - "path_to_bigquery_credentials": str, path to BigQuery API key and service account
+              JSON file
+        - "patch": Only used for patching data
+            - "start_date": str, YYYY-MM-DD format, first issue date
+            - "end_date": str, YYYY-MM-DD format, last issue date
+            - "patch_dir": str, directory to write all issues output
+    """
+    logger = get_structured_logger("delphi_google_symptom.patch", filename=params["common"]["log_filename"])
+
+    issue_date = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
+    end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")
+
+    logger.info(f"""Start patching {params["patch"]["patch_dir"]}""")
+    logger.info(f"""Start issue: {issue_date.strftime("%Y-%m-%d")}""")
+    logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""")
+
+    makedirs(params["patch"]["patch_dir"], exist_ok=True)
+
+    patch_dates = generate_patch_dates(params)
+
+    while issue_date <= end_issue:
+        logger.info(f"""Running issue {issue_date.strftime("%Y-%m-%d")}""")
+
+        # Output dir setup
+        current_issue_yyyymmdd = issue_date.strftime("%Y%m%d")
+        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/google-symptom"""
+        makedirs(f"{current_issue_dir}", exist_ok=True)
+
+        params["common"]["export_dir"] = f"""{current_issue_dir}"""
+        params["indicator"]["custom_run"] = True
+
+        date_settings = patch_dates[issue_date]
+
+        params["indicator"]["export_start_date"] = date_settings["export_start_date"].strftime("%Y-%m-%d")
+        params["indicator"]["export_end_date"] = date_settings["export_end_date"].strftime("%Y-%m-%d")
+        params["indicator"]["num_export_days"] = date_settings["num_export_days"]
+
+        run_module(params, logger)
+
+        issue_date += timedelta(days=1)
+
+
+if __name__ == "__main__":
+    patch(read_params())
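
Because of the __main__ guard, a patch run can be launched with python -m or by calling patch() directly. A minimal sketch (editor's illustration, assuming the new module lands as delphi_google_symptoms/patch.py; all field values are placeholders, and run_module will additionally need the indicator's usual BigQuery settings under params["indicator"]):

from delphi_google_symptoms.patch import patch

params = {
    "common": {"custom_run": True, "log_filename": "patch.log"},
    "indicator": {"num_export_days": None, "api_credentials": "<API_KEY>"},
    "validation": {"common": {"span_length": 14, "max_expected_lag": {"all": 4}}},
    "patch": {
        "patch_dir": "./AprilPatch",
        "start_issue": "2024-04-20",
        "end_issue": "2024-04-21",
    },
}

patch(params)  # writes ./AprilPatch/issue_20240420/... and ./AprilPatch/issue_20240421/...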

google_symptoms/delphi_google_symptoms/pull.py

Lines changed: 9 additions & 49 deletions
@@ -1,14 +1,15 @@
 """Retrieve data and wrangle into appropriate format."""
 # -*- coding: utf-8 -*-
 import re
-from datetime import date, datetime, timedelta  # pylint: disable=unused-import
-import pandas_gbq
-from google.oauth2 import service_account
+from datetime import date, datetime  # pylint: disable=unused-import
+
 import numpy as np
 import pandas as pd
+import pandas_gbq
+from google.oauth2 import service_account
 
-from .constants import DC_FIPS, METRICS, COMBINED_METRIC, SYMPTOM_SETS, DTYPE_CONVERSIONS
-
+from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, SYMPTOM_SETS
+from .date_utils import generate_query_dates
 
 # Create map of BigQuery symptom column names to desired column names.
 colname_map = {"symptom_" +
@@ -95,45 +96,6 @@ def preprocess(df, level):
     return df
 
 
-def get_date_range(export_start_date, export_end_date, num_export_days):
-    """Produce date range to retrieve data for.
-
-    Calculate start of date range as a static offset from the end date.
-    Pad date range by an additional 7 days before the earliest date to
-    produce data for calculating smoothed estimates.
-
-    Parameters
-    ----------
-    export_start_date: date
-        first date to retrieve data for
-    export_end_date: date
-        last date to retrieve data for
-    num_export_days: int
-        number of days before end date to export
-
-    Returns
-    -------
-    list
-    """
-    PAD_DAYS = 7
-
-    if num_export_days == "all":
-        # Get all dates since export_start_date.
-        start_date = export_start_date
-    else:
-        # Don't fetch data before the user-set start date.
-        start_date = max(
-            export_end_date - timedelta(days=num_export_days),
-            export_start_date
-        )
-
-    retrieve_dates = [
-        start_date - timedelta(days=PAD_DAYS - 1),
-        export_end_date]
-
-    return retrieve_dates
-
-
 def format_dates_for_query(date_list):
     """Format list of dates as needed for query.
 
@@ -224,7 +186,6 @@ def pull_gs_data_one_geolevel(level, date_range):
     query = produce_query(level, date_range)
 
     df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes = DTYPE_CONVERSIONS)
-
     if len(df) == 0:
         df = pd.DataFrame(
             columns=["open_covid_region_code", "date"] +
@@ -254,7 +215,7 @@ def initialize_credentials(credentials):
     pandas_gbq.context.project = credentials.project_id
 
 
-def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days):
+def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days, custom_run_flag):
     """Pull latest dataset for each geo level and combine.
 
     PS: No information for PR
@@ -277,9 +238,8 @@ def pull_gs_data(credentials, export_start_date, export_end_date, num_export_day
     dict: {"county": pd.DataFrame, "state": pd.DataFrame}
     """
     # Fetch and format dates we want to attempt to retrieve
-    retrieve_dates = get_date_range(
-        export_start_date, export_end_date, num_export_days)
-    retrieve_dates = format_dates_for_query(retrieve_dates)
+    export_date_range = generate_query_dates(export_start_date, export_end_date, num_export_days, custom_run_flag)
+    retrieve_dates = format_dates_for_query(export_date_range)
 
     initialize_credentials(credentials)
 
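Net behavioral change in pull.py: the removed get_date_range supported num_export_days == "all" and clamped the window at export_start_date, whereas generate_query_dates branches on custom_run_flag, honoring export_start_date only for custom (patch) runs. A sketch of the two modes (editor's illustration; dates are hypothetical, and the import assumes the new module is delphi_google_symptoms/date_utils.py):

from datetime import date

from delphi_google_symptoms.date_utils import generate_query_dates

# Regular run: window is measured back from the end date, then padded by
# PAD_DAYS - 1 = 6 days so smoothers have history to work with.
generate_query_dates(date(2020, 2, 20), date(2024, 4, 15), 14, False)
# -> [datetime.date(2024, 3, 26), datetime.date(2024, 4, 15)]

# Custom/patch run: export_start_date is honored as-is (minus the same pad).
generate_query_dates(date(2024, 3, 1), date(2024, 4, 15), 14, True)
# -> [datetime.date(2024, 2, 24), datetime.date(2024, 4, 15)]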