From d7933c12f477adf7c798d0111fb89eee05eb48fa Mon Sep 17 00:00:00 2001 From: Jingjing Tang Date: Tue, 17 Nov 2020 00:50:25 -0500 Subject: [PATCH 1/9] do not report missing values --- nchs_mortality/delphi_nchs_mortality/pull.py | 24 -------------------- nchs_mortality/delphi_nchs_mortality/run.py | 16 +++++++------ 2 files changed, 9 insertions(+), 31 deletions(-) diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py index 08358badd..c3949f4b4 100644 --- a/nchs_mortality/delphi_nchs_mortality/pull.py +++ b/nchs_mortality/delphi_nchs_mortality/pull.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- """Functions for pulling NCHS mortality data API.""" -import numpy as np import pandas as pd from sodapy import Socrata @@ -67,29 +66,6 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str): df = df[df["state"] != "United States"] df.loc[df["state"] == "New York City", "state"] = "New York" - state_list = df["state"].unique() - date_list = df["timestamp"].unique() - index_df = pd.MultiIndex.from_product( - [state_list, date_list], names=['state', 'timestamp'] - ) - df = df.groupby( - ["state", "timestamp"]).sum().reindex(index_df).reset_index() - - # Final sanity checks - days_by_states = df.groupby("state").count()["covid_deaths"].unique() - unique_days = df["timestamp"].unique() - # each FIPS has same number of rows - if (len(days_by_states) > 1) or (days_by_states[0] != len(unique_days)): - raise ValueError("Differing number of days by fips") - min_timestamp = min(unique_days) - max_timestamp = max(unique_days) - n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, 'D') / 7 + 1 - if n_days != len(unique_days): - raise ValueError( - f"Not every day between {min_timestamp} and " - "{max_timestamp} is represented." - ) - # Add population info keep_columns.extend(["timestamp", "geo_id", "population"]) df = df.merge(map_df, on="state")[keep_columns] diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py index ba4851f6d..fec91a641 100644 --- a/nchs_mortality/delphi_nchs_mortality/run.py +++ b/nchs_mortality/delphi_nchs_mortality/run.py @@ -17,7 +17,7 @@ from .constants import (METRICS, SENSOR_NAME_MAP, SENSORS, INCIDENCE_BASE, GEO_RES) -def run_module(): # pylint: disable=too-many-branches,too-many-statements +def run_module(): """Run module for processing NCHS mortality data.""" params = read_params() export_start_date = params["export_start_date"] @@ -48,6 +48,7 @@ def run_module(): # pylint: disable=too-many-branches,too-many-statements df["val"] = df[metric] df["se"] = np.nan df["sample_size"] = np.nan + df = df[~df["val"].isnull()] sensor_name = "_".join(["wip", SENSOR_NAME_MAP[metric]]) export_csv( df, @@ -65,6 +66,7 @@ def run_module(): # pylint: disable=too-many-branches,too-many-statements df["val"] = df[metric] / df["population"] * INCIDENCE_BASE df["se"] = np.nan df["sample_size"] = np.nan + df = df[~df["val"].isnull()] sensor_name = "_".join(["wip", SENSOR_NAME_MAP[metric], sensor]) export_csv( df, @@ -74,10 +76,10 @@ def run_module(): # pylint: disable=too-many-branches,too-many-statements sensor=sensor_name, ) - # Weekly run of archive utility on Monday - # - Does not upload to S3, that is handled by daily run of archive utility - # - Exports issues into receiving for the API - # Daily run of archiving utility - # - Uploads changed files to S3 - # - Does not export any issues into receiving +# Weekly run of archive utility on Monday +# - Does not upload to S3, that is handled by daily run of archive utility +# - Exports issues into receiving for the API +# Daily run of archiving utility +# - Uploads changed files to S3 +# - Does not export any issues into receiving arch_diffs(params, daily_arch_diff) From e35d71436e87b0b370d9996540cf7c700fc6abe0 Mon Sep 17 00:00:00 2001 From: Jingjing Tang Date: Tue, 17 Nov 2020 00:50:43 -0500 Subject: [PATCH 2/9] update unit tests --- nchs_mortality/tests/test_run.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/nchs_mortality/tests/test_run.py b/nchs_mortality/tests/test_run.py index 389964f3e..52269b5d4 100644 --- a/nchs_mortality/tests/test_run.py +++ b/nchs_mortality/tests/test_run.py @@ -20,17 +20,13 @@ def test_output_files_exist(self, run_as_module, date): csv_files = listdir(output_folder) dates = [ - "202016", - "202017", - "202018", - "202019", - "202020", - "202021", - "202022", - "202023", - "202024", - "202025", - "202026", + "202030", + "202031", + "202032", + "202033", + "202034", + "202035", + "202036", ] metrics = ['deaths_covid_incidence', 'deaths_allcause_incidence', From 0bf41a2d0a7303f5623c46abd5b3e22ce51987e0 Mon Sep 17 00:00:00 2001 From: Jingjing Tang Date: Tue, 17 Nov 2020 23:57:00 -0500 Subject: [PATCH 3/9] Reconsider missing value in NY --- nchs_mortality/delphi_nchs_mortality/pull.py | 23 ++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py index c3949f4b4..2695745eb 100644 --- a/nchs_mortality/delphi_nchs_mortality/pull.py +++ b/nchs_mortality/delphi_nchs_mortality/pull.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- """Functions for pulling NCHS mortality data API.""" +import numpy as np import pandas as pd from sodapy import Socrata @@ -62,12 +63,26 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str): raise ValueError("Expected column(s) missed, The dataset " "schema may have changed. Please investigate and " "amend the code.") from exc - + + # Drop rows for locations outside US df = df[df["state"] != "United States"] - df.loc[df["state"] == "New York City", "state"] = "New York" + df = df.loc[:, keep_columns + ["timestamp", "state"]].set_index("timestamp") + + # NCHS considers NYC as an individual state, however, we want it included + # in NY. If values are nan for both NYC and NY, the aggreagtion should + # also have NAN. + df_ny = df.loc[df["state"] == "New York", :].drop("state", axis=1) + df_nyc = df.loc[df["state"] == "New York City", :].drop("state", axis=1) + # Get mask df to ignore cells where both of them have NAN values + mask = (df_ny[keep_columns].isnull().values \ + & df_nyc[keep_columns].isnull().values) + df_ny = df_ny.append(df_nyc).groupby("timestamp").sum().where(~mask, np.nan) + df_ny["state"] = "New York" + # Drop NYC and NY in the full dataset + df = df.loc[~df["state"].isin(["New York", "New York City"]), :] + df = df.append(df_ny).reset_index().sort_values(["state", "timestamp"]) # Add population info - keep_columns.extend(["timestamp", "geo_id", "population"]) - df = df.merge(map_df, on="state")[keep_columns] + df = df.merge(map_df, on="state")[keep_columns + ["geo_id", "population"]] return df From b09c6d6ac15e5e4a75b01476a86b00f70639ca2d Mon Sep 17 00:00:00 2001 From: Jingjing Tang Date: Tue, 17 Nov 2020 23:58:52 -0500 Subject: [PATCH 4/9] delete Trailing whitespacs --- nchs_mortality/delphi_nchs_mortality/pull.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py index 2695745eb..b227ec19c 100644 --- a/nchs_mortality/delphi_nchs_mortality/pull.py +++ b/nchs_mortality/delphi_nchs_mortality/pull.py @@ -63,16 +63,16 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str): raise ValueError("Expected column(s) missed, The dataset " "schema may have changed. Please investigate and " "amend the code.") from exc - + # Drop rows for locations outside US df = df[df["state"] != "United States"] df = df.loc[:, keep_columns + ["timestamp", "state"]].set_index("timestamp") - + # NCHS considers NYC as an individual state, however, we want it included # in NY. If values are nan for both NYC and NY, the aggreagtion should # also have NAN. df_ny = df.loc[df["state"] == "New York", :].drop("state", axis=1) - df_nyc = df.loc[df["state"] == "New York City", :].drop("state", axis=1) + df_nyc = df.loc[df["state"] == "New York City", :].drop("state", axis=1) # Get mask df to ignore cells where both of them have NAN values mask = (df_ny[keep_columns].isnull().values \ & df_nyc[keep_columns].isnull().values) From 568d165ff502add75c606b56564199a8d12fa00a Mon Sep 17 00:00:00 2001 From: Jingjing Tang Date: Wed, 18 Nov 2020 00:02:18 -0500 Subject: [PATCH 5/9] fix errors --- nchs_mortality/delphi_nchs_mortality/pull.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py index b227ec19c..6a9ceb60c 100644 --- a/nchs_mortality/delphi_nchs_mortality/pull.py +++ b/nchs_mortality/delphi_nchs_mortality/pull.py @@ -83,6 +83,7 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str): df = df.append(df_ny).reset_index().sort_values(["state", "timestamp"]) # Add population info - df = df.merge(map_df, on="state")[keep_columns + ["geo_id", "population"]] + keep_columns.extend(["timestamp", "geo_id", "population"]) + df = df.merge(map_df, on="state")[keep_columns] return df From e98a224f9a971873e77a3f613e6c56e7d80a29e4 Mon Sep 17 00:00:00 2001 From: Jingjing Tang Date: Wed, 18 Nov 2020 11:32:26 -0500 Subject: [PATCH 6/9] fixed an error --- nchs_mortality/delphi_nchs_mortality/run.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py index fec91a641..2ba251dfc 100644 --- a/nchs_mortality/delphi_nchs_mortality/run.py +++ b/nchs_mortality/delphi_nchs_mortality/run.py @@ -41,10 +41,11 @@ def run_module(): join(static_file_dir, "state_pop.csv"), dtype={"fips": int} ) - df = pull_nchs_mortality_data(token, map_df, test_mode) + df_pull = pull_nchs_mortality_data(token, map_df, test_mode) for metric in METRICS: if metric == 'percent_of_expected_deaths': print(metric) + df = df_pull.copy() df["val"] = df[metric] df["se"] = np.nan df["sample_size"] = np.nan @@ -60,6 +61,7 @@ def run_module(): else: for sensor in SENSORS: print(metric, sensor) + df = df_pull.copy() if sensor == "num": df["val"] = df[metric] else: From 05e776c0edb6a2349bb8cc42a18726780002ca54 Mon Sep 17 00:00:00 2001 From: Jingjing Tang Date: Wed, 18 Nov 2020 12:07:57 -0500 Subject: [PATCH 7/9] use constants --- nchs_mortality/delphi_nchs_mortality/pull.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py index 6a9ceb60c..4119ffcb6 100644 --- a/nchs_mortality/delphi_nchs_mortality/pull.py +++ b/nchs_mortality/delphi_nchs_mortality/pull.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd from sodapy import Socrata +from .constants import METRICS def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str): """Pull the latest NCHS Mortality data, and conforms it into a dataset. @@ -33,10 +34,7 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str): Dataframe as described above. """ # Constants - keep_columns = ['covid_deaths', 'total_deaths', - 'percent_of_expected_deaths', 'pneumonia_deaths', - 'pneumonia_and_covid_deaths', 'influenza_deaths', - 'pneumonia_influenza_or_covid_19_deaths'] + keep_columns = METRICS.copy() type_dict = {key: float for key in keep_columns} type_dict["timestamp"] = 'datetime64[ns]' From 736d690c7eaabd861be395df108a0229f1979439 Mon Sep 17 00:00:00 2001 From: Jingjing Tang Date: Wed, 18 Nov 2020 12:08:21 -0500 Subject: [PATCH 8/9] add tests for missing values --- nchs_mortality/tests/test_pull.py | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/nchs_mortality/tests/test_pull.py b/nchs_mortality/tests/test_pull.py index f7cf93c3f..10870f225 100644 --- a/nchs_mortality/tests/test_pull.py +++ b/nchs_mortality/tests/test_pull.py @@ -6,6 +6,7 @@ from delphi_utils import read_params from delphi_nchs_mortality.pull import pull_nchs_mortality_data +from delphi_nchs_mortality.constants import METRICS params = read_params() export_start_date = params["export_start_date"] @@ -20,12 +21,35 @@ class TestPullUSAFacts: def test_good_file(self): df = pull_nchs_mortality_data(token, map_df, "test_data.csv") - + + # Test columns assert (df.columns.values == [ 'covid_deaths', 'total_deaths', 'percent_of_expected_deaths', 'pneumonia_deaths', 'pneumonia_and_covid_deaths', 'influenza_deaths', 'pneumonia_influenza_or_covid_19_deaths', "timestamp", "geo_id", "population"]).all() + + # Test aggregation for NYC and NY + raw_df = pd.read_csv("./test_data/test_data.csv", parse_dates=["timestamp"]) + for metric in METRICS: + ny_list = raw_df.loc[(raw_df["state"] == "New York") + & (raw_df[metric].isnull()), "timestamp"].values + nyc_list = raw_df.loc[(raw_df["state"] == "New York City") + & (raw_df[metric].isnull()), "timestamp"].values + final_list = df.loc[(df["geo_id"] == "ny") + & (df[metric].isnull()), "timestamp"].values + assert set(final_list) == set(ny_list).intersection(set(nyc_list)) + + # Test missing value + for state, geo_id in zip(map_df["state"], map_df["geo_id"]): + if state in set(["New York", "New York City"]): + continue + for metric in METRICS: + test_list = raw_df.loc[(raw_df["state"] == state) + & (raw_df[metric].isnull()), "timestamp"].values + final_list = df.loc[(df["geo_id"] == geo_id) + & (df[metric].isnull()), "timestamp"].values + assert set(final_list) == set(test_list) def test_bad_file_with_inconsistent_time_col(self): with pytest.raises(ValueError): @@ -36,3 +60,6 @@ def test_bad_file_with_inconsistent_time_col(self): with pytest.raises(ValueError): df = pull_nchs_mortality_data(token, map_df, "bad_data_with_missing_cols.csv") + + + From c9814c604cbe068698e77eeb4f8495028f0e2f89 Mon Sep 17 00:00:00 2001 From: Jingjing Tang Date: Wed, 18 Nov 2020 12:27:28 -0500 Subject: [PATCH 9/9] change the class name to nchs related --- nchs_mortality/tests/test_pull.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nchs_mortality/tests/test_pull.py b/nchs_mortality/tests/test_pull.py index 10870f225..a6932f4e9 100644 --- a/nchs_mortality/tests/test_pull.py +++ b/nchs_mortality/tests/test_pull.py @@ -18,7 +18,7 @@ join(static_file_dir, "state_pop.csv"), dtype={"fips": int} ) -class TestPullUSAFacts: +class TestPullNCHS: def test_good_file(self): df = pull_nchs_mortality_data(token, map_df, "test_data.csv")