Skip to content

google-symptoms patching fn should not return data for the 4 days immediately before the issue date #2007

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
49 changes: 20 additions & 29 deletions google_symptoms/delphi_google_symptoms/date_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,36 @@ def generate_patch_dates(params: Dict) -> Dict[date, Tuple[date]]:

Parameters
----------
issue_date
end_date
params
params: dictionary parsed from params.json

Returns
-------
dict of date and tuple of date
dict(date: dict(export date range settings))
"""
issue_date = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
end_date = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")
num_export_days = _generate_num_export_days(params)
num_export_days = params["validation"]["common"].get("span_length", 14)

patch_dates = dict()
while issue_date <= end_date:
# negate the subtraction done within generate_query_dates
expected_start_dt = issue_date - timedelta(days=num_export_days - PAD_DAYS + 1)
daterange = generate_query_dates(expected_start_dt, issue_date, num_export_days, True)
patch_dates[issue_date] = tuple(daterange)
issue_date += timedelta(days=1)
return patch_dates
global_max_expected_lag = get_max_lag(params)
export_end_date = issue_date - timedelta(days=global_max_expected_lag + 1)
export_start_date = issue_date - timedelta(days=num_export_days + global_max_expected_lag + 1)

patch_dates[issue_date] = {
"export_start_date": export_start_date,
"export_end_date": export_end_date,
"num_export_days": num_export_days,
}

def _generate_num_export_days(params: Dict) -> int:
"""
Generate dates for exporting with possible lag.
issue_date += timedelta(days=1)

Parameters
----------
params: dictionary parsed from params.json
return patch_dates

Returns
-------
number of export days
"""
# Calculate number of days based on what's missing from the API and
# what the validator expects.
def get_max_lag(params: Dict) -> int:
"""Determine reporting lag for data source"""
max_expected_lag = lag_converter(params["validation"]["common"].get("max_expected_lag", {"all": 4}))
global_max_expected_lag = max(list(max_expected_lag.values()))
num_export_days = params["validation"]["common"].get("span_length", 14) + global_max_expected_lag
return num_export_days

return max(list(max_expected_lag.values()))

def generate_num_export_days(params: Dict, logger) -> [int]:
"""
Expand Down Expand Up @@ -85,7 +74,7 @@ def generate_num_export_days(params: Dict, logger) -> [int]:
# Fetch metadata to check how recent each signal is
covidcast.use_api_key(params["indicator"]["api_credentials"])
metadata = covidcast.metadata()
# Filter to only those we currently want to produce, ignore any old or deprecated signals
# Filter to only those signals we currently want to produce for `google-symptoms`
gs_metadata = metadata[(metadata.data_source == "google-symptoms") & (metadata.signal.isin(sensor_names))]

num_export_days = params["indicator"]["num_export_days"]
Expand All @@ -98,7 +87,8 @@ def generate_num_export_days(params: Dict, logger) -> [int]:
num_export_days = (export_end_date - FULL_BKFILL_START_DATE).days + 1
else:
latest_date_diff = (datetime.today() - to_datetime(min(gs_metadata.max_time))).days + 1
expected_date_diff = _generate_num_export_days(params)
global_max_expected_lag = get_max_lag(params)
expected_date_diff = params["validation"]["common"].get("span_length", 14) + global_max_expected_lag

