Fix nchs missing value #535

Merged · 9 commits · Nov 18, 2020

44 changes: 17 additions & 27 deletions nchs_mortality/delphi_nchs_mortality/pull.py
@@ -3,6 +3,7 @@
import numpy as np
import pandas as pd
from sodapy import Socrata
from .constants import METRICS

def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str):
"""Pull the latest NCHS Mortality data, and conforms it into a dataset.
@@ -33,10 +34,7 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str):
Dataframe as described above.
"""
# Constants
keep_columns = ['covid_deaths', 'total_deaths',
'percent_of_expected_deaths', 'pneumonia_deaths',
'pneumonia_and_covid_deaths', 'influenza_deaths',
'pneumonia_influenza_or_covid_19_deaths']
keep_columns = METRICS.copy()
type_dict = {key: float for key in keep_columns}
type_dict["timestamp"] = 'datetime64[ns]'

@@ -64,31 +62,23 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str):
"schema may have changed. Please investigate and "
"amend the code.") from exc

# Drop rows for locations outside US
df = df[df["state"] != "United States"]
df.loc[df["state"] == "New York City", "state"] = "New York"
df = df.loc[:, keep_columns + ["timestamp", "state"]].set_index("timestamp")

state_list = df["state"].unique()
date_list = df["timestamp"].unique()
index_df = pd.MultiIndex.from_product(
[state_list, date_list], names=['state', 'timestamp']
)
df = df.groupby(
["state", "timestamp"]).sum().reindex(index_df).reset_index()
Comment on lines -75 to -76

Contributor: It looks like this reindex step was used to fill in missing values by state and time, and removing it makes sense given the discussion. I am not super familiar with the actual NCHS data, but were there cases of duplicated (state, timestamp) rows where the sum aggregation should still be retained?

Contributor (author): Yeah, you are right! Let me fix this. Thanks!
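For context, a minimal sketch of what the removed reindex step did, on made-up data (the toy state names and numbers are illustrative, not the real NCHS feed): it builds the full (state, timestamp) grid and fills combinations that never appeared with NaN.

```python
import pandas as pd

# Toy frame: Alaska is missing the second week entirely.
df = pd.DataFrame({
    "state": ["Alabama", "Alabama", "Alaska"],
    "timestamp": pd.to_datetime(["2020-05-02", "2020-05-09", "2020-05-02"]),
    "covid_deaths": [10.0, 12.0, 3.0],
})

index_df = pd.MultiIndex.from_product(
    [df["state"].unique(), df["timestamp"].unique()],
    names=["state", "timestamp"],
)
full = df.groupby(["state", "timestamp"]).sum().reindex(index_df).reset_index()
print(full)  # (Alaska, 2020-05-09) now exists, with covid_deaths = NaN
```

Removing the step means a (state, week) pair with no report simply stays absent instead of becoming a NaN row.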

# Final sanity checks
days_by_states = df.groupby("state").count()["covid_deaths"].unique()
unique_days = df["timestamp"].unique()
# each FIPS has same number of rows
if (len(days_by_states) > 1) or (days_by_states[0] != len(unique_days)):
raise ValueError("Differing number of days by fips")
min_timestamp = min(unique_days)
max_timestamp = max(unique_days)
n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, 'D') / 7 + 1
if n_days != len(unique_days):
raise ValueError(
f"Not every day between {min_timestamp} and "
"{max_timestamp} is represented."
)
# NCHS considers NYC as an individual state; however, we want it included
# in NY. If values are NaN for both NYC and NY, the aggregation should
# also be NaN.
df_ny = df.loc[df["state"] == "New York", :].drop("state", axis=1)
df_nyc = df.loc[df["state"] == "New York City", :].drop("state", axis=1)
# Get mask to ignore cells where both NY and NYC have NaN values
mask = (df_ny[keep_columns].isnull().values \
& df_nyc[keep_columns].isnull().values)
df_ny = df_ny.append(df_nyc).groupby("timestamp").sum().where(~mask, np.nan)
df_ny["state"] = "New York"
# Drop NYC and NY in the full dataset
df = df.loc[~df["state"].isin(["New York", "New York City"]), :]
df = df.append(df_ny).reset_index().sort_values(["state", "timestamp"])

# Add population info
keep_columns.extend(["timestamp", "geo_id", "population"])
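One detail worth spelling out in the new NY/NYC aggregation above: pandas' sum() skips NaN by default, so adding NY and NYC directly would quietly turn two missing values into 0. A minimal sketch of the where(~mask, np.nan) pattern with toy numbers (not NCHS data; pd.concat is used here in place of the DataFrame.append seen in the diff, which newer pandas removes):

```python
import numpy as np
import pandas as pd

weeks = pd.date_range("2020-05-02", periods=3, freq="7D", name="timestamp")
df_ny = pd.DataFrame({"covid_deaths": [5.0, np.nan, np.nan]}, index=weeks)
df_nyc = pd.DataFrame({"covid_deaths": [2.0, 4.0, np.nan]}, index=weeks)

# True only where both NY and NYC are missing.
mask = df_ny.isnull().values & df_nyc.isnull().values

combined = (pd.concat([df_ny, df_nyc])
            .groupby("timestamp").sum()
            .where(~mask, np.nan))
print(combined)
# Week 1: 7.0. Week 2: 4.0 (NY's NaN is treated as 0 by sum).
# Week 3: NaN, restored by the mask instead of a spurious 0.
```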
20 changes: 12 additions & 8 deletions nchs_mortality/delphi_nchs_mortality/run.py
@@ -17,7 +17,7 @@
from .constants import (METRICS, SENSOR_NAME_MAP,
SENSORS, INCIDENCE_BASE, GEO_RES)

def run_module(): # pylint: disable=too-many-branches,too-many-statements
def run_module():
"""Run module for processing NCHS mortality data."""
params = read_params()
export_start_date = params["export_start_date"]
@@ -41,13 +41,15 @@ def run_module(): # pylint: disable=too-many-branches,too-many-statements
join(static_file_dir, "state_pop.csv"), dtype={"fips": int}
)

df = pull_nchs_mortality_data(token, map_df, test_mode)
df_pull = pull_nchs_mortality_data(token, map_df, test_mode)
for metric in METRICS:
if metric == 'percent_of_expected_deaths':
print(metric)
df = df_pull.copy()
df["val"] = df[metric]
df["se"] = np.nan
df["sample_size"] = np.nan
df = df[~df["val"].isnull()]
sensor_name = "_".join(["wip", SENSOR_NAME_MAP[metric]])
export_csv(
df,
@@ -59,12 +61,14 @@ def run_module(): # pylint: disable=too-many-branches,too-many-statements
else:
for sensor in SENSORS:
print(metric, sensor)
df = df_pull.copy()
if sensor == "num":
df["val"] = df[metric]
else:
df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
df["se"] = np.nan
df["sample_size"] = np.nan
df = df[~df["val"].isnull()]
sensor_name = "_".join(["wip", SENSOR_NAME_MAP[metric], sensor])
export_csv(
df,
@@ -74,10 +78,10 @@ def run_module(): # pylint: disable=too-many-branches,too-many-statements
sensor=sensor_name,
)

# Weekly run of archive utility on Monday
# - Does not upload to S3, that is handled by daily run of archive utility
# - Exports issues into receiving for the API
# Daily run of archiving utility
# - Uploads changed files to S3
# - Does not export any issues into receiving
arch_diffs(params, daily_arch_diff)
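A note on the df_pull.copy() change above: the loop now drops rows where the current metric's value is missing, and filtering a single shared frame would also remove those rows for every metric processed afterwards. A minimal sketch of the pattern with made-up numbers:

```python
import numpy as np
import pandas as pd

df_pull = pd.DataFrame({
    "covid_deaths": [3.0, np.nan],
    "total_deaths": [10.0, 8.0],
})

for metric in ["covid_deaths", "total_deaths"]:
    df = df_pull.copy()           # fresh copy per metric
    df["val"] = df[metric]
    df = df[~df["val"].isnull()]  # drops only this metric's missing rows
    print(metric, len(df))        # covid_deaths 1, total_deaths 2
```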
31 changes: 29 additions & 2 deletions nchs_mortality/tests/test_pull.py
@@ -6,6 +6,7 @@
from delphi_utils import read_params

from delphi_nchs_mortality.pull import pull_nchs_mortality_data
from delphi_nchs_mortality.constants import METRICS

params = read_params()
export_start_date = params["export_start_date"]
@@ -17,15 +18,38 @@
join(static_file_dir, "state_pop.csv"), dtype={"fips": int}
)

class TestPullUSAFacts:
class TestPullNCHS:
def test_good_file(self):
df = pull_nchs_mortality_data(token, map_df, "test_data.csv")


# Test columns
assert (df.columns.values == [
'covid_deaths', 'total_deaths', 'percent_of_expected_deaths',
'pneumonia_deaths', 'pneumonia_and_covid_deaths',
'influenza_deaths', 'pneumonia_influenza_or_covid_19_deaths',
"timestamp", "geo_id", "population"]).all()

# Test aggregation for NYC and NY
raw_df = pd.read_csv("./test_data/test_data.csv", parse_dates=["timestamp"])
for metric in METRICS:
ny_list = raw_df.loc[(raw_df["state"] == "New York")
& (raw_df[metric].isnull()), "timestamp"].values
nyc_list = raw_df.loc[(raw_df["state"] == "New York City")
& (raw_df[metric].isnull()), "timestamp"].values
final_list = df.loc[(df["geo_id"] == "ny")
& (df[metric].isnull()), "timestamp"].values
assert set(final_list) == set(ny_list).intersection(set(nyc_list))

# Test missing value
for state, geo_id in zip(map_df["state"], map_df["geo_id"]):
if state in set(["New York", "New York City"]):
continue
for metric in METRICS:
test_list = raw_df.loc[(raw_df["state"] == state)
& (raw_df[metric].isnull()), "timestamp"].values
final_list = df.loc[(df["geo_id"] == geo_id)
& (df[metric].isnull()), "timestamp"].values
assert set(final_list) == set(test_list)

def test_bad_file_with_inconsistent_time_col(self):
with pytest.raises(ValueError):
@@ -36,3 +60,6 @@ def test_bad_file_with_inconsistent_time_col(self):
with pytest.raises(ValueError):
df = pull_nchs_mortality_data(token, map_df,
"bad_data_with_missing_cols.csv")



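The NY/NYC assertion above encodes the invariant from pull.py: after aggregation, a week is NaN for the combined "ny" geo exactly when it was NaN for both New York and New York City. As a plain illustration with hypothetical week labels:

```python
ny_missing = {"202018", "202019"}   # weeks where NY covid_deaths is NaN
nyc_missing = {"202019"}            # weeks where NYC covid_deaths is NaN

# The aggregated series should be NaN only where both inputs were NaN.
assert ny_missing & nyc_missing == {"202019"}
```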
18 changes: 7 additions & 11 deletions nchs_mortality/tests/test_run.py
@@ -20,17 +20,13 @@ def test_output_files_exist(self, run_as_module, date):
csv_files = listdir(output_folder)

dates = [
"202016",
"202017",
"202018",
"202019",
"202020",
"202021",
"202022",
"202023",
"202024",
"202025",
"202026",
"202030",
"202031",
"202032",
"202033",
"202034",
"202035",
"202036",
]
metrics = ['deaths_covid_incidence',
'deaths_allcause_incidence',