
Commit f543fe9

aysim319 and nmdefries authored
2085 add proportions nhsn (#2111)
* initial implementation for proportion
* in progress
* check for update in progress
* adding checking updates in progress
* adding just num reporting hospital
* tests and undoing proportion signal code
* lint
* fixed test
* fix test part 2
* fixed bugs related to patching; added test for missing signal columns for patching
* suggestion
* missed fix for test data
* fixed test
* suggested change
* changed logic to be cleaner; always create backups
* lint
* wrapped in try block
* retrigger Jenkins build
* cleaned test and suggested changes
* adding more context/comments for check update
* docstring fix
* fixed copy issue
* lint
* Apply suggestions from code review

Co-authored-by: nmdefries <[email protected]>
1 parent b50db1d commit f543fe9

File tree

8 files changed (+348, -167 lines)

nhsn/delphi_nhsn/constants.py

Lines changed: 29 additions & 10 deletions
@@ -6,29 +6,48 @@
 PRELIM_DATASET_ID = "mpgq-jmmr"
 
 # column name from socrata
-TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
-TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"
+TOTAL_ADMISSION_COVID_COL = "totalconfc19newadm"
+TOTAL_ADMISSION_FLU_COL = "totalconfflunewadm"
+NUM_HOSP_REPORTING_COVID_COL = "totalconfc19newadmhosprep"
+NUM_HOSP_REPORTING_FLU_COL = "totalconfflunewadmhosprep"
+
+# signal name
+TOTAL_ADMISSION_COVID = "confirmed_admissions_covid_ew"
+TOTAL_ADMISSION_FLU = "confirmed_admissions_flu_ew"
+NUM_HOSP_REPORTING_COVID = "hosprep_confirmed_admissions_covid_ew"
+NUM_HOSP_REPORTING_FLU = "hosprep_confirmed_admissions_flu_ew"
 
 SIGNALS_MAP = {
-    "confirmed_admissions_covid_ew": TOTAL_ADMISSION_COVID_API,
-    "confirmed_admissions_flu_ew": TOTAL_ADMISSION_FLU_API,
+    TOTAL_ADMISSION_COVID: TOTAL_ADMISSION_COVID_COL,
+    TOTAL_ADMISSION_FLU: TOTAL_ADMISSION_FLU_COL,
+    NUM_HOSP_REPORTING_COVID: NUM_HOSP_REPORTING_COVID_COL,
+    NUM_HOSP_REPORTING_FLU: NUM_HOSP_REPORTING_FLU_COL,
 }
 
 TYPE_DICT = {
     "timestamp": "datetime64[ns]",
     "geo_id": str,
-    "confirmed_admissions_covid_ew": float,
-    "confirmed_admissions_flu_ew": float,
+    TOTAL_ADMISSION_COVID: float,
+    TOTAL_ADMISSION_FLU: float,
+    NUM_HOSP_REPORTING_COVID: float,
+    NUM_HOSP_REPORTING_FLU: float,
 }
 
 # signal mapping for secondary, preliminary source
+# made a copy in case the sources diverge
+
 PRELIM_SIGNALS_MAP = {
-    "confirmed_admissions_covid_ew_prelim": TOTAL_ADMISSION_COVID_API,
-    "confirmed_admissions_flu_ew_prelim": TOTAL_ADMISSION_FLU_API,
+    f"{TOTAL_ADMISSION_COVID}_prelim": TOTAL_ADMISSION_COVID_COL,
+    f"{TOTAL_ADMISSION_FLU}_prelim": TOTAL_ADMISSION_FLU_COL,
+    f"{NUM_HOSP_REPORTING_COVID}_prelim": NUM_HOSP_REPORTING_COVID_COL,
+    f"{NUM_HOSP_REPORTING_FLU}_prelim": NUM_HOSP_REPORTING_FLU_COL,
 }
+
 PRELIM_TYPE_DICT = {
     "timestamp": "datetime64[ns]",
     "geo_id": str,
-    "confirmed_admissions_covid_ew_prelim": float,
-    "confirmed_admissions_flu_ew_prelim": float,
+    f"{TOTAL_ADMISSION_COVID}_prelim": float,
+    f"{TOTAL_ADMISSION_FLU}_prelim": float,
+    f"{NUM_HOSP_REPORTING_COVID}_prelim": float,
+    f"{NUM_HOSP_REPORTING_FLU}_prelim": float,
 }
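
The renamed *_COL constants hold raw Socrata column names, the new bare constants hold public signal names, and SIGNALS_MAP ties each signal to the column it is derived from. A minimal sketch of how such a map is consumed downstream, with a trimmed map and an invented DataFrame (neither is taken verbatim from the indicator):

import pandas as pd

signals_map = {  # trimmed stand-in for delphi_nhsn.constants.SIGNALS_MAP
    "confirmed_admissions_covid_ew": "totalconfc19newadm",
    "hosprep_confirmed_admissions_covid_ew": "totalconfc19newadmhosprep",
}

df = pd.DataFrame({  # invented sample of the raw Socrata payload
    "totalconfc19newadm": [12.0, 7.0],
    "totalconfc19newadmhosprep": [30.0, 28.0],
})

for signal, col_name in signals_map.items():
    df[signal] = df[col_name]  # copy each raw column under its public signal name

df = df[list(signals_map)]  # keep only the public signal columns
print(df.columns.tolist())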

nhsn/delphi_nhsn/pull.py

Lines changed: 90 additions & 17 deletions
@@ -1,8 +1,13 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NSSP ER data."""
+import copy
 import logging
+import random
+import time
+from datetime import datetime, timedelta
 from pathlib import Path
 from typing import Optional
+from urllib.error import HTTPError
 
 import pandas as pd
 from delphi_utils import create_backup_csv
@@ -11,20 +16,73 @@
 from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
 
 
-def pull_data(socrata_token: str, dataset_id: str):
+def check_last_updated(socrata_token, dataset_id, logger):
+    """
+    Check last updated timestamp to determine if data should be pulled or not.
+
+    Note -- if the call to the API fails, the behavior is to treat the data as
+    recently updated and pull it anyway, as possibly having duplicates is
+    preferable to missing data.
+
+    Parameters
+    ----------
+    socrata_token
+    dataset_id
+    logger
+
+    Returns
+    -------
+    bool
+    """
+    recently_updated_source = True
+    try:
+        client = Socrata("data.cdc.gov", socrata_token)
+        response = client.get_metadata(dataset_id)
+
+        updated_timestamp = datetime.utcfromtimestamp(int(response["rowsUpdatedAt"]))
+        now = datetime.utcnow()
+        recently_updated_source = (now - updated_timestamp) < timedelta(days=1)
+
+        prelim_prefix = "Preliminary " if dataset_id == PRELIM_DATASET_ID else ""
+        if recently_updated_source:
+            logger.info(
+                f"{prelim_prefix}NHSN data was recently updated; Pulling data", updated_timestamp=updated_timestamp
+            )
+        else:
+            logger.info(f"{prelim_prefix}NHSN data is stale; Skipping", updated_timestamp=updated_timestamp)
+    # pylint: disable=W0703
+    except Exception as e:
+        logger.info("error while processing socrata metadata; treating data as recently updated", error=str(e))
+    return recently_updated_source
+
+
+def pull_data(socrata_token: str, dataset_id: str, backup_dir: str, logger):
     """Pull data from Socrata API."""
     client = Socrata("data.cdc.gov", socrata_token)
+    logger.info("Pulling data from Socrata API")
     results = []
     offset = 0
     limit = 50000  # maximum limit allowed by SODA 2.0
-    while True:
+    # retry logic for 503 errors
+    try:
         page = client.get(dataset_id, limit=limit, offset=offset)
-        if not page:
-            break  # exit the loop if no more results
+    except HTTPError as err:
+        if err.code == 503:
+            time.sleep(2 + random.randint(0, 1000) / 1000.0)
+            page = client.get(dataset_id, limit=limit, offset=offset)
+        else:
+            logger.info("Error pulling data from Socrata API", error=str(err))
+            raise err
+
+    while len(page) > 0:
         results.extend(page)
         offset += limit
+        page = client.get(dataset_id, limit=limit, offset=offset)
 
-    df = pd.DataFrame.from_records(results)
+    if results:
+        df = pd.DataFrame.from_records(results)
+        create_backup_csv(df, backup_dir, False, logger=logger)
+    else:
+        df = pd.DataFrame()
     return df
@@ -89,25 +147,33 @@ def pull_nhsn_data(
     """
     # Pull data from Socrata API
     df = (
-        pull_data(socrata_token, dataset_id=MAIN_DATASET_ID)
+        pull_data(socrata_token, MAIN_DATASET_ID, backup_dir, logger)
         if not custom_run
         else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
     )
 
-    keep_columns = list(TYPE_DICT.keys())
+    recently_updated = True if custom_run else check_last_updated(socrata_token, MAIN_DATASET_ID, logger)
 
-    if not df.empty:
-        create_backup_csv(df, backup_dir, custom_run, logger=logger)
+    keep_columns = list(TYPE_DICT.keys())
 
+    if not df.empty and recently_updated:
         df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
+        filtered_type_dict = copy.deepcopy(TYPE_DICT)
 
         for signal, col_name in SIGNALS_MAP.items():
-            df[signal] = df[col_name]
+            # older backups don't have certain columns
+            try:
+                df[signal] = df[col_name]
+            except KeyError:
+                logger.info("column not available in data", col_name=col_name)
+                keep_columns.remove(signal)
+                del filtered_type_dict[signal]
 
         df = df[keep_columns]
         df["geo_id"] = df["geo_id"].str.lower()
         df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
-        df = df.astype(TYPE_DICT)
+
+        df = df.astype(filtered_type_dict)
     else:
         df = pd.DataFrame(columns=keep_columns)
@@ -144,24 +210,31 @@ def pull_preliminary_nhsn_data(
     pd.DataFrame
        Dataframe as described above.
     """
+    # Pull data from Socrata API
     df = (
-        pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
+        pull_data(socrata_token, PRELIM_DATASET_ID, backup_dir, logger)
         if not custom_run
         else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
     )
 
     keep_columns = list(PRELIM_TYPE_DICT.keys())
+    recently_updated = True if custom_run else check_last_updated(socrata_token, PRELIM_DATASET_ID, logger)
 
-    if not df.empty:
-        create_backup_csv(df, backup_dir, custom_run, sensor="prelim", logger=logger)
-
+    if not df.empty and recently_updated:
         df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
+        filtered_type_dict = copy.deepcopy(PRELIM_TYPE_DICT)
 
         for signal, col_name in PRELIM_SIGNALS_MAP.items():
-            df[signal] = df[col_name]
+            try:
+                df[signal] = df[col_name]
+            except KeyError:
+                logger.info("column not available in data", col_name=col_name, signal=signal)
+                keep_columns.remove(signal)
+                del filtered_type_dict[signal]
 
         df = df[keep_columns]
-        df = df.astype(PRELIM_TYPE_DICT)
+        df = df.astype(filtered_type_dict)
+
         df["geo_id"] = df["geo_id"].str.lower()
        df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
    else:
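
check_last_updated boils down to comparing the dataset's rowsUpdatedAt metadata field against the current time with a one-day freshness window. A self-contained sketch of that comparison, where the dict is a stand-in for what sodapy's Socrata.get_metadata() returns:

from datetime import datetime, timedelta

def is_recently_updated(metadata: dict, max_age: timedelta = timedelta(days=1)) -> bool:
    """Sketch of the freshness test inside check_last_updated()."""
    updated = datetime.utcfromtimestamp(int(metadata["rowsUpdatedAt"]))
    return (datetime.utcnow() - updated) < max_age

# a dataset touched just now counts as recently updated
print(is_recently_updated({"rowsUpdatedAt": datetime.utcnow().timestamp()}))  # True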

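The retry added to pull_data handles exactly one 503 with a jittered back-off of two to three seconds before giving up. The same pattern as a standalone helper; fetch_page is a hypothetical stand-in for the client.get(...) call:

import random
import time
from urllib.error import HTTPError

def get_with_retry(fetch_page):
    """Call fetch_page, retrying once on a 503 after a short jittered sleep."""
    try:
        return fetch_page()
    except HTTPError as err:
        if err.code == 503:  # transient server error: back off, then try once more
            time.sleep(2 + random.randint(0, 1000) / 1000.0)
            return fetch_page()
        raise  # any other HTTP error is surfaced to the caller
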
nhsn/delphi_nhsn/run.py

Lines changed: 19 additions & 5 deletions
@@ -14,8 +14,10 @@
    unpublished signals are. See `delphi_utils.add_prefix()`
 - Any other indicator-specific settings
 """
+import re
 import time
 from datetime import date, datetime, timedelta
+from itertools import product
 
 import numpy as np
 from delphi_utils import GeoMapper, get_structured_logger
@@ -59,16 +61,20 @@ def run_module(params, logger=None):
     )
 
     geo_mapper = GeoMapper()
-    signal_df_dict = {signal: nhsn_df for signal in SIGNALS_MAP}
-    # some of the source backups do not include for preliminary data TODO remove after first patch
+    signal_df_dict = dict()
+    if not nhsn_df.empty:
+        signal_df_dict.update({signal: nhsn_df for signal in SIGNALS_MAP})
+    # some of the source backups do not include preliminary data
     if not preliminary_nhsn_df.empty:
         signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP})
 
-    for signal, df_pull in signal_df_dict.items():
-        for geo in GEOS:
-            df = df_pull.copy()
+    for geo, signals_df in product(GEOS, signal_df_dict.items()):
+        signal, df_pull = signals_df
+        df = df_pull.copy()
+        try:
             df = df[["timestamp", "geo_id", signal]]
             df.rename({signal: "val"}, axis=1, inplace=True)
+
             if geo == "nation":
                 df = df[df["geo_id"] == "us"]
             elif geo == "hhs":
@@ -96,6 +102,14 @@ def run_module(params, logger=None):
             )
             if len(dates) > 0:
                 run_stats.append((max(dates), len(dates)))
+        # some signal columns are unavailable for patching
+        except KeyError as e:
+            missing_signal = re.search(r"'([^']*)'", str(e)).group(1)
+            full_signal_list = list(SIGNALS_MAP.keys()) + list(PRELIM_SIGNALS_MAP.keys())
+            if missing_signal in full_signal_list:
+                logger.info("signal not available in data", signal=missing_signal)
+            else:
+                raise RuntimeError("Column(s) that shouldn't be missing are missing") from e
 
     elapsed_time_in_seconds = round(time.time() - start_time, 2)
     min_max_date = run_stats and min(s[0] for s in run_stats)
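
The new except branch relies on pandas quoting the missing column name in the KeyError message, so re.search(r"'([^']*)'", str(e)) can recover it and check it against the known signals. A small demonstration; the DataFrame and signal list are invented:

import re
import pandas as pd

known_signals = ["confirmed_admissions_covid_ew"]  # trimmed stand-in for the signal maps
df = pd.DataFrame({"timestamp": ["2024-11-19"], "geo_id": ["us"]})

try:
    df = df[["timestamp", "geo_id", "confirmed_admissions_covid_ew"]]  # column is absent
except KeyError as e:
    # str(e) looks like: "['confirmed_admissions_covid_ew'] not in index"
    missing = re.search(r"'([^']*)'", str(e)).group(1)
    if missing in known_signals:
        print("signal not available in data:", missing)  # tolerated: skip this signal
    else:
        raise  # a column that should always exist is missing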

nhsn/tests/conftest.py

Lines changed: 4 additions & 1 deletion
@@ -1,5 +1,6 @@
 import copy
 import json
+import time
 from unittest.mock import patch
 
 import pytest
@@ -60,7 +61,8 @@ def params_w_patch(params):
 
 @pytest.fixture(scope="function")
 def run_as_module(params):
-    with patch('sodapy.Socrata.get') as mock_get:
+    with patch('sodapy.Socrata.get') as mock_get, \
+         patch('sodapy.Socrata.get_metadata') as mock_get_metadata:
         def side_effect(*args, **kwargs):
             if kwargs['offset'] == 0:
                 if "ua7e-t2fy" in args[0]:
@@ -70,5 +72,6 @@ def side_effect(*args, **kwargs):
             else:
                 return []
         mock_get.side_effect = side_effect
+        mock_get_metadata.return_value = {"rowsUpdatedAt": time.time()}
         run_module(params)
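
The fixture returns a current rowsUpdatedAt so check_last_updated() takes the fresh-data path. A hypothetical variation, not part of this commit, that would exercise the stale path instead:

import time
from unittest.mock import patch

with patch('sodapy.Socrata.get_metadata') as mock_get_metadata:
    # two days old: check_last_updated() should log "stale; Skipping" and return False
    mock_get_metadata.return_value = {"rowsUpdatedAt": time.time() - 2 * 24 * 3600}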

nhsn/tests/patch_dir/.gitignore

Whitespace-only changes.

nhsn/tests/test_data/20241119.csv.gz

3.34 KB
Binary file not shown.

nhsn/tests/test_patch.py

Lines changed: 33 additions & 9 deletions
@@ -1,17 +1,19 @@
-import glob
 import os
 from collections import defaultdict
 from pathlib import Path
 import shutil
 from unittest.mock import patch as mock_patch
-
+import re
 import pandas as pd
 from datetime import datetime, timedelta
 
+import pytest
 from epiweeks import Week
 
 from delphi_nhsn.patch import filter_source_files, patch
-from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_API, TOTAL_ADMISSION_FLU_API
+from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_COL, TOTAL_ADMISSION_FLU_COL, \
+    NUM_HOSP_REPORTING_FLU_COL, NUM_HOSP_REPORTING_COVID_COL, GEOS, TOTAL_ADMISSION_COVID, TOTAL_ADMISSION_FLU, \
+    NUM_HOSP_REPORTING_COVID, NUM_HOSP_REPORTING_FLU
 from conftest import TEST_DATA, PRELIM_TEST_DATA, TEST_DIR
 
 class TestPatch:
@@ -85,11 +87,15 @@ def generate_test_source_files(self):
             custom_filename = f"{TEST_DIR}/backups/{date}.csv.gz"
             custom_filename_prelim = f"{TEST_DIR}/backups/{date}_prelim.csv.gz"
             test_data = pd.DataFrame(TEST_DATA)
-            test_data[TOTAL_ADMISSION_COVID_API] = int(date)
-            test_data[TOTAL_ADMISSION_FLU_API] = int(date)
+            test_data[TOTAL_ADMISSION_COVID_COL] = int(date)
+            test_data[TOTAL_ADMISSION_FLU_COL] = int(date)
+            test_data[NUM_HOSP_REPORTING_COVID_COL] = int(date)
+            test_data[NUM_HOSP_REPORTING_FLU_COL] = int(date)
             test_prelim_data = pd.DataFrame(PRELIM_TEST_DATA)
-            test_prelim_data[TOTAL_ADMISSION_COVID_API] = int(date)
-            test_prelim_data[TOTAL_ADMISSION_FLU_API] = int(date)
+            test_prelim_data[TOTAL_ADMISSION_COVID_COL] = int(date)
+            test_prelim_data[TOTAL_ADMISSION_FLU_COL] = int(date)
+            test_prelim_data[NUM_HOSP_REPORTING_COVID_COL] = int(date)
+            test_prelim_data[NUM_HOSP_REPORTING_FLU_COL] = int(date)
 
             test_data = test_data.head(2)
             test_data.to_csv(
@@ -108,21 +114,39 @@ def test_patch(self, params_w_patch):
         file_list, prelim_file_list = self.generate_test_source_files()
         patch(params_w_patch)
 
-        for issue_path in Path(f"{TEST_DIR}/patch_dir").glob("*"):
+        for issue_path in Path(f"{TEST_DIR}/patch_dir").glob("issue*"):
             issue_dt_str = issue_path.name.replace("issue_", "")
             for file in Path(issue_path / "nhsn").iterdir():
                 df = pd.read_csv(file)
                 assert issue_dt_str == str(int(df["val"][0]))
 
         # clean up
-        shutil.rmtree(f"{TEST_DIR}/patch_dir")
+        for file in Path(f"{TEST_DIR}/patch_dir").glob("issue*"):
+            shutil.rmtree(file)
 
         for file in file_list:
             os.remove(file)
 
         for file in prelim_file_list:
             os.remove(file)
 
+    def test_patch_incomplete_file(self, params_w_patch):
+        os.makedirs(params_w_patch["patch"]["patch_dir"], exist_ok=True)
+        issue_date = "20241119"
+        existing_signals = [TOTAL_ADMISSION_COVID, TOTAL_ADMISSION_FLU]
+        backup_dir = params_w_patch.get("common").get("backup_dir")
+        shutil.copy(f"{TEST_DIR}/test_data/{issue_date}.csv.gz", backup_dir)
+
+        with mock_patch("delphi_nhsn.patch.read_params", return_value=params_w_patch):
+            patch(params_w_patch)
+
+        files = list(Path(f"{TEST_DIR}/patch_dir/issue_{issue_date}/nhsn").glob("*.csv"))
+        dates = set([re.search(r"\d{6}", file.name).group() for file in files])
+        assert len(files) == len(GEOS) * len(existing_signals) * len(dates)
+        # clean up
+        for file in Path(f"{TEST_DIR}/patch_dir").glob("issue*"):
+            shutil.rmtree(file)
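
The assertion in test_patch_incomplete_file expects the patch run to emit one CSV per (geo, available signal, epiweek) combination, so the file count factors exactly. Illustrative arithmetic with made-up values:

geos = ["nation", "state", "hhs"]  # stand-in for GEOS
existing_signals = ["confirmed_admissions_covid_ew", "confirmed_admissions_flu_ew"]
dates = {"202445", "202446"}  # epiweeks parsed from the output filenames

expected = len(geos) * len(existing_signals) * len(dates)
print(expected)  # 3 * 2 * 2 = 12 CSV files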
