Skip to content

Commit ed89c11

Browse files
authored
Merge pull request #2108 from cmu-delphi/release/indicators_v0.3.59_utils_v0.3.26
Release covidcast-indicators 0.3.59
2 parents a7eaab6 + 21350ab commit ed89c11

File tree

29 files changed

+498
-233
lines changed

29 files changed

+498
-233
lines changed

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.3.58
2+
current_version = 0.3.59
33
commit = True
44
message = chore: bump covidcast-indicators to {new_version}
55
tag = False

ansible/templates/sir_complainsalot-params-prod.json.j2

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,13 @@
4444
},
4545
"nssp": {
4646
"max_age":19,
47-
"maintainers": []
47+
"maintainers": [],
48+
"retired-signals": [
49+
"pct_ed_visits_combined_2023rvr",
50+
"pct_ed_visits_covid_2023rvr",
51+
"pct_ed_visits_influenza_2023rvr",
52+
"pct_ed_visits_rsv_2023rvr"
53+
]
4854
},
4955
"nhsn": {
5056
"max_age":19,

changehc/version.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
current_version = 0.3.58
1+
current_version = 0.3.59

claims_hosp/version.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
current_version = 0.3.58
1+
current_version = 0.3.59

doctor_visits/version.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
current_version = 0.3.58
1+
current_version = 0.3.59

google_symptoms/version.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
current_version = 0.3.58
1+
current_version = 0.3.59

hhs_hosp/version.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
current_version = 0.3.58
1+
current_version = 0.3.59

nchs_mortality/version.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
current_version = 0.3.58
1+
current_version = 0.3.59

nhsn/delphi_nhsn/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
GEOS = ["state", "nation", "hhs"]
44

5+
# Socrata dataset IDs used when pulling NHSN hospital admission data (see pull.py).
MAIN_DATASET_ID = "ua7e-t2fy"
# Dataset ID for the preliminary NHSN data pull.
PRELIM_DATASET_ID = "mpgq-jmmr"
7+
58
# column name from socrata
69
TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
710
TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"

nhsn/delphi_nhsn/patch.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
"""
2+
This module is used for patching data in the delphi_nhsn package.
3+
4+
To use this module, you need to specify the range of issue dates in params.json, like so:
5+
6+
{
7+
"common": {
8+
...
9+
},
10+
"validation": {
11+
...
12+
},
13+
"patch": {
14+
"patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/nhsn/patch"
15+
}
16+
}
17+
18+
It will generate data for the range of issue dates corresponding to source data files available in "backup_dir"
19+
specified under "common", and store them in batch issue format under "patch_dir":
20+
[name-of-patch]/issue_[issue-date]/nhsn/actual_data_file.csv
21+
"""
22+
23+
from datetime import datetime
24+
from os import makedirs
25+
from pathlib import Path
26+
from typing import List
27+
28+
from delphi_utils import get_structured_logger, read_params
29+
from epiweeks import Week
30+
31+
from .run import run_module
32+
33+
34+
def filter_source_files(source_files: List[Path]) -> List[Path]:
    """
    Filter source files down to one file per epiweek, keeping the latest issue.

    Preliminary files (filenames containing "prelim") are excluded. Because
    ``source_files`` is expected to be sorted by filename (i.e. by issue date,
    ``YYYYMMDD.csv.gz``), later files overwrite earlier ones in the dict, so
    the file kept for each epiweek is the one with the latest issue date.

    Parameters
    ----------
    source_files: List[Path]
        Backup source files named ``YYYYMMDD.csv.gz`` (or
        ``YYYYMMDD_prelim.csv.gz`` for preliminary data), sorted by name.

    Returns
    -------
    List[Path]
        One non-preliminary source file per epiweek (the latest issue).
    """
    epiweek_dict = dict()

    for file in source_files:
        if "prelim" not in file.stem:
            current_issue_date = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
            epiweek = Week.fromdate(current_issue_date)
            # Later (sorted) files replace earlier ones: latest issue wins.
            epiweek_dict[epiweek] = file

    filtered_patch_list = list(epiweek_dict.values())
    return filtered_patch_list
57+
58+
59+
def patch(params):
    """
    Run the NHSN indicator for a range of issue dates.

    The issue dates are derived from the source backup files found in
    params["common"]["backup_dir"]; one run is produced per epiweek, using the
    latest available non-preliminary backup for that week.

    Configuration keys used from params.json:
    - "common":
        - "backup_dir": str, directory containing YYYYMMDD.csv.gz source files
        - "log_filename": str, file to log to
    - "patch": Only used for patching data
        - "patch_dir": str, directory to write all issues output
    """
    logger = get_structured_logger("delphi_nhsn.patch", filename=params["common"]["log_filename"])

    source_files = sorted(Path(params["common"]["backup_dir"]).glob("*.csv.gz"))
    # Fail fast with a clear message rather than an IndexError below.
    if not source_files:
        logger.error("No source files found", backup_dir=params["common"]["backup_dir"])
        return
    makedirs(params["patch"]["patch_dir"], exist_ok=True)

    logger.info(
        "Starting patching",
        patch_directory=params["patch"]["patch_dir"],
        start_issue=source_files[0].name.split(".")[0],
        end_issue=source_files[-1].name.split(".")[0],
    )

    patch_list = filter_source_files(source_files)
    for file in patch_list:
        issue_date_str = file.name.split(".")[0]
        logger.info("Running issue", issue_date=datetime.strptime(issue_date_str, "%Y%m%d").strftime("%Y-%m-%d"))
        # Tell run_module which backup file to read instead of the Socrata API.
        params["patch"]["issue_date"] = issue_date_str
        # regardless of week date type or not the directory name must be issue_date_YYYYMMDD
        # conversion is done in acquisition
        current_issue_dir = f"{params['patch']['patch_dir']}/issue_{issue_date_str}/nhsn"
        makedirs(current_issue_dir, exist_ok=True)
        params["common"]["export_dir"] = current_issue_dir
        params["common"]["custom_run"] = True
        run_module(params, logger)
91+
92+
93+
if __name__ == "__main__":
    # Entry point for running the patch pipeline directly from the command line;
    # configuration is read from params.json via delphi_utils.read_params.
    patch(read_params())

nhsn/delphi_nhsn/pull.py

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
# -*- coding: utf-8 -*-
22
"""Functions for pulling NSSP ER data."""
33
import logging
4+
from pathlib import Path
45
from typing import Optional
56

67
import pandas as pd
78
from delphi_utils import create_backup_csv
89
from sodapy import Socrata
910

10-
from .constants import PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
11+
from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
1112

1213

1314
def pull_data(socrata_token: str, dataset_id: str):
@@ -27,7 +28,42 @@ def pull_data(socrata_token: str, dataset_id: str):
2728
return df
2829

2930

30-
def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
31+
def pull_data_from_file(filepath: str, issue_date: str, logger, prelim_flag=False) -> pd.DataFrame:
    """
    Load a dataset from a backup source file rather than the Socrata API.

    The source file was previously written by delphi_utils.create_backup_csv.

    Parameters
    ----------
    filepath: directory in which the source file is located
    issue_date: date when the file was pulled / generated
    logger
    prelim_flag: boolean to indicate which dataset to grab

    Returns
    -------
    pd.DataFrame
        The backed-up data, or an empty DataFrame when no issue date is given
        or no matching backup file exists.
    """
    if not issue_date:
        return pd.DataFrame()

    normalized_date = issue_date.replace("-", "")
    suffix = "_prelim.csv.gz" if prelim_flag else ".csv.gz"
    filename = f"{normalized_date}{suffix}"
    backup_file = Path(filepath, filename)

    if not backup_file.exists():
        return pd.DataFrame()

    df = pd.read_csv(backup_file, compression="gzip")
    logger.info("Pulling data from file", file=filename, num_rows=len(df))
    return df
58+
59+
60+
def pull_nhsn_data(
61+
socrata_token: str,
62+
backup_dir: str,
63+
custom_run: bool,
64+
issue_date: Optional[str],
65+
logger: Optional[logging.Logger] = None,
66+
):
3167
"""Pull the latest NHSN hospital admission data, and conforms it into a dataset.
3268
3369
The output dataset has:
@@ -52,7 +88,11 @@ def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger
5288
Dataframe as described above.
5389
"""
5490
# Pull data from Socrata API
55-
df = pull_data(socrata_token, dataset_id="ua7e-t2fy")
91+
df = (
92+
pull_data(socrata_token, dataset_id=MAIN_DATASET_ID)
93+
if not custom_run
94+
else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
95+
)
5696

5797
keep_columns = list(TYPE_DICT.keys())
5898

@@ -75,7 +115,11 @@ def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger
75115

76116

77117
def pull_preliminary_nhsn_data(
78-
socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
118+
socrata_token: str,
119+
backup_dir: str,
120+
custom_run: bool,
121+
issue_date: Optional[str],
122+
logger: Optional[logging.Logger] = None,
79123
):
80124
"""Pull the latest preliminary NHSN hospital admission data, and conforms it into a dataset.
81125
@@ -100,8 +144,11 @@ def pull_preliminary_nhsn_data(
100144
pd.DataFrame
101145
Dataframe as described above.
102146
"""
103-
# Pull data from Socrata API
104-
df = pull_data(socrata_token, dataset_id="mpgq-jmmr")
147+
df = (
148+
pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
149+
if not custom_run
150+
else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
151+
)
105152

106153
keep_columns = list(PRELIM_TYPE_DICT.keys())
107154

nhsn/delphi_nhsn/run.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from .pull import pull_nhsn_data, pull_preliminary_nhsn_data
2626

2727

28-
def run_module(params):
28+
def run_module(params, logger=None):
2929
"""
3030
Run the indicator.
3131
@@ -35,14 +35,16 @@ def run_module(params):
3535
Nested dictionary of parameters.
3636
"""
3737
start_time = time.time()
38-
logger = get_structured_logger(
39-
__name__,
40-
filename=params["common"].get("log_filename"),
41-
log_exceptions=params["common"].get("log_exceptions", True),
42-
)
38+
if not logger:
39+
logger = get_structured_logger(
40+
__name__,
41+
filename=params["common"].get("log_filename"),
42+
log_exceptions=params["common"].get("log_exceptions", True),
43+
)
4344
export_dir = params["common"]["export_dir"]
4445
backup_dir = params["common"]["backup_dir"]
4546
custom_run = params["common"].get("custom_run", False)
47+
issue_date = params.get("patch", dict()).get("issue_date", None)
4648
socrata_token = params["indicator"]["socrata_token"]
4749
export_start_date = params["indicator"]["export_start_date"]
4850
run_stats = []
@@ -51,12 +53,16 @@ def run_module(params):
5153
export_start_date = date.today() - timedelta(days=date.today().weekday() + 2)
5254
export_start_date = export_start_date.strftime("%Y-%m-%d")
5355

54-
nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
55-
preliminary_nhsn_df = pull_preliminary_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
56+
nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger)
57+
preliminary_nhsn_df = pull_preliminary_nhsn_data(
58+
socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger
59+
)
5660

