first draft of splitting NWSS signals #1946

Closed · wants to merge 18 commits · Changes from 13 commits
2 changes: 2 additions & 0 deletions _delphi_utils_python/DEVELOP.md
@@ -54,3 +54,5 @@ When you are finished, the virtual environment can be deactivated and
deactivate
rm -r env
```
## Releasing the module
If you have made enough changes to warrant updating [the PyPI project](https://pypi.org/project/delphi-utils/), note that releasing is currently done as part of merging from `main` to `prod`.
5 changes: 0 additions & 5 deletions nchs_mortality/delphi_nchs_mortality/constants.py
@@ -25,8 +25,3 @@
"prop"
]
INCIDENCE_BASE = 100000

# this is necessary as a delimiter in the f-string expressions we use to
# construct detailed error reports
# (https://www.python.org/dev/peps/pep-0498/#escape-sequences)
NEWLINE = "\n"
17 changes: 8 additions & 9 deletions nchs_mortality/delphi_nchs_mortality/pull.py
@@ -5,11 +5,11 @@

import numpy as np
import pandas as pd
from delphi_utils.geomap import GeoMapper
from sodapy import Socrata

from delphi_utils.geomap import GeoMapper
from .constants import METRICS, RENAME

from .constants import METRICS, RENAME, NEWLINE

def standardize_columns(df):
"""Rename columns to comply with a standard set.
@@ -85,16 +85,15 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None
try:
df = df.astype(type_dict)
except KeyError as exc:
raise ValueError(f"""
raise ValueError(
f"""
Expected column(s) missing. The dataset schema may
have changed. Please investigate and amend the code.

Columns needed:
{NEWLINE.join(type_dict.keys())}

Columns available:
{NEWLINE.join(df.columns)}
""") from exc
expected={', '.join(type_dict.keys())}
received={', '.join(df.columns)}
"""
) from exc

df = df[keep_columns + ["timestamp", "state"]].set_index("timestamp")

41 changes: 28 additions & 13 deletions nwss_wastewater/delphi_nwss/constants.py
@@ -12,18 +12,33 @@

SIGNALS = ["pcr_conc_smoothed"]
METRIC_SIGNALS = ["detect_prop_15d", "percentile", "ptc_15d"]
METRIC_DATES = ["date_start", "date_end"]
SAMPLE_SITE_NAMES = {
"wwtp_jurisdiction": "category",
"wwtp_id": int,
"reporting_jurisdiction": "category",
"sample_location": "category",
"county_names": "category",
"county_fips": "category",
"population_served": float,
"sampling_prior": bool,
"sample_location_specify": float,
PROVIDER_NORMS = {
"provider": ["CDC_VERILY", "CDC_VERILY", "NWSS", "NWSS", "WWS"],
"normalization": [
"flow-population",
"microbial",
"flow-population",
"microbial",
"microbial",
],
}
SIG_DIGITS = 7
SIG_DIGITS = 4

NEWLINE = "\n"
TYPE_DICT = {key: float for key in SIGNALS}
TYPE_DICT.update({"timestamp": "datetime64[ns]"})
TYPE_DICT_METRIC = {key: float for key in METRIC_SIGNALS}
TYPE_DICT_METRIC.update({key: "datetime64[ns]" for key in ["date_start", "date_end"]})
# Sample site names
TYPE_DICT_METRIC.update(
{
"wwtp_jurisdiction": "category",
"wwtp_id": int,
"reporting_jurisdiction": "category",
"sample_location": "category",
"county_names": "category",
"county_fips": "category",
"population_served": float,
"sampling_prior": bool,
"sample_location_specify": float,
}
)
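
For reference, the dictionaries built above expand to the following literal forms, derived directly from the constants in this diff:

```python
# equivalent literal forms of TYPE_DICT and TYPE_DICT_METRIC above
TYPE_DICT = {"pcr_conc_smoothed": float, "timestamp": "datetime64[ns]"}
TYPE_DICT_METRIC = {
    "detect_prop_15d": float,
    "percentile": float,
    "ptc_15d": float,
    "date_start": "datetime64[ns]",
    "date_end": "datetime64[ns]",
    # sample site metadata
    "wwtp_jurisdiction": "category",
    "wwtp_id": int,
    "reporting_jurisdiction": "category",
    "sample_location": "category",
    "county_names": "category",
    "county_fips": "category",
    "population_served": float,
    "sampling_prior": bool,
    "sample_location_specify": float,
}
```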
133 changes: 76 additions & 57 deletions nwss_wastewater/delphi_nwss/pull.py
@@ -5,14 +5,7 @@
import pandas as pd
from sodapy import Socrata

from .constants import (
SIGNALS,
METRIC_SIGNALS,
METRIC_DATES,
SAMPLE_SITE_NAMES,
SIG_DIGITS,
NEWLINE,
)
from .constants import METRIC_SIGNALS, PROVIDER_NORMS, SIG_DIGITS, SIGNALS, TYPE_DICT, TYPE_DICT_METRIC


def sig_digit_round(value, n_digits):
@@ -34,47 +27,70 @@ def sig_digit_round(value, n_digits):
return result
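
The body of `sig_digit_round` is collapsed in this diff view; below is a minimal sketch of rounding to `n_digits` significant figures, an illustration under that assumption rather than the PR's exact implementation:

```python
import numpy as np

def sig_digit_round(value, n_digits):
    """Round an array to n_digits significant figures; zeros pass through."""
    value = np.asarray(value, dtype=float)
    result = value.copy()
    nonzero = value != 0
    # order of magnitude of each nonzero value's leading digit
    magnitude = np.floor(np.log10(np.abs(value[nonzero])))
    # shift into [1, 10), round to n_digits - 1 decimals, shift back
    result[nonzero] = (
        np.round(value[nonzero] / 10**magnitude, n_digits - 1) * 10**magnitude
    )
    return result
```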


def construct_typedicts():
"""Create the type conversion dictionary for both dataframes."""
# basic type conversion
type_dict = {key: float for key in SIGNALS}
type_dict["timestamp"] = "datetime64[ns]"
# metric type conversion
signals_dict_metric = {key: float for key in METRIC_SIGNALS}
metric_dates_dict = {key: "datetime64[ns]" for key in METRIC_DATES}
type_dict_metric = {**metric_dates_dict, **signals_dict_metric, **SAMPLE_SITE_NAMES}
return type_dict, type_dict_metric


def warn_string(df, type_dict):
"""Format the warning string."""
return f"""
def convert_df_type(df, type_dict, logger):
"""Convert types and warn if there are unexpected columns."""
try:
df = df.astype(type_dict)
except KeyError as exc:
raise KeyError(
f"""
Expected column(s) missing. The dataset schema may
have changed. Please investigate and amend the code.

Columns needed:
{NEWLINE.join(sorted(type_dict.keys()))}

Columns available:
{NEWLINE.join(sorted(df.columns))}
expected={', '.join(sorted(type_dict.keys()))}
received={', '.join(sorted(df.columns))}
"""
) from exc
if new_columns := set(df.columns) - set(type_dict.keys()):
logger.info("New columns found in NWSS dataset.", new_columns=new_columns)
return df
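
A toy illustration of the walrus-guarded check above; the column names here are invented for the example:

```python
import pandas as pd

type_dict = {"timestamp": "datetime64[ns]", "pcr_conc_smoothed": float}
df = pd.DataFrame(columns=["timestamp", "pcr_conc_smoothed", "brand_new_col"])
# the same set difference that drives the walrus check in convert_df_type
new_columns = set(df.columns) - set(type_dict.keys())
print(new_columns)  # {'brand_new_col'} -> logger.info would fire
```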


def add_population(df, df_metric):
    """Add the population column from df_metric to df, and rename some columns."""
def reformat(df, df_metric):
    """Add columns from df_metric to df, and rename some columns.

    Specifically the population and METRIC_SIGNAL columns, and renames date_start to timestamp.
    """
# drop unused columns from df_metric
df_population = df_metric.loc[:, ["key_plot_id", "date_start", "population_served"]]
df_metric_core = df_metric.loc[:, ["key_plot_id", "date_end", "population_served", *METRIC_SIGNALS]]
# get matching keys
df_population = df_population.rename(columns={"date_start": "timestamp"})
df_population = df_population.set_index(["key_plot_id", "timestamp"])
df_metric_core = df_metric_core.rename(columns={"date_end": "timestamp"})
df_metric_core = df_metric_core.set_index(["key_plot_id", "timestamp"])
df = df.set_index(["key_plot_id", "timestamp"])

df = df.join(df_population)
df = df.join(df_metric_core)
df = df.reset_index()
return df


def pull_nwss_data(socrata_token: str):
def add_identifier_columns(df):
"""Add identifier columns.

Add columns to get more detail than key_plot_id gives; specifically
`state` and `provider`, which combine with `normalization` to form the
signal identifier `signal_name`.
"""
# the state: a pair of word characters surrounded by underscores
df["state"] = df.key_plot_id.str.extract(r"_(\w\w)_")
# the provider: everything preceding the two-letter state code
df["provider"] = df.key_plot_id.str.extract(r"(.*)_[a-z]{2}_")
df["signal_name"] = df.provider + "_" + df.normalization


def check_endpoints(df):
"""Make sure that there aren't any new signals that we need to add."""
# compare with existing column name checker
# also add a note about handling errors
unique_provider_norms = (
df[["provider", "normalization"]]
.drop_duplicates()
.sort_values(["provider", "normalization"])
.reset_index(drop=True)
)
if not unique_provider_norms.equals(pd.DataFrame(PROVIDER_NORMS)):
raise ValueError(f"There are new providers and/or norms. They are\n{unique_provider_norms}")


def pull_nwss_data(token: str, logger):
"""Pull the latest NWSS Wastewater data, and conforms it into a dataset.

The output dataset has:
@@ -95,40 +111,43 @@ def pull_nwss_data(socrata_token: str):
pd.DataFrame
Dataframe as described above.
"""
# concentration key types
type_dict, type_dict_metric = construct_typedicts()

# Pull data from Socrata API
client = Socrata("data.cdc.gov", socrata_token)
client = Socrata("data.cdc.gov", token)
results_concentration = client.get("g653-rqe2", limit=10**10)
results_metric = client.get("2ew6-ywp6", limit=10**10)
df_metric = pd.DataFrame.from_records(results_metric)
df_concentration = pd.DataFrame.from_records(results_concentration)
df_concentration = df_concentration.rename(columns={"date": "timestamp"})

try:
df_concentration = df_concentration.astype(type_dict)
except KeyError as exc:
raise ValueError(warn_string(df_concentration, type_dict)) from exc
# Schema checks.
df_concentration = convert_df_type(df_concentration, TYPE_DICT, logger)
df_metric = convert_df_type(df_metric, TYPE_DICT_METRIC, logger)

try:
df_metric = df_metric.astype(type_dict_metric)
except KeyError as exc:
raise ValueError(warn_string(df_metric, type_dict_metric)) from exc
# Drop sites without a normalization scheme.
df = df_concentration[~df_concentration["normalization"].isna()]

# pull 2 letter state labels out of the key_plot_id labels
df_concentration["state"] = df_concentration.key_plot_id.str.extract(r"_(\w\w)_")
# Add state, provider, and signal_name columns derived from key_plot_id.
add_identifier_columns(df)

# move population and metric signals over to df
df = reformat(df, df_metric)
# round out some of the numeric noise that comes from smoothing
df_concentration[SIGNALS[0]] = sig_digit_round(
df_concentration[SIGNALS[0]], SIG_DIGITS
)
for signal in [*SIGNALS, *METRIC_SIGNALS]:
df[signal] = sig_digit_round(df[signal], SIG_DIGITS)

df_concentration = add_population(df_concentration, df_metric)
# if there are population NA's, assume the previous value is accurate (most
# likely introduced by dates only present in one and not the other; even
# otherwise, best to assume some value rather than break the data)
Contributor:
question: RE "dates only present in one and not the other", is this referring to the df and df_metric datasets? Wouldn't we expect both of them to have population data? Confused about this comment.

Contributor (author):
It is; only one of them actually has population data listed (the metric data vs the concentration). There's actually a ton of signal metadata present in the metric data not in the concentration data.

df_concentration.population_served = df_concentration.population_served.ffill()

keep_columns = ["timestamp", "state", "population_served"]
return df_concentration[SIGNALS + keep_columns]
df.population_served = df.population_served.ffill()
Contributor:
issue: ffill uses the value from the previous row, which may or may not be from the same geo. We can't guarantee order unless we sort explicitly.

Contributor (author):
good catch; I'm adding a sort_index to the reformat function to address this (p sure the index is the right order).
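
A sketch of that follow-up inside `reformat`, sorting the `(key_plot_id, timestamp)` index before it is reset so the later `ffill` sees each site's rows in time order (hypothetical; the actual follow-up commit may differ):

```python
# inside reformat(), after the joins
df = df.join(df_population)
df = df.join(df_metric_core)
# sort by (key_plot_id, timestamp) so rows are grouped by site and
# ordered by time before the population ffill downstream
df = df.sort_index().reset_index()
```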

check_endpoints(df)

keep_columns = [
*SIGNALS,
*METRIC_SIGNALS,
"timestamp",
"state",
"population_served",
"normalization",
"provider",
]
return df[keep_columns]
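
Finally, a hedged sketch of how the new return shape might be consumed downstream; the token and logger plumbing here is assumed, not part of this diff:

```python
import os

from delphi_utils import get_structured_logger  # logger helper assumed available

logger = get_structured_logger(__name__)
df = pull_nwss_data(os.environ["SOCRATA_TOKEN"], logger)

# one signal per (provider, normalization) pair, e.g. "NWSS_microbial"
for (provider, norm), signal_df in df.groupby(["provider", "normalization"]):
    signal_name = f"{provider}_{norm}"
    ...  # geo aggregation / export per signal
```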