Skip to content

Add NSSP secondary source #2074

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions nssp/delphi_nssp/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,29 @@
"fips": str,
}
)

# Source column name in the secondary dataset -> column name used by this indicator.
SECONDARY_COLS_MAP = {
    "week_end": "timestamp",
    "geography": "geo_value",
    "percent_visits": "val",
    "pathogen": "signal",
}

# Value of the source "pathogen" column -> exported signal name.
SECONDARY_SIGNALS_MAP = {
    "COVID-19": "pct_ed_visits_covid_secondary",
    "INFLUENZA": "pct_ed_visits_influenza_secondary",
    "RSV": "pct_ed_visits_rsv_secondary",
    "Combined": "pct_ed_visits_combined_secondary",
}

# Exported signal names, in the insertion order of SECONDARY_SIGNALS_MAP.
SECONDARY_SIGNALS = list(SECONDARY_SIGNALS_MAP.values())
SECONDARY_GEOS = ["state", "nation", "hhs"]

# Column name -> dtype the secondary dataframe is cast to.
SECONDARY_TYPE_DICT = {
    "timestamp": "datetime64[ns]",
    "geo_value": str,
    "val": float,
    "geo_type": str,
    "signal": str,
}
# Columns kept from the secondary dataframe: exactly the keys of SECONDARY_TYPE_DICT.
SECONDARY_KEEP_COLS = list(SECONDARY_TYPE_DICT)
100 changes: 84 additions & 16 deletions nssp/delphi_nssp/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@
import pandas as pd
from sodapy import Socrata

from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT
from .constants import (
NEWLINE,
SECONDARY_COLS_MAP,
SECONDARY_KEEP_COLS,
SECONDARY_SIGNALS_MAP,
SECONDARY_TYPE_DICT,
SIGNALS,
SIGNALS_MAP,
TYPE_DICT,
)


def warn_string(df, type_dict):
Expand All @@ -27,38 +36,50 @@ def warn_string(df, type_dict):
return warn


def pull_nssp_data(socrata_token: str):
"""Pull the latest NSSP ER visits data, and conforms it into a dataset.

The output dataset has:

- Each row corresponds to a single observation
- Each row additionally has columns for the signals in SIGNALS
def pull_with_socrata_api(socrata_token: str, dataset_id: str):
    """Pull data from Socrata API.

    Rows are requested from data.cdc.gov one page at a time, advancing the
    offset by the page size, until the API returns an empty page.

    Parameters
    ----------
    socrata_token: str
        My App Token for pulling the NSSP data (could be the same as the nchs data)
    dataset_id: str
        The dataset id to pull data from

    Returns
    -------
    list of dictionaries, each representing a row in the dataset
    """
    client = Socrata("data.cdc.gov", socrata_token)
    results = []
    offset = 0
    limit = 50000  # maximum limit allowed by SODA 2.0
    try:
        while True:
            # Exactly one request per page, always against the requested dataset.
            page = client.get(dataset_id, limit=limit, offset=offset)
            if not page:
                break  # exit the loop if no more results
            results.extend(page)
            offset += limit
    finally:
        # Release the client's HTTP session even if a request fails.
        client.close()
    return results


def pull_nssp_data(socrata_token: str):
"""Pull the latest NSSP ER visits primary dataset.

https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview

Parameters
----------
socrata_token: str
My App Token for pulling the NSSP data (could be the same as the nchs data)