5761
geo_mapper = GeoMapper()
5862
signal_df_dict = {signal: nhsn_df for signal in SIGNALS_MAP}
59-
signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP})
63+
# some of the source backups do not include for preliminary data TODO remove after first patch
64+
if not preliminary_nhsn_df.empty:
65+
signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP})
6066

6167
for signal, df_pull in signal_df_dict.items():
6268
for geo in GEOS:
@@ -67,14 +73,17 @@ def run_module(params):
6773
df = df[df["geo_id"] == "us"]
6874
elif geo == "hhs":
6975
df = df[df["geo_id"] != "us"]
76+
df = df[df["geo_id"].str.len() == 2]
7077
df.rename(columns={"geo_id": "state_id"}, inplace=True)
7178
df = geo_mapper.add_geocode(df, "state_id", "state_code", from_col="state_id")
7279
df = geo_mapper.add_geocode(df, "state_code", "hhs", from_col="state_code", new_col="hhs")
7380
df = geo_mapper.replace_geocode(
7481
df, from_col="state_code", from_code="state_code", new_col="geo_id", new_code="hhs"
7582
)
76-
else:
83+
elif geo == "state":
7784
df = df[df_pull["geo_id"] != "us"]
85+
df = df[df["geo_id"].str.len() == 2] # hhs region is a value in geo_id column
86+
7887
df["se"] = np.nan
7988
df["sample_size"] = np.nan
8089
dates = create_export_csv(

nhsn/tests/conftest.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
# queries the nhsn data with timestamp (2021-08-19, 2021-10-19) with CO and USA data
1717

1818

19-
with open("test_data/page.json", "r") as f:
19+
with open(f"{TEST_DIR}/test_data/page.json", "r") as f:
2020
TEST_DATA = json.load(f)
2121

22-
with open("test_data/prelim_page.json", "r") as f:
22+
with open(f"{TEST_DIR}/test_data/prelim_page.json", "r") as f:
2323
PRELIM_TEST_DATA = json.load(f)
2424

2525
@pytest.fixture(scope="session")
@@ -50,11 +50,12 @@ def params():
5050
@pytest.fixture
5151
def params_w_patch(params):
5252
params_copy = copy.deepcopy(params)
53+
params_copy["common"]["custom_run"] = True
5354
params_copy["patch"] = {
54-
"start_issue": "2024-06-27",
55-
"end_issue": "2024-06-29",
56-
"patch_dir": "./patch_dir"
55+
"patch_dir": f"{TEST_DIR}/patch_dir",
56+
"issue_date": "2024-12-12",
5757
}
58+
5859
return params_copy
5960

6061
@pytest.fixture(scope="function")

nhsn/tests/test_data/20241212.csv.gz

3.39 KB
Binary file not shown.
3.33 KB
Binary file not shown.

0 commit comments

Comments
 (0)