diff --git a/nchs_mortality/delphi_nchs_mortality/constants.py b/nchs_mortality/delphi_nchs_mortality/constants.py
index 800444e58..2bdd78419 100644
--- a/nchs_mortality/delphi_nchs_mortality/constants.py
+++ b/nchs_mortality/delphi_nchs_mortality/constants.py
@@ -25,8 +25,3 @@
     "prop"
 ]
 INCIDENCE_BASE = 100000
-
-# this is necessary as a delimiter in the f-string expressions we use to
-# construct detailed error reports
-# (https://www.python.org/dev/peps/pep-0498/#escape-sequences)
-NEWLINE = "\n"
diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py
index 18bbfd59a..5a96d9a1f 100644
--- a/nchs_mortality/delphi_nchs_mortality/pull.py
+++ b/nchs_mortality/delphi_nchs_mortality/pull.py
@@ -9,7 +9,7 @@
 from delphi_utils.geomap import GeoMapper

-from .constants import METRICS, RENAME, NEWLINE
+from .constants import METRICS, RENAME


 def standardize_columns(df):
     """Rename columns to comply with a standard set.
@@ -90,10 +90,10 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None
 have changed. Please investigate and amend the code.

 Columns needed:
-{NEWLINE.join(type_dict.keys())}
+{'\n'.join(type_dict.keys())}

 Columns available:
-{NEWLINE.join(df.columns)}
+{'\n'.join(df.columns)}
 """) from exc

     df = df[keep_columns + ["timestamp", "state"]].set_index("timestamp")
diff --git a/nwss_wastewater/delphi_nwss/constants.py b/nwss_wastewater/delphi_nwss/constants.py
index 99af24d5f..b04206d14 100644
--- a/nwss_wastewater/delphi_nwss/constants.py
+++ b/nwss_wastewater/delphi_nwss/constants.py
@@ -22,18 +22,23 @@
         "microbial",
     ],
 }
-METRIC_DATES = ["date_start", "date_end"]
-SAMPLE_SITE_NAMES = {
-    "wwtp_jurisdiction": "category",
-    "wwtp_id": int,
-    "reporting_jurisdiction": "category",
-    "sample_location": "category",
-    "county_names": "category",
-    "county_fips": "category",
-    "population_served": float,
-    "sampling_prior": bool,
-    "sample_location_specify": float,
-}
 SIG_DIGITS = 4
-NEWLINE = "\n"
+TYPE_DICT = {key: float for key in SIGNALS}
+TYPE_DICT.update({"timestamp": "datetime64[ns]"})
+TYPE_DICT_METRIC = {key: float for key in METRIC_SIGNALS}
+TYPE_DICT_METRIC.update({key: "datetime64[ns]" for key in ["date_start", "date_end"]})
+# Sample site names
+TYPE_DICT_METRIC.update(
+    {
+        "wwtp_jurisdiction": "category",
+        "wwtp_id": int,
+        "reporting_jurisdiction": "category",
+        "sample_location": "category",
+        "county_names": "category",
+        "county_fips": "category",
+        "population_served": float,
+        "sampling_prior": bool,
+        "sample_location_specify": float,
+    }
+)
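The two `TYPE_DICT` constants above flatten what `construct_typedicts()` used to rebuild on every call (see the `pull.py` diff below). A standalone sketch of what the module-level code produces, using the signal lists the deleted `test_column_type_dicts` asserted rather than importing the real constants:

```python
# Stand-ins taken from the deleted test_column_type_dicts assertions;
# the real lists live in delphi_nwss/constants.py.
SIGNALS = ["pcr_conc_smoothed"]
METRIC_SIGNALS = ["detect_prop_15d", "percentile", "ptc_15d"]

TYPE_DICT = {key: float for key in SIGNALS}
TYPE_DICT.update({"timestamp": "datetime64[ns]"})
assert TYPE_DICT == {"pcr_conc_smoothed": float, "timestamp": "datetime64[ns]"}

TYPE_DICT_METRIC = {key: float for key in METRIC_SIGNALS}
TYPE_DICT_METRIC.update({key: "datetime64[ns]" for key in ["date_start", "date_end"]})
# constants.py then merges in the sample-site columns (wwtp_id, county_fips, ...).
```

One portability note on the nchs_mortality change above: `{'\n'.join(...)}` inside an f-string is only valid syntax on Python 3.12+ (PEP 701). Before 3.12, backslashes were not allowed in f-string expressions, which is exactly why the deleted comment cited PEP 498 and why the NWSS helper below binds a local `newline` variable instead.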
diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py
index 4bc4aba23..8cebaa64a 100644
--- a/nwss_wastewater/delphi_nwss/pull.py
+++ b/nwss_wastewater/delphi_nwss/pull.py
@@ -9,10 +9,9 @@
     SIGNALS,
     PROVIDER_NORMS,
     METRIC_SIGNALS,
-    METRIC_DATES,
-    SAMPLE_SITE_NAMES,
     SIG_DIGITS,
-    NEWLINE,
+    TYPE_DICT,
+    TYPE_DICT_METRIC,
 )


@@ -35,34 +34,29 @@
     return result


-def construct_typedicts():
-    """Create the type conversion dictionary for both dataframes."""
-    # basic type conversion
-    type_dict = {key: float for key in SIGNALS}
-    type_dict["timestamp"] = "datetime64[ns]"
-    # metric type conversion
-    signals_dict_metric = {key: float for key in METRIC_SIGNALS}
-    metric_dates_dict = {key: "datetime64[ns]" for key in METRIC_DATES}
-    type_dict_metric = {**metric_dates_dict, **signals_dict_metric, **SAMPLE_SITE_NAMES}
-    return type_dict, type_dict_metric
-
-
-def warn_string(df, type_dict):
-    """Format the warning string."""
-    return f"""
+def convert_df_type(df, type_dict, logger):
+    """Convert types and warn if there are unexpected columns."""
+    try:
+        df = df.astype(type_dict)
+    except KeyError as exc:
+        newline = "\n"
+        raise KeyError(
+            f"""
 Expected column(s) missed, The dataset schema may
 have changed. Please investigate and amend the code.

-Columns needed:
-{NEWLINE.join(sorted(type_dict.keys()))}
+expected={newline.join(sorted(type_dict.keys()))}

-Columns available:
-{NEWLINE.join(sorted(df.columns))}
+received={newline.join(sorted(df.columns))}
 """
+        ) from exc
+    if new_columns := set(df.columns) - set(type_dict.keys()):
+        logger.info("New columns found in NWSS dataset.", new_columns=new_columns)
+    return df


 def reformat(df, df_metric):
-    """Add columns from df_metric to df, and rename some columns. 
+    """Add columns from df_metric to df, and rename some columns.

     Specifically the population and METRIC_SIGNAL columns, and renames date_start to timestamp.
     """
@@ -80,27 +74,16 @@
     return df


-def drop_unnormalized(df):
-    """Drop unnormalized.
-
-    mutate `df` to no longer have rows where the normalization scheme isn't actually identified,
-    as we can't classify the kind of signal
-    """
-    return df[~df["normalization"].isna()]
-
-
 def add_identifier_columns(df):
     """Add identifier columns.

     Add columns to get more detail than key_plot_id gives; specifically,
     state, and `provider_normalization`, which gives the signal identifier
     """
-    df["state"] = df.key_plot_id.str.extract(
-        r"_(\w\w)_"
-    )  # a pair of alphanumerics surrounded by _
-    df["provider"] = df.key_plot_id.str.extract(
-        r"(.*)_[a-z]{2}_"
-    )  # anything followed by state ^
+    # a pair of alphanumerics surrounded by _
+    df["state"] = df.key_plot_id.str.extract(r"_(\w\w)_")
+    # anything followed by state ^
+    df["provider"] = df.key_plot_id.str.extract(r"(.*)_[a-z]{2}_")
     df["signal_name"] = df.provider + "_" + df.normalization


@@ -120,7 +103,7 @@ def check_endpoints(df):
     )


-def pull_nwss_data(token: str):
+def pull_nwss_data(token: str, logger):
     """Pull the latest NWSS Wastewater data, and conforms it into a dataset.

     The output dataset has:
@@ -141,11 +124,6 @@
     pd.DataFrame
         Dataframe as described above.
     """
-    # Constants
-    keep_columns = [*SIGNALS, *METRIC_SIGNALS]
-    # concentration key types
-    type_dict, type_dict_metric = construct_typedicts()
-
     # Pull data from Socrata API
     client = Socrata("data.cdc.gov", token)
     results_concentration = client.get("g653-rqe2", limit=10 ** 10)
@@ -154,19 +132,14 @@

     df_concentration = pd.DataFrame.from_records(results_concentration)
     df_concentration = df_concentration.rename(columns={"date": "timestamp"})
-    try:
-        df_concentration = df_concentration.astype(type_dict)
-    except KeyError as exc:
-        raise ValueError(warn_string(df_concentration, type_dict)) from exc
+    # Schema checks.
+    df_concentration = convert_df_type(df_concentration, TYPE_DICT, logger)
+    df_metric = convert_df_type(df_metric, TYPE_DICT_METRIC, logger)

-    try:
-        df_metric = df_metric.astype(type_dict_metric)
-    except KeyError as exc:
-        raise ValueError(warn_string(df_metric, type_dict_metric)) from exc
+    # Drop sites without a normalization scheme.
+    df = df_concentration[~df_concentration["normalization"].isna()]

-    # if the normalization scheme isn't recorded, why is it even included as a sample site?
-    df = drop_unnormalized(df_concentration)
-    # pull 2 letter state labels out of the key_plot_id labels
+    # Pull 2 letter state labels out of the key_plot_id labels.
     add_identifier_columns(df)

     # move population and metric signals over to df
@@ -180,13 +153,14 @@
     # otherwise, best to assume some value rather than break the data)
     df.population_served = df.population_served.ffill()
     check_endpoints(df)
-    keep_columns.extend(
-        [
-            "timestamp",
-            "state",
-            "population_served",
-            "normalization",
-            "provider",
-        ]
-    )
+
+    keep_columns = [
+        *SIGNALS,
+        *METRIC_SIGNALS,
+        "timestamp",
+        "state",
+        "population_served",
+        "normalization",
+        "provider",
+    ]
     return df[keep_columns]
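`convert_df_type` replaces the two copies of the `try`/`except`/`warn_string` dance and, new in this patch, logs (rather than silently accepting) any columns the upstream schema has grown. A minimal runnable sketch of that contract, restated outside the package so it is self-contained; `FakeLogger` is a hypothetical stand-in for the structured logger that `delphi_utils` supplies:

```python
import pandas as pd

class FakeLogger:
    """Stand-in mimicking a structured logger's keyword-argument bindings."""
    def info(self, event, **kwargs):
        print(event, kwargs)

def convert_df_type(df, type_dict, logger):
    # astype raises KeyError if a type_dict key is missing from df;
    # the real helper re-raises it with an expected=/received= report.
    df = df.astype(type_dict)
    # walrus operator: log any columns present in df but absent from type_dict
    if new_columns := set(df.columns) - set(type_dict.keys()):
        logger.info("New columns found in NWSS dataset.", new_columns=new_columns)
    return df

df = pd.DataFrame({"pcr_conc_smoothed": ["1.5"], "brand_new_field": [0]})
out = convert_df_type(df, {"pcr_conc_smoothed": float}, FakeLogger())
# -> New columns found in NWSS dataset. {'new_columns': {'brand_new_field'}}
```

Two behavior changes ride along: a missing column now surfaces as a `KeyError` (the type `astype` itself raises) instead of the old `ValueError`, and callers must supply a logger, which is why `pull_nwss_data` grows a `logger` parameter.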
diff --git a/nwss_wastewater/delphi_nwss/run.py b/nwss_wastewater/delphi_nwss/run.py
index 5a21ec49f..3355bb254 100644
--- a/nwss_wastewater/delphi_nwss/run.py
+++ b/nwss_wastewater/delphi_nwss/run.py
@@ -21,6 +21,7 @@
         - "bucket_name: str, name of S3 bucket to read/write
         - "cache_dir": str, directory of locally cached data
 """
+
 import time
 from datetime import datetime

@@ -138,10 +139,10 @@
     run_stats = []
     ## build the base version of the signal at the most detailed geo level you can get.
     ## compute stuff here or farm out to another function or file
-    df_pull = pull_nwss_data(socrata_token)
+    df_pull = pull_nwss_data(socrata_token, logger)
     ## aggregate
     # iterate over the providers and the normalizations that they specifically provide
-    for (provider, normalization) in zip(
+    for provider, normalization in zip(
         PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"]
     ):
         # copy by only taking the relevant subsection
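`PROVIDER_NORMS` holds the provider/normalization pairs as two parallel lists, which is why the loop zips them rather than iterating a mapping; dropping the parentheses around the loop targets is purely stylistic. A sketch of the iteration with hypothetical entries (only the `"microbial"` normalization is visible in the `constants.py` excerpt above, so these values are illustrative):

```python
# Hypothetical pairs; the real ones live in delphi_nwss/constants.py.
PROVIDER_NORMS = {
    "provider": ["ProviderA", "ProviderB"],
    "normalization": ["flow-population", "microbial"],
}

for provider, normalization in zip(
    PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"]
):
    # mirrors add_identifier_columns: signal_name = provider + "_" + normalization
    print(f"{provider}_{normalization}")
```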
pd.read_csv("test_data/metric_data.csv", index_col=0) - converted = df.astype(type_dict_metric) + converted = df.astype(TYPE_DICT_METRIC) assert all( converted.columns == pd.Index( @@ -113,24 +80,13 @@ def test_column_conversions_metric(): assert all(ptypes.is_numeric_dtype(converted[flo].dtype) for flo in float_typed) -def test_warn_string(): - type_dict, type_dict_metric = construct_typedicts() - df_conc = pd.read_csv("test_data/conc_data.csv") - assert ( - warn_string(df_conc, type_dict) - == "\nExpected column(s) missed, The dataset schema may\nhave changed. Please investigate and amend the code.\n\nColumns needed:\npcr_conc_smoothed\ntimestamp\n\nColumns available:\nUnnamed: 0\ndate\nkey_plot_id\nnormalization\npcr_conc_smoothed\n" - ) - - def test_formatting(): - type_dict, type_dict_metric = construct_typedicts() df_metric = pd.read_csv("test_data/metric_data.csv", index_col=0) - df_metric = df_metric.astype(type_dict_metric) + df_metric = df_metric.astype(TYPE_DICT_METRIC) - type_dict, type_dict_metric = construct_typedicts() df = pd.read_csv("test_data/conc_data.csv", index_col=0) df = df.rename(columns={"date": "timestamp"}) - df = df.astype(type_dict) + df = df.astype(TYPE_DICT) df_formatted = reformat(df, df_metric) diff --git a/nwss_wastewater/tests/test_run.py b/nwss_wastewater/tests/test_run.py index 4270ad87d..161d92556 100644 --- a/nwss_wastewater/tests/test_run.py +++ b/nwss_wastewater/tests/test_run.py @@ -1,17 +1,7 @@ -from datetime import datetime, date -import json -from unittest.mock import patch -import tempfile -import os -import time -from datetime import datetime - import numpy as np import pandas as pd from pandas.testing import assert_frame_equal -from delphi_utils import S3ArchiveDiffer, get_structured_logger, create_export_csv, Nans -from delphi_nwss.constants import GEOS, SIGNALS from delphi_nwss.run import ( add_needed_columns, generate_weights, @@ -23,13 +13,9 @@ def test_sum_all_nan(): """Check that sum_all_nan returns NaN iff everything is a NaN""" - no_nans = np.array([3, 5]) - assert sum_all_nan(no_nans) == 8 - partial_nan = np.array([np.nan, 3, 5]) + assert sum_all_nan(np.array([3, 5])) == 8 assert np.isclose(sum_all_nan([np.nan, 3, 5]), 8) - - oops_all_nans = np.array([np.nan, np.nan]) - assert np.isnan(oops_all_nans).all() + assert np.isnan(np.array([np.nan, np.nan])).all() def test_weight_generation():