Skip to content

1996 patch google symptoms #1999

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 37 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
111000e
refactoring date parsing to separate file
aysim319 Jul 16, 2024
2ca8867
first implementation
aysim319 Jul 19, 2024
6497f8c
small cleanup and adding missing data
aysim319 Jul 19, 2024
963bdc6
changed wording and some logic
aysim319 Jul 22, 2024
33ae858
made test more explicit
aysim319 Jul 22, 2024
17e0fc5
add missing file
aysim319 Jul 22, 2024
b56fbae
cleaning up tests
aysim319 Jul 22, 2024
71c6e96
consolidating tests into one
aysim319 Jul 22, 2024
ba13c66
fixed wording
aysim319 Jul 22, 2024
c115c58
consolidating test/fixture
aysim319 Jul 23, 2024
6cbdaad
clean up test
aysim319 Jul 23, 2024
11f983b
Update google_symptoms/delphi_google_symptoms/patch.py
aysim319 Jul 23, 2024
c2b33db
clean up lint
aysim319 Jul 23, 2024
8125450
remove teardown
aysim319 Jul 23, 2024
6721ea5
cleaning and renaming
aysim319 Jul 23, 2024
5018b79
more explicit branching for regular run vs patching, renaming, cleaning
aysim319 Jul 23, 2024
36bee6a
cleanup and rewrite logic
aysim319 Jul 24, 2024
1cb791b
changes based on suggestion
aysim319 Jul 24, 2024
0e14bf1
standardizing output dir in test
aysim319 Jul 24, 2024
e181f54
changing back to session scope
aysim319 Jul 25, 2024
ff22993
more logging and update params
aysim319 Jul 26, 2024
cbebcc3
adding back conditional for num_export_days
aysim319 Jul 26, 2024
240fe5d
test_run does not like fixtures
aysim319 Jul 26, 2024
dd819a7
make deep copies of params fixtures
nmdefries Jul 26, 2024
20c14d9
linting
nmdefries Jul 26, 2024
a3a55ae
test patch check if num_export_days is set or not
nmdefries Jul 26, 2024
eb6a9be
patch fn takes params as an arg, like run_module
nmdefries Jul 29, 2024
46b6317
list values added using actual day as an example
nmdefries Jul 29, 2024
5532aac
move pull date creation into pull_gs_data
nmdefries Jul 29, 2024
d209a9c
don't modify num_export_days; define start/end date in terms of it an…
nmdefries Jul 30, 2024
66aa2cd
refer to params by name; patch_flag -> custom_run flag
nmdefries Jul 30, 2024
0708a22
use test data based on recent actual indicator run
nmdefries Jul 30, 2024
d0d3a7a
say where gold test data came from
nmdefries Jul 30, 2024
766f49e
not all date_utils tests need metadata
nmdefries Jul 30, 2024
a04bdea
formatting
nmdefries Jul 30, 2024
6fce6d8
fn docs
nmdefries Jul 30, 2024
528a1e3
lint and cleanup
aysim319 Jul 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion google_symptoms/delphi_google_symptoms/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Registry for constants."""
from datetime import timedelta

from datetime import datetime, timedelta

from delphi_utils import Smoother

Expand Down Expand Up @@ -108,3 +109,7 @@
'Wyoming': 'wy'}

DC_FIPS = "11001"

FULL_BKFILL_START_DATE = datetime.strptime("2020-02-20", "%Y-%m-%d")

PAD_DAYS = 7
132 changes: 132 additions & 0 deletions google_symptoms/delphi_google_symptoms/date_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""utility functions for date parsing."""

from datetime import date, datetime, timedelta
from itertools import product
from typing import Dict, List, Union

import covidcast
from delphi_utils.validator.utils import lag_converter
from pandas import to_datetime

from .constants import COMBINED_METRIC, FULL_BKFILL_START_DATE, PAD_DAYS, SMOOTHERS


