2085 add proportions nhsn #2111

Merged: 25 commits, Feb 11, 2025
39 changes: 29 additions & 10 deletions nhsn/delphi_nhsn/constants.py
@@ -6,29 +6,48 @@
PRELIM_DATASET_ID = "mpgq-jmmr"

# column name from socrata
TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"
TOTAL_ADMISSION_COVID_COL = "totalconfc19newadm"
TOTAL_ADMISSION_FLU_COL = "totalconfflunewadm"
NUM_HOSP_REPORTING_COVID_COL = "totalconfc19newadmhosprep"
NUM_HOSP_REPORTING_FLU_COL = "totalconfflunewadmhosprep"

# signal name
TOTAL_ADMISSION_COVID = "confirmed_admissions_covid_ew"
TOTAL_ADMISSION_FLU = "confirmed_admissions_flu_ew"
NUM_HOSP_REPORTING_COVID = "hosprep_confirmed_admissions_covid_ew"
NUM_HOSP_REPORTING_FLU = "hosprep_confirmed_admissions_flu_ew"

SIGNALS_MAP = {
"confirmed_admissions_covid_ew": TOTAL_ADMISSION_COVID_API,
"confirmed_admissions_flu_ew": TOTAL_ADMISSION_FLU_API,
TOTAL_ADMISSION_COVID: TOTAL_ADMISSION_COVID_COL,
TOTAL_ADMISSION_FLU: TOTAL_ADMISSION_FLU_COL,
NUM_HOSP_REPORTING_COVID: NUM_HOSP_REPORTING_COVID_COL,
NUM_HOSP_REPORTING_FLU: NUM_HOSP_REPORTING_FLU_COL,
}

TYPE_DICT = {
"timestamp": "datetime64[ns]",
"geo_id": str,
"confirmed_admissions_covid_ew": float,
"confirmed_admissions_flu_ew": float,
TOTAL_ADMISSION_COVID: float,
TOTAL_ADMISSION_FLU: float,
NUM_HOSP_REPORTING_COVID: float,
NUM_HOSP_REPORTING_FLU: float,
}

# signal mapping for secondary, preliminary source
# made a copy in case things diverge

PRELIM_SIGNALS_MAP = {
"confirmed_admissions_covid_ew_prelim": TOTAL_ADMISSION_COVID_API,
"confirmed_admissions_flu_ew_prelim": TOTAL_ADMISSION_FLU_API,
f"{TOTAL_ADMISSION_COVID}_prelim": TOTAL_ADMISSION_COVID_COL,
f"{TOTAL_ADMISSION_FLU}_prelim": TOTAL_ADMISSION_FLU_COL,
f"{NUM_HOSP_REPORTING_COVID}_prelim": NUM_HOSP_REPORTING_COVID_COL,
f"{NUM_HOSP_REPORTING_FLU}_prelim": NUM_HOSP_REPORTING_FLU_COL,
}

PRELIM_TYPE_DICT = {
"timestamp": "datetime64[ns]",
"geo_id": str,
"confirmed_admissions_covid_ew_prelim": float,
"confirmed_admissions_flu_ew_prelim": float,
f"{TOTAL_ADMISSION_COVID}_prelim": float,
f"{TOTAL_ADMISSION_FLU}_prelim": float,
f"{NUM_HOSP_REPORTING_COVID}_prelim": float,
f"{NUM_HOSP_REPORTING_FLU}_prelim": float,
}
107 changes: 90 additions & 17 deletions nhsn/delphi_nhsn/pull.py
@@ -1,8 +1,13 @@
# -*- coding: utf-8 -*-
"""Functions for pulling NSSP ER data."""
import copy
import logging
import random
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from urllib.error import HTTPError

import pandas as pd
from delphi_utils import create_backup_csv
@@ -11,20 +16,73 @@
from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT


def pull_data(socrata_token: str, dataset_id: str):
def check_last_updated(socrata_token, dataset_id, logger):
"""
Check last updated timestamp to determine data should be pulled or not.

Note -- the behavior of the api fail is to treat is as stale
as having possible duplicate is preferable compared to possible missing data

Parameters
----------
socrata_token
dataset_id
logger

Returns bool
-------

"""
recently_updated_source = True
try:
client = Socrata("data.cdc.gov", socrata_token)
response = client.get_metadata(dataset_id)

updated_timestamp = datetime.utcfromtimestamp(int(response["rowsUpdatedAt"]))
now = datetime.utcnow()
recently_updated_source = (now - updated_timestamp) < timedelta(days=1)

prelim_prefix = "Preliminary " if dataset_id == PRELIM_DATASET_ID else ""
if recently_updated_source:
logger.info(
f"{prelim_prefix}NHSN data was recently updated; Pulling data", updated_timestamp=updated_timestamp
)
else:
logger.info(f"{prelim_prefix}NHSN data is stale; Skipping", updated_timestamp=updated_timestamp)
# pylint: disable=W0703
except Exception as e:
logger.info("error while processing socrata metadata; treating data as stale", error=str(e))
return recently_updated_source


def pull_data(socrata_token: str, dataset_id: str, backup_dir: str, logger):
"""Pull data from Socrata API."""
client = Socrata("data.cdc.gov", socrata_token)
logger.info("Pulling data from Socrata API")
results = []
offset = 0
limit = 50000 # maximum limit allowed by SODA 2.0
while True:
# retry logic for 503 errors
try:
page = client.get(dataset_id, limit=limit, offset=offset)
if not page:
break # exit the loop if no more results
except HTTPError as err:
if err.code == 503:
time.sleep(2 + random.randint(0, 1000) / 1000.0)
page = client.get(dataset_id, limit=limit, offset=offset)
else:
logger.info("Error pulling data from Socrata API", error=str(err))
raise err

while len(page) > 0:
results.extend(page)
offset += limit
page = client.get(dataset_id, limit=limit, offset=offset)

df = pd.DataFrame.from_records(results)
if results:
df = pd.DataFrame.from_records(results)
create_backup_csv(df, backup_dir, False, logger=logger)
else:
df = pd.DataFrame()
return df


@@ -89,25 +147,33 @@ def pull_nhsn_data(
"""
# Pull data from Socrata API
df = (
pull_data(socrata_token, dataset_id=MAIN_DATASET_ID)
pull_data(socrata_token, MAIN_DATASET_ID, backup_dir, logger)
if not custom_run
else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
)

keep_columns = list(TYPE_DICT.keys())
recently_updated = True if custom_run else check_last_updated(socrata_token, MAIN_DATASET_ID, logger)
Contributor:
suggestion: if we put this before the pull_data logic, we could avoid fetching from the source API in most cases (since this is or will be running every day but only updates once a week). See the sketch after this exchange.

Contributor Author:
that was the original thought, but previously you brought up the case of multiple failures in a row, and the solution was to keep squirreling the data away for now while at least avoiding duplicates in both the raw and processed outputs
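
A rough sketch of the suggested reordering (hypothetical, not what was merged; it reuses the check_last_updated, pull_data, and pull_data_from_file functions from this PR, inside pull_nhsn_data):

# Hypothetical: run the metadata check first so a stale source skips the
# expensive Socrata data request entirely.
recently_updated = True if custom_run else check_last_updated(socrata_token, MAIN_DATASET_ID, logger)
if not recently_updated:
    # stale source: return an empty frame without hitting the data endpoint
    return pd.DataFrame(columns=list(TYPE_DICT.keys()))
df = (
    pull_data(socrata_token, MAIN_DATASET_ID, backup_dir, logger)
    if not custom_run
    else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
)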


if not df.empty:
create_backup_csv(df, backup_dir, custom_run, logger=logger)
keep_columns = list(TYPE_DICT.keys())

if not df.empty and recently_updated:
df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
filtered_type_dict = copy.deepcopy(TYPE_DICT)

for signal, col_name in SIGNALS_MAP.items():
df[signal] = df[col_name]
# older backups don't have certain columns
try:
df[signal] = df[col_name]
except KeyError:
logger.info("column not available in data", col_name=col_name)
keep_columns.remove(signal)
del filtered_type_dict[signal]

df = df[keep_columns]
df["geo_id"] = df["geo_id"].str.lower()
df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
df = df.astype(TYPE_DICT)

df = df.astype(filtered_type_dict)
else:
df = pd.DataFrame(columns=keep_columns)

@@ -144,24 +210,31 @@ def pull_preliminary_nhsn_data(
pd.DataFrame
Dataframe as described above.
"""
# Pull data from Socrata API
Contributor:
issue: pull_preliminary_nhsn_data and pull_nhsn_data are really similar. I think it will become a maintenance issue to keep both. We should probably keep these two functions as wrappers of a shared fn that takes an is_prelim flag (or similar); see the sketch after this exchange.

Diff of the two fns:

[screenshot: side-by-side diff of the two functions]

Contributor Author (@aysim319, Feb 11, 2025):
I know they're similar; I thought about it and went back and forth, but I figured the two might diverge in the future, so I kept them separate. I'm not too concerned about this, since we'll be slowly deprecating this codebase.
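
A hedged sketch of the suggested consolidation (hypothetical; the _pull_nhsn_data_impl name and the abbreviated signatures are illustrative, not part of this PR):

def _pull_nhsn_data_impl(socrata_token, backup_dir, custom_run, issue_date, logger, is_prelim):
    """Hypothetical shared implementation: the prelim/main differences reduce to lookups keyed on is_prelim."""
    dataset_id = PRELIM_DATASET_ID if is_prelim else MAIN_DATASET_ID
    type_dict = PRELIM_TYPE_DICT if is_prelim else TYPE_DICT
    signals_map = PRELIM_SIGNALS_MAP if is_prelim else SIGNALS_MAP
    sensor = "prelim" if is_prelim else None
    # ... the shared pull/rename/filter logic from the two functions would go here ...


def pull_nhsn_data(socrata_token, backup_dir, custom_run, issue_date, logger):
    return _pull_nhsn_data_impl(socrata_token, backup_dir, custom_run, issue_date, logger, is_prelim=False)


def pull_preliminary_nhsn_data(socrata_token, backup_dir, custom_run, issue_date, logger):
    return _pull_nhsn_data_impl(socrata_token, backup_dir, custom_run, issue_date, logger, is_prelim=True)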

df = (
pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
pull_data(socrata_token, PRELIM_DATASET_ID, backup_dir, logger)
if not custom_run
else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
)

keep_columns = list(PRELIM_TYPE_DICT.keys())
recently_updated = True if custom_run else check_last_updated(socrata_token, PRELIM_DATASET_ID, logger)

if not df.empty:
create_backup_csv(df, backup_dir, custom_run, sensor="prelim", logger=logger)

if not df.empty and recently_updated:
df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
filtered_type_dict = copy.deepcopy(PRELIM_TYPE_DICT)

for signal, col_name in PRELIM_SIGNALS_MAP.items():
df[signal] = df[col_name]
try:
df[signal] = df[col_name]
except KeyError:
logger.info("column not available in data", col_name=col_name, signal=signal)
keep_columns.remove(signal)
del filtered_type_dict[signal]

df = df[keep_columns]
df = df.astype(PRELIM_TYPE_DICT)
df = df.astype(filtered_type_dict)

df["geo_id"] = df["geo_id"].str.lower()
df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
else:
24 changes: 19 additions & 5 deletions nhsn/delphi_nhsn/run.py
@@ -14,8 +14,10 @@
unpublished signals are. See `delphi_utils.add_prefix()`
- Any other indicator-specific settings
"""
import re
import time
from datetime import date, datetime, timedelta
from itertools import product

import numpy as np
from delphi_utils import GeoMapper, get_structured_logger
@@ -59,16 +61,20 @@ def run_module(params, logger=None):
)

geo_mapper = GeoMapper()
signal_df_dict = {signal: nhsn_df for signal in SIGNALS_MAP}
# some of the source backups do not include preliminary data TODO remove after first patch
signal_df_dict = dict()
if not nhsn_df.empty:
signal_df_dict.update({signal: nhsn_df for signal in SIGNALS_MAP})
# some of the source backups do not include preliminary data
if not preliminary_nhsn_df.empty:
signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP})

for signal, df_pull in signal_df_dict.items():
for geo in GEOS:
df = df_pull.copy()
for geo, signals_df in product(GEOS, signal_df_dict.items()):
signal, df_pull = signals_df
df = df_pull.copy()
try:
df = df[["timestamp", "geo_id", signal]]
df.rename({signal: "val"}, axis=1, inplace=True)

if geo == "nation":
df = df[df["geo_id"] == "us"]
elif geo == "hhs":
Expand Down Expand Up @@ -96,6 +102,14 @@ def run_module(params, logger=None):
)
if len(dates) > 0:
run_stats.append((max(dates), len(dates)))
# some signal columns are unavailable for patching.
except KeyError as e:
missing_signal = re.search(r"'([^']*)'", str(e)).group(1)
full_signal_list = list(SIGNALS_MAP.keys()) + list(PRELIM_SIGNALS_MAP.keys())
if missing_signal in full_signal_list:
logger.info("signal not available in data", signal=missing_signal)
else:
raise RuntimeError("Column(s) that shouldn't be missing are missing") from e

elapsed_time_in_seconds = round(time.time() - start_time, 2)
min_max_date = run_stats and min(s[0] for s in run_stats)
5 changes: 4 additions & 1 deletion nhsn/tests/conftest.py
@@ -1,5 +1,6 @@
import copy
import json
import time
from unittest.mock import patch

import pytest
@@ -60,7 +61,8 @@ def params_w_patch(params):

@pytest.fixture(scope="function")
def run_as_module(params):
with patch('sodapy.Socrata.get') as mock_get:
with patch('sodapy.Socrata.get') as mock_get, \
patch('sodapy.Socrata.get_metadata') as mock_get_metadata:
def side_effect(*args, **kwargs):
if kwargs['offset'] == 0:
if "ua7e-t2fy" in args[0]:
@@ -70,5 +72,6 @@ def side_effect(*args, **kwargs):
else:
return []
mock_get.side_effect = side_effect
mock_get_metadata.return_value = {"rowsUpdatedAt": time.time()}
run_module(params)

Empty file added nhsn/tests/patch_dir/.gitignore
Binary file added nhsn/tests/test_data/20241119.csv.gz
42 changes: 33 additions & 9 deletions nhsn/tests/test_patch.py
@@ -1,17 +1,19 @@
import glob
import os
from collections import defaultdict
from pathlib import Path
import shutil
from unittest.mock import patch as mock_patch

import re
import pandas as pd
from datetime import datetime, timedelta

import pytest
from epiweeks import Week

from delphi_nhsn.patch import filter_source_files, patch
from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_API, TOTAL_ADMISSION_FLU_API
from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_COL, TOTAL_ADMISSION_FLU_COL, \
NUM_HOSP_REPORTING_FLU_COL, NUM_HOSP_REPORTING_COVID_COL, GEOS, TOTAL_ADMISSION_COVID, TOTAL_ADMISSION_FLU, \
NUM_HOSP_REPORTING_COVID, NUM_HOSP_REPORTING_FLU
from conftest import TEST_DATA, PRELIM_TEST_DATA, TEST_DIR

class TestPatch:
@@ -85,11 +87,15 @@ def generate_test_source_files(self):
custom_filename = f"{TEST_DIR}/backups/{date}.csv.gz"
custom_filename_prelim = f"{TEST_DIR}/backups/{date}_prelim.csv.gz"
test_data = pd.DataFrame(TEST_DATA)
test_data[TOTAL_ADMISSION_COVID_API] = int(date)
test_data[TOTAL_ADMISSION_FLU_API] = int(date)
test_data[TOTAL_ADMISSION_COVID_COL] = int(date)
test_data[TOTAL_ADMISSION_FLU_COL] = int(date)
test_data[NUM_HOSP_REPORTING_COVID_COL] = int(date)
test_data[NUM_HOSP_REPORTING_FLU_COL] = int(date)
test_prelim_data = pd.DataFrame(PRELIM_TEST_DATA)
test_prelim_data[TOTAL_ADMISSION_COVID_API] = int(date)
test_prelim_data[TOTAL_ADMISSION_FLU_API] = int(date)
test_prelim_data[TOTAL_ADMISSION_COVID_COL] = int(date)
test_prelim_data[TOTAL_ADMISSION_FLU_COL] = int(date)
test_prelim_data[NUM_HOSP_REPORTING_COVID_COL] = int(date)
test_prelim_data[NUM_HOSP_REPORTING_FLU_COL] = int(date)

test_data = test_data.head(2)
test_data.to_csv(
@@ -108,21 +114,39 @@ def test_patch(self, params_w_patch):
file_list, prelim_file_list = self.generate_test_source_files()
patch(params_w_patch)

for issue_path in Path(f"{TEST_DIR}/patch_dir").glob("*"):
for issue_path in Path(f"{TEST_DIR}/patch_dir").glob("issue*"):
issue_dt_str = issue_path.name.replace("issue_", "")
for file in Path(issue_path / "nhsn").iterdir():
df = pd.read_csv(file)
assert issue_dt_str == str(int(df["val"][0]))

# clean up
shutil.rmtree(f"{TEST_DIR}/patch_dir")
for file in Path(f"{TEST_DIR}/patch_dir").glob("issue*"):
shutil.rmtree(file)

for file in file_list:
os.remove(file)

for file in prelim_file_list:
os.remove(file)

def test_patch_incomplete_file(self, params_w_patch):
os.makedirs(params_w_patch["patch"]["patch_dir"], exist_ok=True)
issue_date = "20241119"
existing_signals = [TOTAL_ADMISSION_COVID, TOTAL_ADMISSION_FLU]
backup_dir = params_w_patch.get("common").get("backup_dir")
shutil.copy(f"{TEST_DIR}/test_data/{issue_date}.csv.gz", backup_dir)

with mock_patch("delphi_nhsn.patch.read_params", return_value=params_w_patch):
patch(params_w_patch)

files = list(Path(f"{TEST_DIR}/patch_dir/issue_{issue_date}/nhsn").glob("*.csv"))
dates = set([re.search(r"\d{6}", file.name).group() for file in files])
assert len(files) == len(GEOS) * len(existing_signals) * len(dates)
# clean up
for file in Path(f"{TEST_DIR}/patch_dir").glob("issue*"):
shutil.rmtree(file)



