2085 add proportions nhsn #2111


Merged: 25 commits merged on Feb 11, 2025
Changes from 10 commits
39 changes: 29 additions & 10 deletions nhsn/delphi_nhsn/constants.py
@@ -6,29 +6,48 @@
PRELIM_DATASET_ID = "mpgq-jmmr"

# column name from socrata
TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"
TOTAL_ADMISSION_COVID_COL = "totalconfc19newadm"
TOTAL_ADMISSION_FLU_COL = "totalconfflunewadm"
NUM_HOSP_REPORTING_COVID_COL = "totalconfc19newadmhosprep"
NUM_HOSP_REPORTING_FLU_COL = "totalconfflunewadmhosprep"

# signal name
TOTAL_ADMISSION_COVID = "confirmed_admissions_covid_ew"
TOTAL_ADMISSION_FLU = "confirmed_admissions_flu_ew"
NUM_HOSP_REPORTING_COVID = "num_reporting_hospital_covid_ew"
NUM_HOSP_REPORTING_FLU = "num_reporting_hospital_flu_ew"

SIGNALS_MAP = {
"confirmed_admissions_covid_ew": TOTAL_ADMISSION_COVID_API,
"confirmed_admissions_flu_ew": TOTAL_ADMISSION_FLU_API,
TOTAL_ADMISSION_COVID: TOTAL_ADMISSION_COVID_COL,
TOTAL_ADMISSION_FLU: TOTAL_ADMISSION_FLU_COL,
NUM_HOSP_REPORTING_COVID: NUM_HOSP_REPORTING_COVID_COL,
NUM_HOSP_REPORTING_FLU: NUM_HOSP_REPORTING_FLU_COL,
}

TYPE_DICT = {
"timestamp": "datetime64[ns]",
"geo_id": str,
"confirmed_admissions_covid_ew": float,
"confirmed_admissions_flu_ew": float,
TOTAL_ADMISSION_COVID: float,
TOTAL_ADMISSION_FLU: float,
NUM_HOSP_REPORTING_COVID: float,
NUM_HOSP_REPORTING_FLU: float,
}

# signal mapping for secondary, preliminary source
# made copy incase things would diverge

PRELIM_SIGNALS_MAP = {
"confirmed_admissions_covid_ew_prelim": TOTAL_ADMISSION_COVID_API,
"confirmed_admissions_flu_ew_prelim": TOTAL_ADMISSION_FLU_API,
f"{TOTAL_ADMISSION_COVID}_prelim": TOTAL_ADMISSION_COVID_COL,
f"{TOTAL_ADMISSION_FLU}_prelim": TOTAL_ADMISSION_FLU_COL,
f"{NUM_HOSP_REPORTING_COVID}_prelim": NUM_HOSP_REPORTING_COVID_COL,
f"{NUM_HOSP_REPORTING_FLU}_prelim": NUM_HOSP_REPORTING_FLU_COL,
}

PRELIM_TYPE_DICT = {
"timestamp": "datetime64[ns]",
"geo_id": str,
"confirmed_admissions_covid_ew_prelim": float,
"confirmed_admissions_flu_ew_prelim": float,
f"{TOTAL_ADMISSION_COVID}_prelim": float,
f"{TOTAL_ADMISSION_FLU}_prelim": float,
f"{NUM_HOSP_REPORTING_COVID}_prelim": float,
f"{NUM_HOSP_REPORTING_FLU}_prelim": float,
}
58 changes: 44 additions & 14 deletions nhsn/delphi_nhsn/pull.py
@@ -1,8 +1,12 @@
# -*- coding: utf-8 -*-
"""Functions for pulling NSSP ER data."""
import logging
import random
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from urllib.error import HTTPError

import pandas as pd
from delphi_utils import create_backup_csv
@@ -11,20 +15,45 @@
from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT


def pull_data(socrata_token: str, dataset_id: str):
def check_last_updated(client, dataset_id, logger):
"""Check last updated timestamp to determine data should be pulled or not."""
try:
response = client.get_metadata(dataset_id)
except HTTPError as err:
if err.code == 503:
time.sleep(2 + random.randint(0, 1000) / 1000.0)
response = client.get_metadata(dataset_id)
else:
raise err

updated_timestamp = datetime.utcfromtimestamp(int(response["rowsUpdatedAt"]))
now = datetime.utcnow()
recently_updated = (now - updated_timestamp) < timedelta(days=1)
Contributor:

issue: I think this "recently-updated" logic is sufficient but not robust. For example, if we fail to pull data for multiple days, then on the next run we would not pull data we had never seen before if it was not posted in the last day.

The more robust solution would be to save last pull's updated_timestamp to a local file. We would then load that and compare updated_timestamp to it -- if exactly equal, skip the update; if unequal, pull the data.

Contributor Author @aysim319 (Feb 4, 2025):

Definitely makes sense, and something I didn't think about! The only thing I did differently was to use the API instead of scanning the files, since I imagine the file list is going to grow and it doesn't make much sense to scan it every day.

Contributor:

Yeah, checking the API could make sense, too. The one thing I'd caution is timezones -- your previous approach explicitly used UTC on both the "old" and "now" timestamps, but I don't know what the API uses.

Second, the API only has dates, not times. Would that ever cause problems? E.g. if we want to check for updates multiple times a day.

Contributor Author @aysim319 (Feb 5, 2025):

> Yeah, checking the API could make sense, too. The one thing I'd caution is timezones -- your previous approach explicitly used UTC on both "old" and "now" timestamps, but I don't know what the API uses.

Since the data and the dates are just dates and not datetimes, I didn't take timezones into account... hmm, I also don't know for sure which timezone; I believe it's EST, but I have to double-check.

> The API only has dates, not times. Would that ever cause problems? E.g. we want to check for updates multiple times a day.

Since this is data that generally updates weekly, I was planning on just running once a day, so I thought timezones wouldn't be as much of an issue.

Contributor @nmdefries (Feb 5, 2025):

Okay, given these complications, I'm thinking reading/writing to a file is easier. We wouldn't need to keep a complete list of all update datetimes ever, just the single most recent datetime, so the file wouldn't keep getting bigger and bigger; we could just read a single line.

This lets us store a UTC datetime (no timezones to worry about), there is no API date-processing to worry about, and we can store a full datetime to be extra precise.
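
A minimal sketch of the file-based check being suggested here (the marker-file path and helper names are illustrative assumptions, not part of this PR; the timestamp would come from the Socrata metadata's rowsUpdatedAt field already used above):

from datetime import datetime
from pathlib import Path
from typing import Optional

# Illustrative marker-file location, not from the PR.
LAST_UPDATE_FILE = Path("nhsn_last_updated.txt")


def read_last_updated() -> Optional[datetime]:
    """Return the single stored UTC datetime from the last successful pull, if any."""
    if not LAST_UPDATE_FILE.exists():
        return None
    return datetime.fromisoformat(LAST_UPDATE_FILE.read_text().strip())


def write_last_updated(updated_timestamp: datetime) -> None:
    """Overwrite the marker file with only the most recent update datetime."""
    LAST_UPDATE_FILE.write_text(updated_timestamp.isoformat())


def needs_pull(updated_timestamp: datetime) -> bool:
    """Pull whenever the source's update time differs from the one stored last time."""
    previous = read_last_updated()
    return previous is None or previous != updated_timestamp

The file only ever holds one line, so it never grows, and because both datetimes are derived from UTC there is no timezone handling to worry about.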

Contributor Author @aysim319 (Feb 6, 2025):

I wasn't a fan of having metadata files; it seemed like overkill and would introduce more complexity than I'd like. So after talking things through with Nolan just now, I decided to simplify the logic: create backups daily, but still do a simple recently-updated check before continuing to process and create the CSV files, so that if there are outages after the initial pulls we can go back and do patches for them.

Nolan also mentioned that for the future we could look into creating a generic tool/script specifically to dedup things, and I like that direction since it would separate that complexity away from this codebase.
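
For reference, a rough sketch of what such a standalone dedup pass over the daily backup files might look like (the directory layout and function name are assumptions for illustration only; this is not part of the PR):

import gzip
import hashlib
from pathlib import Path


def dedup_backups(backup_dir: str) -> None:
    """Drop a daily backup whose decompressed contents match the previous day's backup."""
    previous_digest = None
    for path in sorted(Path(backup_dir).glob("*.csv.gz")):
        with gzip.open(path, "rb") as f:
            digest = hashlib.sha256(f.read()).hexdigest()
        if digest == previous_digest:
            path.unlink()  # identical data to the prior pull, so remove the duplicate
        else:
            previous_digest = digest

Hashing the decompressed bytes avoids false mismatches caused by timestamps in the gzip headers.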

prelim_prefix = "Preliminary " if dataset_id == PRELIM_DATASET_ID else ""
if recently_updated:
logger.info(f"{prelim_prefix}NHSN data was recently updated; Pulling data", updated_timestamp=updated_timestamp)
else:
logger.info(f"{prelim_prefix}NHSN data is stale; Skipping", updated_timestamp=updated_timestamp)
return recently_updated


def pull_data(socrata_token: str, dataset_id: str, logger):
"""Pull data from Socrata API."""
client = Socrata("data.cdc.gov", socrata_token)
results = []
offset = 0
limit = 50000 # maximum limit allowed by SODA 2.0
while True:
page = client.get(dataset_id, limit=limit, offset=offset)
if not page:
break # exit the loop if no more results
results.extend(page)
offset += limit

df = pd.DataFrame.from_records(results)
recently_updated = check_last_updated(client, "ua7e-t2fy", logger)
df = pd.DataFrame()
if recently_updated:
results = []
offset = 0
limit = 50000 # maximum limit allowed by SODA 2.0
while True:
page = client.get(dataset_id, limit=limit, offset=offset)
if not page:
break # exit the loop if no more results
results.extend(page)
offset += limit

df = pd.DataFrame.from_records(results)
return df


@@ -89,7 +118,7 @@ def pull_nhsn_data(
"""
# Pull data from Socrata API
df = (
pull_data(socrata_token, dataset_id=MAIN_DATASET_ID)
pull_data(socrata_token, MAIN_DATASET_ID, logger)
if not custom_run
else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
)
@@ -144,8 +173,9 @@ def pull_preliminary_nhsn_data(
pd.DataFrame
Dataframe as described above.
"""
# Pull data from Socrata API
Contributor:

issue: pull_preliminary_nhsn_data and pull_nhsn_data are really similar. I think it will become a maintenance issue to keep both. We should probably keep these two functions as wrappers around a shared fn that takes an is_prelim flag (or similar).

Diff of the two fns:

[screenshot: side-by-side comparison of the two functions, Feb 11, 2025]
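
A rough sketch of the wrapper pattern being suggested (the shared helper name is hypothetical and the signatures are simplified relative to the real functions):

def _pull_nhsn(socrata_token, backup_dir, custom_run, issue_date, logger, is_prelim: bool):
    """Shared pull logic; the flag selects dataset, signal map, and type dict."""
    dataset_id = PRELIM_DATASET_ID if is_prelim else MAIN_DATASET_ID
    signals_map = PRELIM_SIGNALS_MAP if is_prelim else SIGNALS_MAP
    type_dict = PRELIM_TYPE_DICT if is_prelim else TYPE_DICT

    df = (
        pull_data(socrata_token, dataset_id, logger)
        if not custom_run
        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=is_prelim)
    )
    # ... renaming/typing logic common to both callers, using signals_map and type_dict ...
    return df


def pull_nhsn_data(socrata_token, backup_dir, custom_run, issue_date, logger):
    return _pull_nhsn(socrata_token, backup_dir, custom_run, issue_date, logger, is_prelim=False)


def pull_preliminary_nhsn_data(socrata_token, backup_dir, custom_run, issue_date, logger):
    return _pull_nhsn(socrata_token, backup_dir, custom_run, issue_date, logger, is_prelim=True)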

Contributor Author @aysim319 (Feb 11, 2025):

I know they're similar; I thought about it and went back and forth, but I figured that in the future there might be something different going on, so I kept them separate. I'm not too concerned about this, since we'll be slowly deprecating this codebase.

df = (
pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
pull_data(socrata_token, PRELIM_DATASET_ID, logger)
if not custom_run
else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
)
66 changes: 35 additions & 31 deletions nhsn/delphi_nhsn/run.py
@@ -16,6 +16,7 @@
"""
import time
from datetime import date, datetime, timedelta
from itertools import product

import numpy as np
from delphi_utils import GeoMapper, get_structured_logger
@@ -59,43 +60,46 @@ def run_module(params, logger=None):
)

geo_mapper = GeoMapper()
signal_df_dict = {signal: nhsn_df for signal in SIGNALS_MAP}
signal_df_dict = dict()
if not nhsn_df.empty:
signal_df_dict.update({signal: nhsn_df for signal in SIGNALS_MAP})
# some of the source backups do not include for preliminary data TODO remove after first patch
if not preliminary_nhsn_df.empty:
signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP})

for signal, df_pull in signal_df_dict.items():
for geo in GEOS:
df = df_pull.copy()
df = df[["timestamp", "geo_id", signal]]
df.rename({signal: "val"}, axis=1, inplace=True)
if geo == "nation":
df = df[df["geo_id"] == "us"]
elif geo == "hhs":
df = df[df["geo_id"] != "us"]
df = df[df["geo_id"].str.len() == 2]
df.rename(columns={"geo_id": "state_id"}, inplace=True)
df = geo_mapper.add_geocode(df, "state_id", "state_code", from_col="state_id")
df = geo_mapper.add_geocode(df, "state_code", "hhs", from_col="state_code", new_col="hhs")
df = geo_mapper.replace_geocode(
df, from_col="state_code", from_code="state_code", new_col="geo_id", new_code="hhs"
)
elif geo == "state":
df = df[df_pull["geo_id"] != "us"]
df = df[df["geo_id"].str.len() == 2] # hhs region is a value in geo_id column
for geo, signals_df in product(GEOS, signal_df_dict.items()):
signal, df_pull = signals_df
df = df_pull.copy()
df = df[["timestamp", "geo_id", signal]]
df.rename({signal: "val"}, axis=1, inplace=True)

df["se"] = np.nan
df["sample_size"] = np.nan
dates = create_export_csv(
df,
geo_res=geo,
export_dir=export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=signal,
weekly_dates=True,
if geo == "nation":
df = df[df["geo_id"] == "us"]
elif geo == "hhs":
df = df[df["geo_id"] != "us"]
df = df[df["geo_id"].str.len() == 2]
df.rename(columns={"geo_id": "state_id"}, inplace=True)
df = geo_mapper.add_geocode(df, "state_id", "state_code", from_col="state_id")
df = geo_mapper.add_geocode(df, "state_code", "hhs", from_col="state_code", new_col="hhs")
df = geo_mapper.replace_geocode(
df, from_col="state_code", from_code="state_code", new_col="geo_id", new_code="hhs"
)
if len(dates) > 0:
run_stats.append((max(dates), len(dates)))
elif geo == "state":
df = df[df_pull["geo_id"] != "us"]
df = df[df["geo_id"].str.len() == 2] # hhs region is a value in geo_id column

df["se"] = np.nan
df["sample_size"] = np.nan
dates = create_export_csv(
df,
geo_res=geo,
export_dir=export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=signal,
weekly_dates=True,
)
if len(dates) > 0:
run_stats.append((max(dates), len(dates)))

elapsed_time_in_seconds = round(time.time() - start_time, 2)
min_max_date = run_stats and min(s[0] for s in run_stats)
5 changes: 4 additions & 1 deletion nhsn/tests/conftest.py
@@ -1,5 +1,6 @@
import copy
import json
import time
from unittest.mock import patch

import pytest
@@ -60,7 +61,8 @@ def params_w_patch(params):

@pytest.fixture(scope="function")
def run_as_module(params):
with patch('sodapy.Socrata.get') as mock_get:
with patch('sodapy.Socrata.get') as mock_get, \
patch('sodapy.Socrata.get_metadata') as mock_get_metadata:
def side_effect(*args, **kwargs):
if kwargs['offset'] == 0:
if "ua7e-t2fy" in args[0]:
@@ -70,5 +72,6 @@ def side_effect(*args, **kwargs):
else:
return []
mock_get.side_effect = side_effect
mock_get_metadata.return_value = {"rowsUpdatedAt": time.time()}
run_module(params)

15 changes: 10 additions & 5 deletions nhsn/tests/test_patch.py
@@ -11,7 +11,8 @@
from epiweeks import Week

from delphi_nhsn.patch import filter_source_files, patch
from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_API, TOTAL_ADMISSION_FLU_API
from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_COL, TOTAL_ADMISSION_FLU_COL, \
NUM_HOSP_REPORTING_FLU_COL, NUM_HOSP_REPORTING_COVID_COL
from conftest import TEST_DATA, PRELIM_TEST_DATA, TEST_DIR

class TestPatch:
@@ -85,11 +86,15 @@ def generate_test_source_files(self):
custom_filename = f"{TEST_DIR}/backups/{date}.csv.gz"
custom_filename_prelim = f"{TEST_DIR}/backups/{date}_prelim.csv.gz"
test_data = pd.DataFrame(TEST_DATA)
test_data[TOTAL_ADMISSION_COVID_API] = int(date)
test_data[TOTAL_ADMISSION_FLU_API] = int(date)
test_data[TOTAL_ADMISSION_COVID_COL] = int(date)
test_data[TOTAL_ADMISSION_FLU_COL] = int(date)
test_data[NUM_HOSP_REPORTING_COVID_COL] = int(date)
test_data[NUM_HOSP_REPORTING_FLU_COL] = int(date)
test_prelim_data = pd.DataFrame(PRELIM_TEST_DATA)
test_prelim_data[TOTAL_ADMISSION_COVID_API] = int(date)
test_prelim_data[TOTAL_ADMISSION_FLU_API] = int(date)
test_prelim_data[TOTAL_ADMISSION_COVID_COL] = int(date)
test_prelim_data[TOTAL_ADMISSION_FLU_COL] = int(date)
test_prelim_data[NUM_HOSP_REPORTING_COVID_COL] = int(date)
test_prelim_data[NUM_HOSP_REPORTING_FLU_COL] = int(date)

test_data = test_data.head(2)
test_data.to_csv(