if latest_date_diff > expected_date_diff:
logger.info(f"Missing dates from: {to_datetime(min(gs_metadata.max_time)).date()}")
Expand Down Expand Up @@ -138,3 +128,4 @@ def generate_query_dates(
retrieve_dates = [start_date - timedelta(days=PAD_DAYS - 1), export_end_date]

return retrieve_dates

15 changes: 11 additions & 4 deletions google_symptoms/delphi_google_symptoms/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,24 @@ def patch(params):
patch_dates = generate_patch_dates(params)

while issue_date <= end_issue:
daterange = patch_dates[issue_date]
logger.info(f"""Running issue {issue_date.strftime("%Y-%m-%d")}""")

# Output dir setup
current_issue_yyyymmdd = issue_date.strftime("%Y%m%d")
current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/google-symptom"""
makedirs(f"{current_issue_dir}", exist_ok=True)

params["common"]["export_dir"] = f"""{current_issue_dir}"""
params["indicator"]["custom_run"] = True

date_settings = patch_dates[issue_date]

params["indicator"]["export_start_date"] = date_settings["export_start_date"].strftime("%Y-%m-%d")
params["indicator"]["export_end_date"] = date_settings["export_end_date"].strftime("%Y-%m-%d")
params["indicator"]["num_export_days"] = date_settings["num_export_days"]

params["indicator"]["export_start_date"] = daterange[0].strftime("%Y-%m-%d")
params["indicator"]["export_end_date"] = daterange[1].strftime("%Y-%m-%d")
params["patch"]["patch_flag"] = True
run_module(params, logger)

issue_date += timedelta(days=1)


Expand Down
5 changes: 3 additions & 2 deletions google_symptoms/delphi_google_symptoms/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from google.oauth2 import service_account

from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, SYMPTOM_SETS
from .date_utils import generate_query_dates

# Create map of BigQuery symptom column names to desired column names.
colname_map = {"symptom_" +
Expand Down Expand Up @@ -214,7 +215,7 @@ def initialize_credentials(credentials):
pandas_gbq.context.project = credentials.project_id


def pull_gs_data(credentials, export_date_range):
def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days, custom_run_flag):
"""Pull latest dataset for each geo level and combine.

PS: No information for PR
Expand All @@ -237,7 +238,7 @@ def pull_gs_data(credentials, export_date_range):
dict: {"county": pd.DataFrame, "state": pd.DataFrame}
"""
# Fetch and format dates we want to attempt to retrieve

export_date_range = generate_query_dates(export_start_date, export_end_date, num_export_days, custom_run_flag)
retrieve_dates = format_dates_for_query(export_date_range)

initialize_credentials(credentials)
Expand Down
5 changes: 2 additions & 3 deletions google_symptoms/delphi_google_symptoms/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from delphi_utils import create_export_csv, get_structured_logger

from .constants import COMBINED_METRIC, GEO_RESOLUTIONS, SMOOTHERS, SMOOTHERS_MAP
from .date_utils import generate_num_export_days, generate_query_dates
from .date_utils import generate_num_export_days
from .geo import geo_map
from .pull import pull_gs_data

Expand Down Expand Up @@ -58,10 +58,9 @@ def run_module(params, logger=None):
custom_run_flag = (
False if not params["indicator"].get("custom_run", False) else params["indicator"].get("custom_run", False)
)
export_date_range = generate_query_dates(export_start_date, export_end_date, num_export_days, custom_run_flag)

# Pull GS data
dfs = pull_gs_data(params["indicator"]["bigquery_credentials"], export_date_range)
dfs = pull_gs_data(params["indicator"]["bigquery_credentials"], export_start_date, export_end_date, num_export_days, custom_run_flag)
for geo_res in GEO_RESOLUTIONS:
if geo_res == "state":
df_pull = dfs["state"]
Expand Down
70 changes: 47 additions & 23 deletions google_symptoms/tests/test_date_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
from freezegun import freeze_time
from conftest import TEST_DIR, NEW_DATE

import covidcast

from delphi_utils.validator.utils import lag_converter
from delphi_google_symptoms.constants import FULL_BKFILL_START_DATE
from delphi_google_symptoms.date_utils import generate_query_dates, generate_num_export_days, generate_patch_dates


class TestDateUtils:

@freeze_time("2021-01-05")
Expand Down Expand Up @@ -36,57 +40,77 @@ def test_generate_query_dates_custom(self):
assert set(output) == set(expected)

def test_generate_export_dates(self, params, logger, monkeypatch):
import covidcast
metadata_df = pd.read_csv(f"{TEST_DIR}/test_data/covid_metadata.csv")
monkeypatch.setattr(covidcast, "metadata", lambda: metadata_df)
num_export_days = generate_num_export_days(params, logger)

num_export_days = generate_num_export_days(params, logger)
expected_num_export_days = params["indicator"]["num_export_days"]

assert num_export_days == expected_num_export_days

def test_generate_export_dates_normal(self, params_w_no_date, logger, monkeypatch):
import covidcast
metadata_df = pd.read_csv(f"{TEST_DIR}/test_data/covid_metadata.csv")
monkeypatch.setattr(covidcast, "metadata", lambda: metadata_df)

num_export_days = generate_num_export_days(params_w_no_date, logger)

max_expected_lag = lag_converter(params_w_no_date["validation"]["common"].get("max_expected_lag", {"all": 4}))
max_expected_lag = lag_converter(params_w_no_date["validation"]["common"]["max_expected_lag"])
global_max_expected_lag = max(list(max_expected_lag.values()))
expected_num_export_days = params_w_no_date["validation"]["common"].get("span_length", 14) + global_max_expected_lag
expected_num_export_days = params_w_no_date["validation"]["common"]["span_length"] + global_max_expected_lag

assert num_export_days == expected_num_export_days

def test_generate_export_date_missing(self, params_w_no_date, logger, monkeypatch):
import covidcast
metadata_df = pd.read_csv(f"{TEST_DIR}/test_data/covid_metadata_missing.csv")
monkeypatch.setattr(covidcast, "metadata", lambda: metadata_df)

num_export_days = generate_num_export_days(params_w_no_date, logger)
expected_num_export_days = (date.today() - FULL_BKFILL_START_DATE.date()).days + 1
assert num_export_days == expected_num_export_days

def test_generate_patch_dates(self, params_w_patch, logger, monkeypatch):
import covidcast
metadata_df = pd.read_csv(f"{TEST_DIR}/test_data/covid_metadata_missing.csv")
monkeypatch.setattr(covidcast, "metadata", lambda: metadata_df)
max_expected_lag = lag_converter(params_w_patch["validation"]["common"].get("max_expected_lag", {"all": 4}))
def generate_expected_start_end_dates(self, params_, issue_date):
# Actual dates reported on issue dates June 27-29, 2024, by the old
# version of the google-symptoms indicator
# (https://github.com/cmu-delphi/covidcast-indicators/tree/b338a0962bf3a63f70a83f0b719516f914b098e2).
# The patch module should be able to recreate these dates.
dates_dict = {
"2024-06-27": [ '2024-06-02', '2024-06-03', '2024-06-04', '2024-06-05', '2024-06-06', '2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22'],
"2024-06-28": ['2024-06-03', '2024-06-04', '2024-06-05', '2024-06-06', '2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22', '2024-06-23'],
"2024-06-29": ['2024-06-04', '2024-06-05', '2024-06-06','2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22', '2024-06-23', '2024-06-24'],
}

dates_dict = {
datetime.strptime(key, "%Y-%m-%d"): [
datetime.strptime(listvalue, "%Y-%m-%d") for listvalue in value
] for key, value in dates_dict.items()
}

dates = dates_dict[issue_date]

# Raw signals add 6 extra dates of padding for later calculating
# smoothed signals. Since this test is checking an early step in the
# process, before padding has happened, we can drop the first 6
# dates.
return {
"export_start_date": min(dates[6:21]),
"export_end_date": max(dates[6:21])
}

def test_generate_patch_dates(self, params_w_patch, logger):
max_expected_lag = lag_converter(params_w_patch["validation"]["common"]["max_expected_lag"])
global_max_expected_lag = max(list(max_expected_lag.values()))
expected_num_export_days = params_w_patch["validation"]["common"].get("span_length", 14) + global_max_expected_lag
num_export_days = params_w_patch["validation"]["common"]["span_length"]

issue_date = datetime.strptime(params_w_patch["patch"]["start_issue"], "%Y-%m-%d")
end_issue = datetime.strptime(params_w_patch["patch"]["end_issue"], "%Y-%m-%d")

patch_date_dict = generate_patch_dates(params_w_patch)

while issue_date <= end_issue:
expected_daterange = generate_query_dates(
FULL_BKFILL_START_DATE,
issue_date,
expected_num_export_days,
False
)
# in the patch script the date generated by generate_patch_dates becomes the export_start_date and export_end_date
export_start_date, export_end_date = patch_date_dict[issue_date]
actual_daterange = generate_query_dates(export_start_date, export_end_date, expected_num_export_days, True)
assert set(actual_daterange) == set(expected_daterange)
issue_date += timedelta(days=1)
patch_settings = patch_date_dict[issue_date]
expected_dict = self.generate_expected_start_end_dates(params_w_patch, issue_date)
expected_dict["num_export_days"] = num_export_days # unmodified

assert patch_settings == expected_dict

issue_date += timedelta(days=1)
50 changes: 27 additions & 23 deletions google_symptoms/tests/test_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,29 @@ def parse_csv_file(self, file_list: List[str]) -> Tuple[List[datetime]]:
return sorted(smoothed_list), sorted(raw_list)

def generate_expected_dates(self, params_, smoother, issue_date):
max_expected_lag = lag_converter(params_["validation"]["common"].get("max_expected_lag", {"all": 4}))
global_max_expected_lag = max(list(max_expected_lag.values()))

if params_["indicator"].get("num_export_days"):
num_export_days = params_["indicator"]["num_export_days"]
# Actual dates reported on issue dates June 27-29, 2024, by the old
# version of the google-symptoms indicator
# (https://github.com/cmu-delphi/covidcast-indicators/tree/b338a0962bf3a63f70a83f0b719516f914b098e2).
# The patch module should be able to recreate these dates.
dates_dict = {
"2024-06-27": [ '2024-06-02', '2024-06-03', '2024-06-04', '2024-06-05', '2024-06-06', '2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22'],
"2024-06-28": ['2024-06-03', '2024-06-04', '2024-06-05', '2024-06-06', '2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22', '2024-06-23'],
"2024-06-29": ['2024-06-04', '2024-06-05', '2024-06-06','2024-06-07', '2024-06-08', '2024-06-09', '2024-06-10', '2024-06-11', '2024-06-12', '2024-06-13', '2024-06-14', '2024-06-15', '2024-06-16', '2024-06-17', '2024-06-18', '2024-06-19', '2024-06-20', '2024-06-21', '2024-06-22', '2024-06-23', '2024-06-24'],
}

dates_dict = {
datetime.strptime(key, "%Y-%m-%d"): [
datetime.strptime(listvalue, "%Y-%m-%d") for listvalue in value
] for key, value in dates_dict.items()
}

dates = dates_dict[issue_date]

if smoother == "raw":
return dates
else:
num_export_days = params_["validation"]["common"].get("span_length", 14) + global_max_expected_lag

# mimic date generate as if the issue date was "today"
query_start_date, query_end_date = generate_query_dates(
FULL_BKFILL_START_DATE,
issue_date,
num_export_days,
False
)
# the smoother in line 82-88 filters out prev seven days
export_start_date = query_start_date + timedelta(days=6) if smoother == "smoothed" else query_start_date
export_end_date = query_end_date - timedelta(days=global_max_expected_lag)
num_export_days = (export_end_date - export_start_date).days + 1

return sorted([export_start_date + timedelta(days=x) for x in range(num_export_days)])
# Smoothed signals drop the first 6 dates.
return dates[6:21]

def mocked_patch(self, params_):
with mock_patch("delphi_google_symptoms.patch.read_params", return_value=params_), \
Expand All @@ -58,9 +60,7 @@ def side_effect(*args, **kwargs):
df = state_data_gap
pattern = re.compile(r'\d{4}-\d{2}-\d{2}')
start_date, end_date = re.findall(pattern, args[0])
end_date_w_lag = (datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days=4)).strftime("%Y-%m-%d")
return df[(df["date"] >= start_date) & \
(df["date"] <= end_date_w_lag)]
return df[(df["date"] >= start_date) & (df["date"] <= end_date)]
else:
return pd.DataFrame()

Expand All @@ -80,11 +80,15 @@ def side_effect(*args, **kwargs):

assert smoothed_dates == expected_smoothed_dates
assert raw_dates == expected_raw_dates

shutil.rmtree(issue_dir)

start_date += timedelta(days=1)

def test_patch_default(self, params_w_patch):
params_w_patch["indicator"]["num_export_days"] = None
self.mocked_patch(params_w_patch)

def test_patch_date_set(self, params_w_patch):
self.mocked_patch(params_w_patch)

Expand Down
3 changes: 2 additions & 1 deletion google_symptoms/tests/test_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def test_good_file(self, mock_credentials, mock_read_gbq):
"20201230", "%Y%m%d")
end_date = datetime.combine(date.today(), datetime.min.time())

dfs = pull_gs_data("", [start_date, end_date])
dfs = pull_gs_data("", datetime.strptime(
"20201230", "%Y%m%d"), datetime.combine(date.today(), datetime.min.time()), 0, False)

for level in ["county", "state"]:
df = dfs[level]
Expand Down