
Commit 7f07c41

Merge pull request #233 from cmu-delphi/nchs_mortality
Add `percent_of_expected_deaths` signal and dry-run mode to NCHS mortality data pipeline
2 parents 2904c1d + 4999ced commit 7f07c41


20 files changed: +5,462 additions, -217 deletions


_delphi_utils_python/delphi_utils/archive.py

Lines changed: 10 additions & 4 deletions
@@ -351,7 +351,11 @@ def update_cache(self):

         self._cache_updated = True

-    def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
+    def archive_exports(self,
+                        exported_files: Files,
+                        update_cache: bool = True,
+                        update_s3: bool = True
+                        ) -> Tuple[Files, Files]:
         """
         Handles actual archiving of files to the S3 bucket.

@@ -375,10 +379,12 @@ def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
             archive_key = join(self.indicator_prefix, basename(exported_file))

             try:
-                # Update local cache
-                shutil.copyfile(exported_file, cached_file)
+                if update_cache:
+                    # Update local cache
+                    shutil.copyfile(exported_file, cached_file)

-                self.bucket.Object(archive_key).upload_file(exported_file)
+                if update_s3:
+                    self.bucket.Object(archive_key).upload_file(exported_file)

                 archive_success.append(exported_file)
             except FileNotFoundError:
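For context, a minimal sketch of how the new flags might be exercised by an indicator; the cache/export paths, bucket name, and credentials below are placeholders rather than values from this PR, and the calls talk to whatever S3 bucket is configured:

```python
from delphi_utils import S3ArchiveDiffer

# Placeholder paths and credentials -- substitute the real indicator params.
arch_diff = S3ArchiveDiffer(
    "./cache", "./receiving",
    "example-bucket", "nchs_mortality",
    {"aws_access_key_id": "", "aws_secret_access_key": ""})
arch_diff.update_cache()

# Diff current exports against the cache, as the indicators already do.
_, common_diffs, new_files = arch_diff.diff_exports()
to_archive = [f for f, diff in common_diffs.items() if diff is not None]
to_archive += new_files

# Default behaviour (both flags True): copy into the local cache and upload to S3.
successes, fails = arch_diff.archive_exports(to_archive)

# "Dry-run" style call used by the weekly NCHS run: update the cache, skip S3.
successes, fails = arch_diff.archive_exports(to_archive, update_s3=False)
```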

nchs_mortality/DETAILS.md

Lines changed: 13 additions & 0 deletions
@@ -13,6 +13,9 @@ consistency how NCHS reports the data, please refer to [Exceptions](#Exceptions)
 * `covid_deaths`: All Deaths with confirmed or presumed COVID-19,
                   coded to ICD–10 code U07.1
 * `total_deaths`: Deaths from all causes.
+* `percent_of_expected_deaths`: the number of deaths for all causes for this
+                                week in 2020 compared to the average number
+                                across the same week in 2017–2019.
 * `pneumonia_deaths`: Counts of deaths involving Pneumonia, with or without
                       COVID-19, excluding Influenza deaths (J12.0–J18.9).
 * `pneumonia_and_covid_deaths`: Counts of deaths involving COVID-19 and Pneumonia,
@@ -24,9 +27,12 @@ consistency how NCHS reports the data, please refer to [Exceptions](#Exceptions)
                                              Influenza, or COVID-19, coded to ICD–10
                                              codes U07.1 or J09–J18.9

+Detailed descriptions are provided in the notes under Table 1 [here](https://www.cdc.gov/nchs/nvss/vsrr/COVID19/index.htm).
+
 ## Metrics, Level 2 (`m2`)
 * `num`: number of new deaths in a given week
 * `prop`: `num` / population * 100,000
+* _**No** `m2` for signal `percent_of_expected_deaths`._

 ## Exceptions

@@ -47,3 +53,10 @@ refers to an epiweek). However, NCHS reports their weekly data from Saturday to
 Saturday. We assume there is a one-day shift. For example, if they report death counts
 for Alaska in a week starting from date D, we will report the timestamp of this report
 as the corresponding epiweek of date (D + 1).
+
+### Data Versioning
+Data versions are tracked on both a daily and a weekly level.
+On a daily level, we check for updates to the NCHS mortality data every weekday, as
+it is reported by the CDC, and stash these daily updates on S3, but not in our API.
+On a weekly level (on Mondays), we additionally upload the changes to the data
+made over the past week (due to backfill) to our API.
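To make the `m2` `prop` definition in the DETAILS.md changes above concrete, a small worked example (the count and population are made up):

```python
# prop = num / population * 100,000 (per the m2 definition above)
num = 50                 # hypothetical weekly death count for a state
population = 5_000_000   # hypothetical state population
prop = num / population * 100_000   # = 1.0 deaths per 100,000 people
```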

nchs_mortality/README.md

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ the state-level data as-is. For detailed information see the files
 steps below to create a MyAppToken.
 - Click the `Sign up for an app toekn` buttom in the linked website
 - Sign In or Sign Up with Socrata ID
-- Clck the `Create New App Token` buttom
+- Clck the `Create New App Token` button
 - Fill in `Application Name` and `Description` (You can just use NCHS_Mortality
   for both) and click `Save`
 - Copy the `App Token`

nchs_mortality/daily_cache/.gitignore

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+*.csv

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+*.csv

nchs_mortality/delphi_nchs_mortality/pull.py

Lines changed: 22 additions & 14 deletions
@@ -3,7 +3,7 @@
 import pandas as pd
 from sodapy import Socrata

-def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame) -> pd.DataFrame:
+def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str):
     """Pulls the latest NCHS Mortality data, and conforms it into a dataset

     The output dataset has:
@@ -23,24 +23,30 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame) -> pd.DataFrame:
         My App Token for pulling the NCHS mortality data
     map_df: pd.DataFrame
         Read from static file "state_pop.csv".
+    test_mode: str
+        Check whether to run in a test mode

     Returns
     -------
     pd.DataFrame
         Dataframe as described above.
     """
     # Constants
-    KEEP_COLUMNS = ['covid_deaths', 'total_deaths', 'pneumonia_deaths',
+    KEEP_COLUMNS = ['covid_deaths', 'total_deaths',
+                    'percent_of_expected_deaths', 'pneumonia_deaths',
                     'pneumonia_and_covid_deaths', 'influenza_deaths',
                     'pneumonia_influenza_or_covid_19_deaths']
     TYPE_DICT = {key: float for key in KEEP_COLUMNS}
     TYPE_DICT["timestamp"] = 'datetime64[ns]'

-    # Pull data from Socrata API
-    client = Socrata("data.cdc.gov", token)
-    results = client.get("r8kw-7aab", limit=10**10)
-    df = pd.DataFrame.from_records(results).rename(
-        {"start_week": "timestamp"}, axis=1)
+    if test_mode == "":
+        # Pull data from Socrata API
+        client = Socrata("data.cdc.gov", token)
+        results = client.get("r8kw-7aab", limit=10**10)
+        df = pd.DataFrame.from_records(results).rename(
+            {"start_week": "timestamp"}, axis=1)
+    else:
+        df = pd.read_csv("./test_data/%s" % test_mode)

     # Check missing start_week == end_week
     try:
@@ -50,7 +56,13 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame) -> pd.DataFrame:
         "end_week is not always the same as start_week, check the raw file"
         )

-    df = df.astype(TYPE_DICT)
+    try:
+        df = df.astype(TYPE_DICT)
+    except KeyError:
+        raise ValueError("Expected column(s) missed, The dataset "
+                         "schema may have changed. Please investigate and "
+                         "amend the code.")
+
     df = df[df["state"] != "United States"]
     df.loc[df["state"] == "New York City", "state"] = "New York"

@@ -79,10 +91,6 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame) -> pd.DataFrame:

     # Add population info
     KEEP_COLUMNS.extend(["timestamp", "geo_id", "population"])
-    try:
-        df = df.merge(map_df, on="state")[KEEP_COLUMNS]
-    except KeyError:
-        raise ValueError("Expected column(s) missed, The dataset "
-                         "schema may have changed. Please investigate and "
-                         "amend the code.")
+    df = df.merge(map_df, on="state")[KEEP_COLUMNS]
+
     return df
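A small sketch of how the reworked `pull_nchs_mortality_data` might be called in each mode; the token value, test file name, and the static-file path are placeholders, not values from this PR:

```python
import pandas as pd
from delphi_nchs_mortality.pull import pull_nchs_mortality_data

# Population mapping read from the indicator's static files.
map_df = pd.read_csv("./static/state_pop.csv", dtype={"fips": int})

# Production: an empty mode string pulls the latest data from the Socrata API.
df_live = pull_nchs_mortality_data("MY_APP_TOKEN", map_df, test_mode="")

# Test mode: a non-empty mode string is read as a CSV under ./test_data/,
# so no API token is needed.
df_test = pull_nchs_mortality_data("", map_df, test_mode="test_data.csv")
```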

nchs_mortality/delphi_nchs_mortality/run.py

Lines changed: 102 additions & 19 deletions
@@ -5,20 +5,21 @@
 when the module is run with `python -m MODULE_NAME`.
 """
 from datetime import datetime, date, timedelta
-from itertools import product
 from os.path import join
+from os import remove, listdir
+from shutil import copy

 import numpy as np
 import pandas as pd
-from delphi_utils import read_params
+from delphi_utils import read_params, S3ArchiveDiffer

 from .pull import pull_nchs_mortality_data
 from .export import export_csv

 # global constants
 METRICS = [
-    'covid_deaths', 'total_deaths', 'pneumonia_deaths',
-    'pneumonia_and_covid_deaths', 'influenza_deaths',
+    'covid_deaths', 'total_deaths', 'percent_of_expected_deaths',
+    'pneumonia_deaths', 'pneumonia_and_covid_deaths', 'influenza_deaths',
     'pneumonia_influenza_or_covid_19_deaths'
 ]
 SENSORS = [
@@ -37,27 +38,109 @@ def run_module():
                                  days=date.today().weekday() + 2)
     export_start_date = export_start_date.strftime('%Y-%m-%d')
     export_dir = params["export_dir"]
+    daily_export_dir = params["daily_export_dir"]
+    cache_dir = params["cache_dir"]
+    daily_cache_dir = params["daily_cache_dir"]
     static_file_dir = params["static_file_dir"]
     token = params["token"]
+    test_mode = params["mode"]
+
+    daily_arch_diff = S3ArchiveDiffer(
+        daily_cache_dir, daily_export_dir,
+        params["bucket_name"], "nchs_mortality",
+        params["aws_credentials"])
+    daily_arch_diff.update_cache()

     map_df = pd.read_csv(
         join(static_file_dir, "state_pop.csv"), dtype={"fips": int}
     )

-    df = pull_nchs_mortality_data(token, map_df)
-    for metric, sensor in product(METRICS, SENSORS):
-        print(metric, sensor)
-        if sensor == "num":
+    df = pull_nchs_mortality_data(token, map_df, test_mode)
+    for metric in METRICS:
+        if metric == 'percent_of_expected_deaths':
+            print(metric)
             df["val"] = df[metric]
+            df["se"] = np.nan
+            df["sample_size"] = np.nan
+            sensor_name = "_".join(["wip", metric])
+            export_csv(
+                df,
+                geo_name=geo_res,
+                export_dir=daily_export_dir,
+                start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
+                sensor=sensor_name,
+            )
         else:
-            df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
-        df["se"] = np.nan
-        df["sample_size"] = np.nan
-        sensor_name = "_".join(["wip", metric, sensor])
-        export_csv(
-            df,
-            geo_name=geo_res,
-            export_dir=export_dir,
-            start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
-            sensor=sensor_name,
-        )
+            for sensor in SENSORS:
+                print(metric, sensor)
+                if sensor == "num":
+                    df["val"] = df[metric]
+                else:
+                    df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
+                df["se"] = np.nan
+                df["sample_size"] = np.nan
+                sensor_name = "_".join(["wip", metric, sensor])
+                export_csv(
+                    df,
+                    geo_name=geo_res,
+                    export_dir=daily_export_dir,
+                    start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
+                    sensor=sensor_name,
+                )
+
+    # Weekly run of archive utility on Monday
+    # - Does not upload to S3, that is handled by daily run of archive utility
+    # - Exports issues into receiving for the API
+    if datetime.today().weekday() == 0:
+        # Copy todays raw output to receiving
+        for output_file in listdir(daily_export_dir):
+            copy(
+                join(daily_export_dir, output_file),
+                join(export_dir, output_file))
+
+        weekly_arch_diff = S3ArchiveDiffer(
+            cache_dir, export_dir,
+            params["bucket_name"], "nchs_mortality",
+            params["aws_credentials"])
+
+        # Dont update cache from S3 (has daily files), only simulate a update_cache() call
+        weekly_arch_diff._cache_updated = True
+
+        # Diff exports, and make incremental versions
+        _, common_diffs, new_files = weekly_arch_diff.diff_exports()
+
+        # Archive changed and new files only
+        to_archive = [f for f, diff in common_diffs.items() if diff is not None]
+        to_archive += new_files
+        _, fails = weekly_arch_diff.archive_exports(to_archive, update_s3=False)
+
+        # Filter existing exports to exclude those that failed to archive
+        succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails}
+        weekly_arch_diff.filter_exports(succ_common_diffs)
+
+        # Report failures: someone should probably look at them
+        for exported_file in fails:
+            print(f"Failed to archive (weekly) '{exported_file}'")
+
+    # Daily run of archiving utility
+    # - Uploads changed files to S3
+    # - Does not export any issues into receiving
+
+    # Diff exports, and make incremental versions
+    _, common_diffs, new_files = daily_arch_diff.diff_exports()
+
+    # Archive changed and new files only
+    to_archive = [f for f, diff in common_diffs.items() if diff is not None]
+    to_archive += new_files
+    _, fails = daily_arch_diff.archive_exports(to_archive)
+
+    # Daily output not needed anymore, remove them
+    for exported_file in new_files:
+        remove(exported_file)
+    for exported_file, diff_file in common_diffs.items():
+        remove(exported_file)
+        remove(diff_file)
+
+    # Report failures: someone should probably look at them
+    for exported_file in fails:
+        print(f"Failed to archive (daily) '{exported_file}'")

nchs_mortality/params.json.template

Lines changed: 9 additions & 1 deletion
@@ -3,5 +3,13 @@
   "static_file_dir": "./static",
   "export_dir": "./receiving",
   "cache_dir": "./cache",
-  "token": ""
+  "daily_export_dir": "./daily_receiving",
+  "daily_cache_dir": "./daily_cache",
+  "token": "",
+  "mode":"",
+  "aws_credentials": {
+    "aws_access_key_id": "",
+    "aws_secret_access_key": ""
+  },
+  "bucket_name": ""
 }

nchs_mortality/setup.py

Lines changed: 2 additions & 1 deletion
@@ -9,7 +9,8 @@
     "pylint",
     "delphi-utils",
     "sodapy",
-    "epiweeks"
+    "epiweeks",
+    "freezegun",
 ]

 setup(

nchs_mortality/tests/cache/.gitignore

Whitespace-only changes.

nchs_mortality/tests/conftest.py

Lines changed: 28 additions & 4 deletions
@@ -1,18 +1,42 @@
 # -*- coding: utf-8 -*-

+from boto3 import Session
+from freezegun import freeze_time
+from moto import mock_s3
 import pytest

 from os import listdir, remove
 from os.path import join

+from delphi_utils import read_params
 from delphi_nchs_mortality.run import run_module


-@pytest.fixture(scope="session")
-def run_as_module():
-    # Clean receiving directory
+@pytest.fixture(scope="function")
+def run_as_module(date):
+    # Clean directories
     for fname in listdir("receiving"):
         if ".csv" in fname:
             remove(join("receiving", fname))

-    run_module()
+    for fname in listdir("cache"):
+        if ".csv" in fname:
+            remove(join("cache", fname))
+
+    for fname in listdir("daily_cache"):
+        if ".csv" in fname:
+            remove(join("daily_cache", fname))
+
+    for fname in listdir("daily_receiving"):
+        if ".csv" in fname:
+            remove(join("daily_receiving", fname))
+
+    with mock_s3():
+        with freeze_time(date):
+            # Create the fake bucket we will be using
+            params = read_params()
+            aws_credentials = params["aws_credentials"]
+            s3_client = Session(**aws_credentials).client("s3")
+            s3_client.create_bucket(Bucket=params["bucket_name"])
+
+            run_module()
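The rewritten fixture takes a `date` argument, runs the module inside a mocked S3 bucket (`moto`), and freezes the clock (`freezegun`) so the Monday-only branch can be tested deterministically. A sketch of how a test might supply that date; the test name, the assertion, and the `date` parametrization are assumptions, since the test files themselves are not shown in this excerpt:

```python
import pytest
from os import listdir

# Hypothetical test: the parametrized "date" value is consumed by the
# run_as_module fixture above (2020-09-14 is a Monday, chosen for illustration).
@pytest.mark.parametrize("date", ["2020-09-14"])
def test_output_files_exist(run_as_module, date):
    # After running the module, the daily output directory should contain CSVs.
    assert any(fname.endswith(".csv") for fname in listdir("daily_receiving"))
```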
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+*.csv

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+*.csv

nchs_mortality/tests/params.json.template

Lines changed: 9 additions & 1 deletion
@@ -3,5 +3,13 @@
   "static_file_dir": "../static",
   "export_dir": "./receiving",
   "cache_dir": "./cache",
-  "token": ""
+  "daily_export_dir": "./daily_receiving",
+  "daily_cache_dir": "./daily_cache",
+  "token": "",
+  "mode":"test_data.csv",
+  "aws_credentials": {
+    "aws_access_key_id": "FAKE_TEST_ACCESS_KEY_ID",
+    "aws_secret_access_key": "FAKE_TEST_SECRET_ACCESS_KEY"
+  },
+  "bucket_name": "test-bucket"
 }