def generate_patch_dates(params: Dict) -> Dict[date, Dict[str, Union[date, int]]]:
    """
    Generate date range for chunking backfilled dates.

    Maps every issue date from "start_issue" to "end_issue" (inclusive) to the
    export window settings needed to reproduce that issue's data.

    Parameters
    ----------
    params: dictionary parsed from params.json

    Returns
    -------
    dict(date: dict(export date range settings))
        Each value holds "export_start_date", "export_end_date", and
        "num_export_days" for one issue date.
    """
    issue_date = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
    end_date = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")
    num_export_days = params["validation"]["common"].get("span_length", 14)
    # The expected reporting lag does not depend on the issue date, so
    # compute it once instead of on every loop iteration.
    global_max_expected_lag = get_max_lag(params)

    patch_dates = {}
    while issue_date <= end_date:
        # Data observed on `issue_date` covers dates up to lag+1 days earlier.
        export_end_date = issue_date - timedelta(days=global_max_expected_lag + 1)
        export_start_date = issue_date - timedelta(days=num_export_days + global_max_expected_lag + 1)

        patch_dates[issue_date] = {
            "export_start_date": export_start_date,
            "export_end_date": export_end_date,
            "num_export_days": num_export_days,
        }

        issue_date += timedelta(days=1)

    return patch_dates


def get_max_lag(params: Dict) -> int:
    """Determine the maximum expected reporting lag (in days) for the data source.

    Reads "max_expected_lag" from the validation settings (defaulting to 4 days
    for all signals), converts it to per-signal day counts via ``lag_converter``,
    and returns the largest value.
    """
    max_expected_lag = lag_converter(params["validation"]["common"].get("max_expected_lag", {"all": 4}))
    # max() accepts any iterable; no need to materialize a list first.
    return max(max_expected_lag.values())


def generate_num_export_days(params: Dict, logger) -> Union[int, None]:
    """
    Generate the number of days to export based on currently available data.

    Parameters
    ----------
    params: dictionary parsed from params.json
    logger: structured logger used to report backfill / missing-date conditions

    Returns
    -------
    num_export_days: int or None
        Number of days before the export end date to produce data for.
        None is returned only when params explicitly set "num_export_days"
        to None and "custom_run" is enabled (the caller supplies the range).
    """
    # If end_date not specified, use current date.
    export_end_date = datetime.strptime(
        params["indicator"].get("export_end_date", datetime.strftime(date.today(), "%Y-%m-%d")), "%Y-%m-%d"
    )

    # Generate the set of signals we expect to produce.
    sensor_names = set(
        "_".join([metric, smoother, "search"]) for metric, smoother in product(COMBINED_METRIC, SMOOTHERS)
    )

    # Fetch metadata to check how recent each signal is.
    covidcast.use_api_key(params["indicator"]["api_credentials"])
    metadata = covidcast.metadata()
    # Filter to only those signals we currently want to produce for `google-symptoms`.
    gs_metadata = metadata[(metadata.data_source == "google-symptoms") & (metadata.signal.isin(sensor_names))]

    num_export_days = params["indicator"]["num_export_days"]
    # Normalize the flag to a bool; an absent or falsy value means a regular run.
    custom_run = bool(params["common"].get("custom_run", False))

    if num_export_days is None and not custom_run:
        if sensor_names.difference(set(gs_metadata.signal)):
            # If any signal is not in the metadata yet, we need to backfill its full history.
            logger.warning("Signals missing in the epidata; backfilling full history")
            num_export_days = (export_end_date - FULL_BKFILL_START_DATE).days + 1
        else:
            # Compare how stale the oldest signal is against the expected
            # span (validation span_length plus the global reporting lag).
            latest_date_diff = (datetime.today() - to_datetime(min(gs_metadata.max_time))).days + 1
            global_max_expected_lag = get_max_lag(params)
            expected_date_diff = params["validation"]["common"].get("span_length", 14) + global_max_expected_lag

            if latest_date_diff > expected_date_diff:
                logger.info(f"Missing dates from: {to_datetime(min(gs_metadata.max_time)).date()}")

            num_export_days = expected_date_diff

    return num_export_days


