Release covidcast-indicators 0.3.59 #2108

Merged · 8 commits · Jan 22, 2025

2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.3.58
+current_version = 0.3.59
 commit = True
 message = chore: bump covidcast-indicators to {new_version}
 tag = False

8 changes: 7 additions & 1 deletion ansible/templates/sir_complainsalot-params-prod.json.j2
@@ -44,7 +44,13 @@
     },
     "nssp": {
         "max_age":19,
-        "maintainers": []
+        "maintainers": [],
+        "retired-signals": [
+            "pct_ed_visits_combined_2023rvr",
+            "pct_ed_visits_covid_2023rvr",
+            "pct_ed_visits_influenza_2023rvr",
+            "pct_ed_visits_rsv_2023rvr"
+        ]
     },
     "nhsn": {
         "max_age":19,

2 changes: 1 addition & 1 deletion changehc/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59

2 changes: 1 addition & 1 deletion claims_hosp/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59

2 changes: 1 addition & 1 deletion doctor_visits/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59

2 changes: 1 addition & 1 deletion google_symptoms/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59

2 changes: 1 addition & 1 deletion hhs_hosp/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59

2 changes: 1 addition & 1 deletion nchs_mortality/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59

3 changes: 3 additions & 0 deletions nhsn/delphi_nhsn/constants.py
@@ -2,6 +2,9 @@
 
 GEOS = ["state", "nation", "hhs"]
 
+MAIN_DATASET_ID = "ua7e-t2fy"
+PRELIM_DATASET_ID = "mpgq-jmmr"
+
 # column name from socrata
 TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
 TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"

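As context for the dataset IDs above: a minimal sketch of how they can be queried with sodapy, in the spirit of the collapsed `pull_data` helper in pull.py below. The `data.cdc.gov` host and the paging loop are assumptions, not the indicator's exact implementation:

```python
import pandas as pd
from sodapy import Socrata


def fetch_dataset(socrata_token: str, dataset_id: str, page_size: int = 50000) -> pd.DataFrame:
    """Page through a Socrata dataset and return it as a dataframe."""
    client = Socrata("data.cdc.gov", socrata_token)  # assumed host for the NHSN datasets
    records, offset = [], 0
    while True:
        page = client.get(dataset_id, limit=page_size, offset=offset)
        if not page:
            break
        records.extend(page)
        offset += page_size
    return pd.DataFrame.from_records(records)


# e.g. fetch_dataset(token, "ua7e-t2fy") for the main dataset,
# or fetch_dataset(token, "mpgq-jmmr") for the preliminary one.
```
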
94 changes: 94 additions & 0 deletions nhsn/delphi_nhsn/patch.py
@@ -0,0 +1,94 @@
+"""
+This module is used for patching data in the delphi_nhsn package.
+
+To use this module, you need to specify the range of issue dates in params.json, like so:
+
+{
+    "common": {
+        ...
+    },
+    "validation": {
+        ...
+    },
+    "patch": {
+        "patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/nhsn/patch"
+    }
+}
+
+It will generate data for the range of issue dates corresponding to source data files available in "backup_dir"
+specified under "common", and store them in batch issue format under "patch_dir":
+[name-of-patch]/issue_[issue-date]/nhsn/actual_data_file.csv
+"""
+
+from datetime import datetime
+from os import makedirs
+from pathlib import Path
+from typing import List
+
+from delphi_utils import get_structured_logger, read_params
+from epiweeks import Week
+
+from .run import run_module
+
+
+def filter_source_files(source_files: List[Path]):
+    """
+    Filter patch files so that each element in the list is a unique epiweek with the latest issue date.
+
+    Parameters
+    ----------
+    source_files: sorted list of paths to source backup files
+
+    Returns
+    -------
+    list of source files, one per epiweek (the latest issue for that week)
+    """
+    epiweek_dict = dict()
+
+    for file in source_files:
+        if "prelim" not in file.stem:
+            current_issue_date = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
+            epiweek = Week.fromdate(current_issue_date)
+            epiweek_dict[epiweek] = file
+
+    filtered_patch_list = list(epiweek_dict.values())
+    return filtered_patch_list
+
+
+def patch(params):
+    """
+    Run the NHSN indicator for a range of issue dates.
+
+    The range of issue dates is specified in params.json using the following keys:
+    - "patch": Only used for patching data
+        - "patch_dir": str, directory to write all issues output
+    """
+    logger = get_structured_logger("delphi_nhsn.patch", filename=params["common"]["log_filename"])
+
+    source_files = sorted(Path(params["common"]["backup_dir"]).glob("*.csv.gz"))
+    makedirs(params["patch"]["patch_dir"], exist_ok=True)
+
+    logger.info(
+        "Starting patching",
+        patch_directory=params["patch"]["patch_dir"],
+        start_issue=source_files[0].name.split(".")[0],
+        end_issue=source_files[-1].name.split(".")[0],
+    )
+
+    patch_list = filter_source_files(source_files)
+    for file in patch_list:
+        issue_date_str = file.name.split(".")[0]
+        logger.info("Running issue", issue_date=datetime.strptime(issue_date_str, "%Y%m%d").strftime("%Y-%m-%d"))
+        params["patch"]["issue_date"] = issue_date_str
+        # whatever the source date format, the directory name must be issue_YYYYMMDD;
+        # the conversion is done in acquisition
+        current_issue_dir = f"{params['patch']['patch_dir']}/issue_{issue_date_str}/nhsn"
+        makedirs(current_issue_dir, exist_ok=True)
+        params["common"]["export_dir"] = current_issue_dir
+        params["common"]["custom_run"] = True
+        run_module(params, logger)
+
+
+if __name__ == "__main__":
+    patch(read_params())

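To make the epiweek dedup concrete, here is a small self-contained illustration of what `filter_source_files` does with hypothetical backup filenames (the dates are invented; 20241209 and 20241212 fall in the same epiweek):

```python
from datetime import datetime
from pathlib import Path

from epiweeks import Week

# Hypothetical backup files, sorted by issue date as in patch().
source_files = sorted([
    Path("backup/20241209.csv.gz"),
    Path("backup/20241212.csv.gz"),
    Path("backup/20241212_prelim.csv.gz"),  # skipped: "prelim" is in the stem
    Path("backup/20241216.csv.gz"),
])

epiweek_dict = {}
for file in source_files:
    if "prelim" not in file.stem:
        issue = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
        epiweek_dict[Week.fromdate(issue)] = file  # later issue overwrites earlier

print(list(epiweek_dict.values()))
# -> the 20241212 and 20241216 files; 20241209 was superseded within its epiweek
```
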
59 changes: 53 additions & 6 deletions nhsn/delphi_nhsn/pull.py
@@ -1,13 +1,14 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NHSN data."""
+import logging
 from pathlib import Path
 from typing import Optional
 
 import pandas as pd
 from delphi_utils import create_backup_csv
 from sodapy import Socrata
 
-from .constants import PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
+from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
 
 
 def pull_data(socrata_token: str, dataset_id: str):

@@ -27,7 +28,42 @@ def pull_data(socrata_token: str, dataset_id: str):
     return df
 
 
-def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
+def pull_data_from_file(filepath: str, issue_date: str, logger, prelim_flag=False) -> pd.DataFrame:
+    """
+    Pull data from a source file.
+
+    The source file is generated by delphi_utils.create_backup_csv.
+
+    Parameters
+    ----------
+    filepath: full path of the directory where the source file is located
+    issue_date: date when the file was pulled / generated
+    logger: structured logger
+    prelim_flag: boolean indicating which dataset to grab
+
+    Returns
+    -------
+    pd.DataFrame
+        Dataframe loaded from the backup file (empty if the file does not exist).
+    """
+    df = pd.DataFrame()
+    if issue_date:
+        issue_date = issue_date.replace("-", "")
+        filename = f"{issue_date}_prelim.csv.gz" if prelim_flag else f"{issue_date}.csv.gz"
+        backup_file = Path(filepath, filename)
+
+        if backup_file.exists():
+            df = pd.read_csv(backup_file, compression="gzip")
+            logger.info("Pulling data from file", file=filename, num_rows=len(df))
+    return df
+
+
+def pull_nhsn_data(
+    socrata_token: str,
+    backup_dir: str,
+    custom_run: bool,
+    issue_date: Optional[str],
+    logger: Optional[logging.Logger] = None,
+):
     """Pull the latest NHSN hospital admission data, and conform it into a dataset.
 
     The output dataset has:

@@ -52,7 +88,11 @@ def pull_nhsn_data(
         Dataframe as described above.
     """
     # Pull data from Socrata API
-    df = pull_data(socrata_token, dataset_id="ua7e-t2fy")
+    df = (
+        pull_data(socrata_token, dataset_id=MAIN_DATASET_ID)
+        if not custom_run
+        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
+    )
 
     keep_columns = list(TYPE_DICT.keys())
 

@@ -75,7 +115,11 @@
 
 
 def pull_preliminary_nhsn_data(
-    socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
+    socrata_token: str,
+    backup_dir: str,
+    custom_run: bool,
+    issue_date: Optional[str],
+    logger: Optional[logging.Logger] = None,
 ):
     """Pull the latest preliminary NHSN hospital admission data, and conform it into a dataset.
 

@@ -100,8 +144,11 @@ def pull_preliminary_nhsn_data(
     pd.DataFrame
         Dataframe as described above.
     """
-    # Pull data from Socrata API
-    df = pull_data(socrata_token, dataset_id="mpgq-jmmr")
+    df = (
+        pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
+        if not custom_run
+        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
+    )
 
     keep_columns = list(PRELIM_TYPE_DICT.keys())
 

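A minimal sketch of the file round trip that `pull_data_from_file` relies on, assuming `create_backup_csv` writes `YYYYMMDD.csv.gz` (or `YYYYMMDD_prelim.csv.gz`) under `backup_dir`; the naming is inferred from the code above, and the path and columns here are illustrative:

```python
from pathlib import Path

import pandas as pd

backup_dir = Path("/tmp/nhsn_backups")  # hypothetical backup_dir
backup_dir.mkdir(parents=True, exist_ok=True)

# Simulate what a backup written on issue date 2024-12-12 might look like.
pd.DataFrame(
    {"weekendingdate": ["2024-12-07"], "jurisdiction": ["CO"], "totalconfc19newadm": [5]}
).to_csv(backup_dir / "20241212.csv.gz", index=False, compression="gzip")

# Read it back the way pull_data_from_file does on a custom run.
issue_date = "2024-12-12".replace("-", "")
backup_file = backup_dir / f"{issue_date}.csv.gz"
df = pd.read_csv(backup_file, compression="gzip") if backup_file.exists() else pd.DataFrame()
print(len(df))  # 1
```
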
29 changes: 19 additions & 10 deletions nhsn/delphi_nhsn/run.py
@@ -25,7 +25,7 @@
 from .pull import pull_nhsn_data, pull_preliminary_nhsn_data
 
 
-def run_module(params):
+def run_module(params, logger=None):
     """
     Run the indicator.
 

@@ -35,14 +35,16 @@ def run_module(params):
         Nested dictionary of parameters.
     """
     start_time = time.time()
-    logger = get_structured_logger(
-        __name__,
-        filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True),
-    )
+    if not logger:
+        logger = get_structured_logger(
+            __name__,
+            filename=params["common"].get("log_filename"),
+            log_exceptions=params["common"].get("log_exceptions", True),
+        )
     export_dir = params["common"]["export_dir"]
     backup_dir = params["common"]["backup_dir"]
     custom_run = params["common"].get("custom_run", False)
+    issue_date = params.get("patch", dict()).get("issue_date", None)
     socrata_token = params["indicator"]["socrata_token"]
     export_start_date = params["indicator"]["export_start_date"]
     run_stats = []

@@ -51,12 +53,16 @@ def run_module(params, logger=None):
         export_start_date = date.today() - timedelta(days=date.today().weekday() + 2)
         export_start_date = export_start_date.strftime("%Y-%m-%d")
 
-    nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
-    preliminary_nhsn_df = pull_preliminary_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
+    nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger)
+    preliminary_nhsn_df = pull_preliminary_nhsn_data(
+        socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger
+    )
 
     geo_mapper = GeoMapper()
     signal_df_dict = {signal: nhsn_df for signal in SIGNALS_MAP}
-    signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP})
+    # some source backups do not include preliminary data; TODO: remove after the first patch
+    if not preliminary_nhsn_df.empty:
+        signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP})
 
     for signal, df_pull in signal_df_dict.items():
         for geo in GEOS:

@@ -67,14 +73,17 @@ def run_module(params, logger=None):
                 df = df[df["geo_id"] == "us"]
             elif geo == "hhs":
                 df = df[df["geo_id"] != "us"]
+                df = df[df["geo_id"].str.len() == 2]
                 df.rename(columns={"geo_id": "state_id"}, inplace=True)
                 df = geo_mapper.add_geocode(df, "state_id", "state_code", from_col="state_id")
-                df = geo_mapper.add_geocode(df, "state_code", "hhs", from_col="state_code", new_col="hhs")
-            else:
+                df = geo_mapper.replace_geocode(
+                    df, from_col="state_code", from_code="state_code", new_col="geo_id", new_code="hhs"
+                )
+            elif geo == "state":
                 df = df[df_pull["geo_id"] != "us"]
+                df = df[df["geo_id"].str.len() == 2]  # hhs region is a value in the geo_id column
 
             df["se"] = np.nan
             df["sample_size"] = np.nan
             dates = create_export_csv(

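For intuition on the new `hhs` branch, a small sketch of the same GeoMapper calls on a toy frame; the dataframe, its values, and the `timestamp` column (which `replace_geocode` groups on by default) are illustrative, not the indicator's real data:

```python
import pandas as pd
from delphi_utils.geomap import GeoMapper

# Toy frame: two states plus a national row, mirroring the geo_id column upstream.
df = pd.DataFrame(
    {
        "geo_id": ["co", "ut", "us"],
        "timestamp": pd.to_datetime(["2024-12-07"] * 3),
        "val": [5.0, 3.0, 100.0],
    }
)

geo_mapper = GeoMapper()
df = df[df["geo_id"] != "us"]  # drop the national row
df = df[df["geo_id"].str.len() == 2]  # keep two-letter state ids only
df = df.rename(columns={"geo_id": "state_id"})
df = geo_mapper.add_geocode(df, "state_id", "state_code", from_col="state_id")
df = geo_mapper.replace_geocode(
    df, from_col="state_code", from_code="state_code", new_col="geo_id", new_code="hhs"
)
print(df)  # CO and UT are both in HHS region 8, so their val should aggregate there
```
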
11 changes: 6 additions & 5 deletions nhsn/tests/conftest.py
@@ -16,10 +16,10 @@
 # queries the nhsn data with timestamp (2021-08-19, 2021-10-19) with CO and USA data
 
 
-with open("test_data/page.json", "r") as f:
+with open(f"{TEST_DIR}/test_data/page.json", "r") as f:
     TEST_DATA = json.load(f)
 
-with open("test_data/prelim_page.json", "r") as f:
+with open(f"{TEST_DIR}/test_data/prelim_page.json", "r") as f:
     PRELIM_TEST_DATA = json.load(f)
 
 @pytest.fixture(scope="session")

@@ -50,11 +50,12 @@ def params():
 @pytest.fixture
 def params_w_patch(params):
     params_copy = copy.deepcopy(params)
+    params_copy["common"]["custom_run"] = True
     params_copy["patch"] = {
-        "start_issue": "2024-06-27",
-        "end_issue": "2024-06-29",
-        "patch_dir": "./patch_dir"
+        "patch_dir": f"{TEST_DIR}/patch_dir",
+        "issue_date": "2024-12-12",
     }
 
     return params_copy
 
 @pytest.fixture(scope="function")

Binary file added nhsn/tests/test_data/20241212.csv.gz
Binary file not shown.
Binary file added nhsn/tests/test_data/20241212_prelim.csv.gz
Binary file not shown.