diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py
index 08358badd..4119ffcb6 100644
--- a/nchs_mortality/delphi_nchs_mortality/pull.py
+++ b/nchs_mortality/delphi_nchs_mortality/pull.py
@@ -3,6 +3,7 @@
 import numpy as np
 import pandas as pd
 from sodapy import Socrata
+from .constants import METRICS
 
 def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str):
     """Pull the latest NCHS Mortality data, and conforms it into a dataset.
@@ -33,10 +34,7 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str):
         Dataframe as described above.
     """
     # Constants
-    keep_columns = ['covid_deaths', 'total_deaths',
-                    'percent_of_expected_deaths', 'pneumonia_deaths',
-                    'pneumonia_and_covid_deaths', 'influenza_deaths',
-                    'pneumonia_influenza_or_covid_19_deaths']
+    keep_columns = METRICS.copy()
     type_dict = {key: float for key in keep_columns}
     type_dict["timestamp"] = 'datetime64[ns]'
 
@@ -64,31 +62,23 @@
                 "schema may have changed. Please investigate and "
                 "amend the code.") from exc
 
+    # Drop rows for locations outside US
     df = df[df["state"] != "United States"]
-    df.loc[df["state"] == "New York City", "state"] = "New York"
+    df = df.loc[:, keep_columns + ["timestamp", "state"]].set_index("timestamp")
 
-    state_list = df["state"].unique()
-    date_list = df["timestamp"].unique()
-    index_df = pd.MultiIndex.from_product(
-        [state_list, date_list], names=['state', 'timestamp']
-    )
-    df = df.groupby(
-        ["state", "timestamp"]).sum().reindex(index_df).reset_index()
-
-    # Final sanity checks
-    days_by_states = df.groupby("state").count()["covid_deaths"].unique()
-    unique_days = df["timestamp"].unique()
-    # each FIPS has same number of rows
-    if (len(days_by_states) > 1) or (days_by_states[0] != len(unique_days)):
-        raise ValueError("Differing number of days by fips")
-    min_timestamp = min(unique_days)
-    max_timestamp = max(unique_days)
-    n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, 'D') / 7 + 1
-    if n_days != len(unique_days):
-        raise ValueError(
-            f"Not every day between {min_timestamp} and "
-            "{max_timestamp} is represented."
-        )
+    # NCHS reports NYC as an individual state; however, we want it included
+    # in NY. If values are NaN for both NYC and NY, the aggregation should
+    # also be NaN.
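For context, a minimal, self-contained sketch of the NaN-preserving NY/NYC aggregation introduced above, run on toy data. The column names and values here are invented for illustration, and `pd.concat` stands in for `DataFrame.append` (deprecated in recent pandas); the masking logic is the same as in the patch.

```python
import numpy as np
import pandas as pd

keep_columns = ["covid_deaths", "total_deaths"]

# Toy frame: two weeks of NY and NYC rows, indexed by timestamp as in pull.py.
df = pd.DataFrame({
    "timestamp": pd.to_datetime(["2020-08-01", "2020-08-01",
                                 "2020-08-08", "2020-08-08"]),
    "state": ["New York", "New York City", "New York", "New York City"],
    "covid_deaths": [10.0, 5.0, np.nan, np.nan],   # week 2: both NaN
    "total_deaths": [100.0, np.nan, 90.0, 40.0],   # week 1: only NYC NaN
}).set_index("timestamp")

df_ny = df.loc[df["state"] == "New York", :].drop("state", axis=1)
df_nyc = df.loc[df["state"] == "New York City", :].drop("state", axis=1)

# .sum() skips NaN, so value + NaN becomes value + 0; the mask restores NaN
# only where *both* inputs were NaN. This relies on df_ny and df_nyc covering
# the same timestamps in the same order, as the patch implicitly assumes.
mask = (df_ny[keep_columns].isnull().values
        & df_nyc[keep_columns].isnull().values)
combined = pd.concat([df_ny, df_nyc]).groupby("timestamp").sum().where(~mask, np.nan)

print(combined)
# covid_deaths: 15.0, then NaN; total_deaths: 100.0, then 130.0
```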
+    df_ny = df.loc[df["state"] == "New York", :].drop("state", axis=1)
+    df_nyc = df.loc[df["state"] == "New York City", :].drop("state", axis=1)
+    # Mask cells where both NY and NYC have NaN values
+    mask = (df_ny[keep_columns].isnull().values
+            & df_nyc[keep_columns].isnull().values)
+    df_ny = df_ny.append(df_nyc).groupby("timestamp").sum().where(~mask, np.nan)
+    df_ny["state"] = "New York"
+    # Drop NYC and NY in the full dataset
+    df = df.loc[~df["state"].isin(["New York", "New York City"]), :]
+    df = df.append(df_ny).reset_index().sort_values(["state", "timestamp"])
 
     # Add population info
     keep_columns.extend(["timestamp", "geo_id", "population"])
diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py
index ba4851f6d..2ba251dfc 100644
--- a/nchs_mortality/delphi_nchs_mortality/run.py
+++ b/nchs_mortality/delphi_nchs_mortality/run.py
@@ -17,7 +17,7 @@ from .constants import (METRICS, SENSOR_NAME_MAP,
                         SENSORS, INCIDENCE_BASE, GEO_RES)
 
 
-def run_module():  # pylint: disable=too-many-branches,too-many-statements
+def run_module():
     """Run module for processing NCHS mortality data."""
     params = read_params()
     export_start_date = params["export_start_date"]
@@ -41,13 +41,15 @@
         join(static_file_dir, "state_pop.csv"), dtype={"fips": int}
     )
 
-    df = pull_nchs_mortality_data(token, map_df, test_mode)
+    df_pull = pull_nchs_mortality_data(token, map_df, test_mode)
     for metric in METRICS:
         if metric == 'percent_of_expected_deaths':
             print(metric)
+            df = df_pull.copy()
             df["val"] = df[metric]
             df["se"] = np.nan
             df["sample_size"] = np.nan
+            df = df[~df["val"].isnull()]
             sensor_name = "_".join(["wip", SENSOR_NAME_MAP[metric]])
             export_csv(
                 df,
@@ -59,12 +61,14 @@
         else:
             for sensor in SENSORS:
                 print(metric, sensor)
+                df = df_pull.copy()
                 if sensor == "num":
                     df["val"] = df[metric]
                 else:
                     df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
                 df["se"] = np.nan
                 df["sample_size"] = np.nan
+                df = df[~df["val"].isnull()]
                 sensor_name = "_".join(["wip", SENSOR_NAME_MAP[metric], sensor])
                 export_csv(
                     df,
@@ -74,10 +78,10 @@
                     sensor=sensor_name,
                 )
 
-    # Weekly run of archive utility on Monday
-    # - Does not upload to S3, that is handled by daily run of archive utility
-    # - Exports issues into receiving for the API
-    # Daily run of archiving utility
-    # - Uploads changed files to S3
-    # - Does not export any issues into receiving
+# Weekly run of archive utility on Monday
+# - Does not upload to S3, that is handled by daily run of archive utility
+# - Exports issues into receiving for the API
+# Daily run of archiving utility
+# - Uploads changed files to S3
+# - Does not export any issues into receiving
     arch_diffs(params, daily_arch_diff)
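The run.py change is a copy-per-iteration pattern: each (metric, sensor) pair works on a fresh copy of the pulled frame, so one iteration's `val` column and null-row filtering cannot leak into the next. A small sketch under that reading, with placeholder constants and toy data (the real METRICS, SENSORS, and INCIDENCE_BASE live in constants.py):

```python
import numpy as np
import pandas as pd

INCIDENCE_BASE = 100000   # placeholder; the real value is in constants.py
METRICS = ["covid_deaths", "total_deaths"]
SENSORS = ["num", "prop"]

df_pull = pd.DataFrame({
    "geo_id": ["ny", "pa"],
    "population": [19_450_000, 12_800_000],
    "covid_deaths": [120.0, np.nan],
    "total_deaths": [900.0, 700.0],
})

for metric in METRICS:
    for sensor in SENSORS:
        df = df_pull.copy()               # fresh copy per (metric, sensor) pair
        if sensor == "num":
            df["val"] = df[metric]
        else:
            df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
        df = df[~df["val"].isnull()]      # drop rows with no value to report
        print(metric, sensor, "->", len(df), "rows exported")
```

Without the per-iteration copy, the `df = df[~df["val"].isnull()]` filter would permanently shrink the shared frame, silently dropping rows from every later metric.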
diff --git a/nchs_mortality/tests/test_pull.py b/nchs_mortality/tests/test_pull.py
index f7cf93c3f..a6932f4e9 100644
--- a/nchs_mortality/tests/test_pull.py
+++ b/nchs_mortality/tests/test_pull.py
@@ -6,6 +6,7 @@ from delphi_utils import read_params
 
 from delphi_nchs_mortality.pull import pull_nchs_mortality_data
+from delphi_nchs_mortality.constants import METRICS
 
 params = read_params()
 export_start_date = params["export_start_date"]
 
@@ -17,15 +18,38 @@
     join(static_file_dir, "state_pop.csv"), dtype={"fips": int}
 )
 
-class TestPullUSAFacts:
+class TestPullNCHS:
     def test_good_file(self):
         df = pull_nchs_mortality_data(token, map_df, "test_data.csv")
-
+
+        # Test columns
         assert (df.columns.values == [
                 'covid_deaths', 'total_deaths',
                 'percent_of_expected_deaths', 'pneumonia_deaths',
                 'pneumonia_and_covid_deaths', 'influenza_deaths',
                 'pneumonia_influenza_or_covid_19_deaths',
                 "timestamp", "geo_id", "population"]).all()
+
+        # Test aggregation for NYC and NY
+        raw_df = pd.read_csv("./test_data/test_data.csv", parse_dates=["timestamp"])
+        for metric in METRICS:
+            ny_list = raw_df.loc[(raw_df["state"] == "New York")
+                                 & (raw_df[metric].isnull()), "timestamp"].values
+            nyc_list = raw_df.loc[(raw_df["state"] == "New York City")
+                                  & (raw_df[metric].isnull()), "timestamp"].values
+            final_list = df.loc[(df["geo_id"] == "ny")
+                                & (df[metric].isnull()), "timestamp"].values
+            assert set(final_list) == set(ny_list).intersection(set(nyc_list))
+
+        # Test missing value
+        for state, geo_id in zip(map_df["state"], map_df["geo_id"]):
+            if state in set(["New York", "New York City"]):
+                continue
+            for metric in METRICS:
+                test_list = raw_df.loc[(raw_df["state"] == state)
                                        & (raw_df[metric].isnull()), "timestamp"].values
+                final_list = df.loc[(df["geo_id"] == geo_id)
+                                    & (df[metric].isnull()), "timestamp"].values
+                assert set(final_list) == set(test_list)
 
     def test_bad_file_with_inconsistent_time_col(self):
         with pytest.raises(ValueError):
@@ -36,3 +60,6 @@ def test_bad_file_with_inconsistent_time_col(self):
         with pytest.raises(ValueError):
             df = pull_nchs_mortality_data(token, map_df,
                                           "bad_data_with_missing_cols.csv")
+
+
+
diff --git a/nchs_mortality/tests/test_run.py b/nchs_mortality/tests/test_run.py
index 389964f3e..52269b5d4 100644
--- a/nchs_mortality/tests/test_run.py
+++ b/nchs_mortality/tests/test_run.py
@@ -20,17 +20,13 @@ def test_output_files_exist(self, run_as_module, date):
         csv_files = listdir(output_folder)
 
         dates = [
-            "202016",
-            "202017",
-            "202018",
-            "202019",
-            "202020",
-            "202021",
-            "202022",
-            "202023",
-            "202024",
-            "202025",
-            "202026",
+            "202030",
+            "202031",
+            "202032",
+            "202033",
+            "202034",
+            "202035",
+            "202036",
         ]
 
         metrics = ['deaths_covid_incidence', 'deaths_allcause_incidence',
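The updated `dates` list holds MMWR epiweek labels in `YYYYWW` format (weeks 30 through 36 of 2020), matching the refreshed test data. If one wanted to generate such labels rather than hard-code them, the third-party `epiweeks` package can do it; this sketch is purely illustrative and not part of the patch:

```python
from epiweeks import Week

# CDC/MMWR epiweek labels for 2020 weeks 30 through 36, e.g. "202030".
dates = [Week(2020, week).cdcformat() for week in range(30, 37)]
print(dates)  # ['202030', '202031', ..., '202036']
```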