def generate_query_dates(
    export_start_date: date, export_end_date: date, num_export_days: int, custom_run_flag: bool
) -> List[date]:
    """Produce date range to retrieve data for.

    Calculate start of date range as a static offset from the end date.
    Pad date range by an additional `PAD_DAYS` days before the earliest date to
    produce data for calculating smoothed estimates.

    Parameters
    ----------
    export_start_date: date
        first date to retrieve data for
    export_end_date: date
        last date to retrieve data for
    num_export_days: int
        number of days before end date to export
    custom_run_flag: bool
        flag to indicate if the date should be taken from export or calculated based on if it's a patch or regular run

    Returns
    -------
    List[date, date]
        two-element list: [query start (including smoothing pad), query end]
    """
    if custom_run_flag:
        # Patch/custom runs supply an exact start date.
        start_date = export_start_date
    else:
        # Regular runs offset from the end date, but never fetch data from
        # before the user-set start date (matches the replaced get_date_range).
        start_date = max(export_end_date - timedelta(days=num_export_days), export_start_date)

    return [start_date - timedelta(days=PAD_DAYS - 1), export_end_date]
92 changes: 92 additions & 0 deletions google_symptoms/delphi_google_symptoms/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""
This module is used for patching data in the delphi_google_symptom package.

To use this module, you need to specify the range of issue dates in params.json, like so:

{
"common": {
...
"custom_run": true
},
Comment on lines +7 to +10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: custom_run should be in the "indicator" section, based on run.py code

Suggested change
"common": {
...
"custom_run": true
},
"common": {
...
},
"indicator": {
...
"custom_run": true
},

"validation": {
...
},
"patch": {
"patch_dir": ".../covidcast-indicators/google-symptoms/AprilPatch",
"start_issue": "2024-04-20",
"end_issue": "2024-04-21"
}
}

It will generate data for that range of issue dates, and store them in batch issue format:
[params patch_dir]/issue_[issue-date]/google-symptoms/xxx.csv
"""

from datetime import datetime, timedelta
from os import makedirs

from delphi_utils import get_structured_logger, read_params

from .date_utils import generate_patch_dates
from .run import run_module


def patch(params):
    """
    Run the google symptoms indicator for a range of issue dates.

    Parameters
    ----------
    params
        Dictionary containing indicator configuration. Expected to have the following structure:
        - "common":
            - "export_dir": str, directory to write output
            - "log_exceptions" (optional): bool, whether to log exceptions to file
            - "log_filename" (optional): str, name of file to write logs
        - "indicator":
            - "export_start_date": str, YYYY-MM-DD format, date from which to export data
            - "num_export_days": int, number of days before end date (today) to export
            - "path_to_bigquery_credentials": str, path to BigQuery API key and service account
                JSON file
        - "patch": Only used for patching data
            - "start_issue": str, YYYY-MM-DD format, first issue date
            - "end_issue": str, YYYY-MM-DD format, last issue date
            - "patch_dir": str, directory to write all issues output
    """
    # "log_filename" is optional per the docstring; use .get so a missing key
    # falls back to default logging instead of raising KeyError.
    logger = get_structured_logger("delphi_google_symptom.patch", filename=params["common"].get("log_filename"))

    start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
    end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")

    logger.info(f"""Start patching {params["patch"]["patch_dir"]}""")
    logger.info(f"""Start issue: {start_issue.strftime("%Y-%m-%d")}""")
    logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""")

    makedirs(params["patch"]["patch_dir"], exist_ok=True)

    # Precompute the export window for every issue date in the range.
    patch_dates = generate_patch_dates(params)

    issue_date = start_issue
    while issue_date <= end_issue:
        logger.info(f"""Running issue {issue_date.strftime("%Y-%m-%d")}""")

        # Output dir setup
        # NOTE(review): directory segment is "google-symptom" (singular) while the
        # module docstring says "google-symptoms" -- confirm the expected layout.
        current_issue_yyyymmdd = issue_date.strftime("%Y%m%d")
        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/google-symptom"""
        makedirs(f"{current_issue_dir}", exist_ok=True)

        params["common"]["export_dir"] = f"""{current_issue_dir}"""
        # Set the custom-run flag in both sections for consistency:
        # date_utils.generate_num_export_days reads params["common"]["custom_run"],
        # while run.py (per review comments) reads params["indicator"]["custom_run"].
        params["indicator"]["custom_run"] = True
        params["common"]["custom_run"] = True

        date_settings = patch_dates[issue_date]

        params["indicator"]["export_start_date"] = date_settings["export_start_date"].strftime("%Y-%m-%d")
        params["indicator"]["export_end_date"] = date_settings["export_end_date"].strftime("%Y-%m-%d")
        params["indicator"]["num_export_days"] = date_settings["num_export_days"]

        run_module(params, logger)

        issue_date += timedelta(days=1)

# Script entry point: read params.json from the working directory and run the patch driver.
if __name__ == "__main__":
    patch(read_params())
58 changes: 9 additions & 49 deletions google_symptoms/delphi_google_symptoms/pull.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""Retrieve data and wrangle into appropriate format."""
# -*- coding: utf-8 -*-
import re
from datetime import date, datetime, timedelta # pylint: disable=unused-import
import pandas_gbq
from google.oauth2 import service_account
from datetime import date, datetime # pylint: disable=unused-import