Returns
-------
pd.DataFrame
Dataframe as described above.
"""
socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
df_ervisits = pd.DataFrame.from_records(socrata_results)
df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)

Expand All @@ -72,3 +93,50 @@ def pull_nssp_data(socrata_token: str):

keep_columns = ["timestamp", "geography", "county", "fips"]
return df_ervisits[SIGNALS + keep_columns]


def secondary_pull_nssp_data(socrata_token: str):
    """Pull the latest NSSP ER visits secondary dataset.

    https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

    The output dataset has:

    - Each row corresponds to a single observation
    - Exactly the columns in SECONDARY_KEEP_COLS, cast to the dtypes in SECONDARY_TYPE_DICT

    Parameters
    ----------
    socrata_token: str
        My App Token for pulling the NSSP data (could be the same as the nchs data)

    Returns
    -------
    pd.DataFrame
        Dataframe as described above.

    Raises
    ------
    ValueError
        If, after renaming, the pulled data lacks any column this function
        needs (every SECONDARY_KEEP_COLS entry except the derived "geo_type").
    """
    socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9")
    df_ervisits = pd.DataFrame.from_records(socrata_results)
    df_ervisits = df_ervisits.rename(columns=SECONDARY_COLS_MAP)

    # Check the schema before touching any column. Otherwise a missing column
    # surfaces as a bare KeyError from the first column access below, and the
    # warn_string message is never produced.
    required_cols = [col for col in SECONDARY_KEEP_COLS if col != "geo_type"]
    if any(col not in df_ervisits.columns for col in required_cols):
        raise ValueError(warn_string(df_ervisits, SECONDARY_TYPE_DICT))

    # geo_type is not provided in the dataset, so we infer it from the geo_value
    # which is either state names, "National" or hhs region numbers
    df_ervisits["geo_type"] = "state"

    df_ervisits.loc[df_ervisits["geo_value"] == "National", "geo_type"] = "nation"

    # na=False keeps the mask strictly boolean when a geo_value is missing.
    is_hhs_region = df_ervisits["geo_value"].str.startswith("Region ", na=False)
    # "Region " is a literal prefix, not a pattern, hence regex=False.
    df_ervisits.loc[is_hhs_region, "geo_value"] = df_ervisits.loc[is_hhs_region, "geo_value"].str.replace(
        "Region ", "", regex=False
    )
    df_ervisits.loc[is_hhs_region, "geo_type"] = "hhs"

    # NOTE(review): pathogens that are not keys of SECONDARY_SIGNALS_MAP map to
    # NaN here; confirm whether such rows should be dropped instead.
    df_ervisits["signal"] = df_ervisits["signal"].map(SECONDARY_SIGNALS_MAP)

    df_ervisits = df_ervisits[SECONDARY_KEEP_COLS]
    # Every key of SECONDARY_TYPE_DICT is present after the selection above.
    return df_ervisits.astype(SECONDARY_TYPE_DICT)
47 changes: 45 additions & 2 deletions nssp/delphi_nssp/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- "cache_dir": str, directory of locally cached data
"""

import sys
import time
from datetime import datetime

Expand All @@ -31,8 +32,8 @@
from delphi_utils.geomap import GeoMapper
from delphi_utils.nancodes import add_default_nancodes

from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SIGNALS
from .pull import pull_nssp_data
from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SECONDARY_GEOS, SECONDARY_SIGNALS, SIGNALS
from .pull import pull_nssp_data, secondary_pull_nssp_data


def add_needed_columns(df, col_names=None):
Expand Down Expand Up @@ -81,6 +82,7 @@ def run_module(params):
socrata_token = params["indicator"]["socrata_token"]

run_stats = []

## build the base version of the signal at the most detailed geo level you can get.
## compute stuff here or farm out to another function or file
df_pull = pull_nssp_data(socrata_token)
Expand Down Expand Up @@ -137,5 +139,46 @@ def run_module(params):
if len(dates) > 0:
run_stats.append((max(dates), len(dates)))

## secondary source: pull once, then export one CSV set per (signal, geo) pair
secondary_df_pull = secondary_pull_nssp_data(socrata_token)
## aggregate
geo_mapper = GeoMapper()
for signal in SECONDARY_SIGNALS:
    # Keep only this signal's rows; without this every signal's CSV would
    # contain the rows of all pathogens.
    signal_df = secondary_df_pull[secondary_df_pull["signal"] == signal]
    for geo in SECONDARY_GEOS:
        logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=signal)
        # Copy after filtering so adding geo_id below never writes into a slice.
        df = signal_df[signal_df["geo_type"] == geo].copy()
        if geo == "state":
            # Full state name -> lowercase postal abbreviation. "District of
            # Columbia" is handled explicitly for when the lookup finds nothing.
            df["geo_id"] = df["geo_value"].apply(
                lambda x: (
                    us.states.lookup(x).abbr.lower()
                    if us.states.lookup(x)
                    else ("dc" if x == "District of Columbia" else x)
                )
            )
            # A name that could not be translated is left unchanged, so
            # geo_id == geo_value marks an unexpected state name.
            unexpected_state_names = df[df["geo_id"] == df["geo_value"]]
            if unexpected_state_names.shape[0] > 0:
                logger.error("Unexpected state names", df=unexpected_state_names)
                sys.exit(1)
        elif geo == "nation":
            df["geo_id"] = "us"
        elif geo == "hhs":
            # geo_value holds the region number (its "Region " prefix was
            # stripped at pull time); geo_type is just the constant "hhs".
            df["geo_id"] = df["geo_value"]
        # add se, sample_size, and na codes
        missing_cols = set(CSV_COLS) - set(df.columns)
        df = add_needed_columns(df, col_names=list(missing_cols))
        df_csv = df[CSV_COLS + ["timestamp"]]
        # actual export
        dates = create_export_csv(
            df_csv,
            geo_res=geo,
            export_dir=export_dir,
            sensor=signal,
            weekly_dates=True,
        )
        if len(dates) > 0:
            run_stats.append((max(dates), len(dates)))

## log this indicator run
logging(start_time, run_stats, logger)
Loading
Loading