import numpy as np
import pandas as pd
import pandas_gbq
from google.oauth2 import service_account

from .constants import DC_FIPS, METRICS, COMBINED_METRIC, SYMPTOM_SETS, DTYPE_CONVERSIONS

from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, SYMPTOM_SETS
from .date_utils import generate_query_dates

# Create map of BigQuery symptom column names to desired column names.
colname_map = {"symptom_" +
Expand Down Expand Up @@ -95,45 +96,6 @@ def preprocess(df, level):
return df


def get_date_range(export_start_date, export_end_date, num_export_days):
"""Produce date range to retrieve data for.

Calculate start of date range as a static offset from the end date.
Pad date range by an additional 7 days before the earliest date to
produce data for calculating smoothed estimates.

Parameters
----------
export_start_date: date
first date to retrieve data for
export_end_date: date
last date to retrieve data for
num_export_days: int
number of days before end date to export

Returns
-------
list
"""
PAD_DAYS = 7

if num_export_days == "all":
# Get all dates since export_start_date.
start_date = export_start_date
else:
# Don't fetch data before the user-set start date.
start_date = max(
export_end_date - timedelta(days=num_export_days),
export_start_date
)

retrieve_dates = [
start_date - timedelta(days=PAD_DAYS - 1),
export_end_date]

return retrieve_dates


def format_dates_for_query(date_list):
"""Format list of dates as needed for query.

Expand Down Expand Up @@ -224,7 +186,6 @@ def pull_gs_data_one_geolevel(level, date_range):
query = produce_query(level, date_range)

df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes = DTYPE_CONVERSIONS)

if len(df) == 0:
df = pd.DataFrame(
columns=["open_covid_region_code", "date"] +
Expand Down Expand Up @@ -254,7 +215,7 @@ def initialize_credentials(credentials):
pandas_gbq.context.project = credentials.project_id


def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days):
def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days, custom_run_flag):
"""Pull latest dataset for each geo level and combine.

PS: No information for PR
Expand All @@ -277,9 +238,8 @@ def pull_gs_data(credentials, export_start_date, export_end_date, num_export_day
dict: {"county": pd.DataFrame, "state": pd.DataFrame}
"""
# Fetch and format dates we want to attempt to retrieve
retrieve_dates = get_date_range(
export_start_date, export_end_date, num_export_days)
retrieve_dates = format_dates_for_query(retrieve_dates)
export_date_range = generate_query_dates(export_start_date, export_end_date, num_export_days, custom_run_flag)
retrieve_dates = format_dates_for_query(export_date_range)

initialize_credentials(credentials)

Expand Down
Loading
Loading