Release covidcast-indicators 0.3.61 #2126

Merged: 7 commits, Feb 19, 2025
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.3.60
+current_version = 0.3.61
 commit = True
 message = chore: bump covidcast-indicators to {new_version}
 tag = False
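
For context, a quick sketch (not part of the PR) of how the message template above expands when the version is bumped:

# bumpversion substitutes {new_version} into the configured message template
message = "chore: bump covidcast-indicators to {new_version}"
print(message.format(new_version="0.3.61"))
# -> chore: bump covidcast-indicators to 0.3.61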
8 changes: 1 addition & 7 deletions ansible/templates/sir_complainsalot-params-prod.json.j2
@@ -44,13 +44,7 @@
     },
     "nssp": {
         "max_age":19,
-        "maintainers": [],
-        "retired-signals": [
-            "pct_ed_visits_combined_2023rvr",
-            "pct_ed_visits_covid_2023rvr",
-            "pct_ed_visits_influenza_2023rvr",
-            "pct_ed_visits_rsv_2023rvr"
-        ]
+        "maintainers": []
     },
     "nhsn": {
         "max_age":19,
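
The practical effect is that the nssp entry no longer carries a retired-signals list. A hedged illustration (hypothetical consumer code, not sir_complainsalot's actual API) of why dropping the key behaves like an empty list for a reader that uses a default:

# with the "retired-signals" key removed, a .get() fallback yields []
nssp_config = {"max_age": 19, "maintainers": []}
retired = nssp_config.get("retired-signals", [])
assert "pct_ed_visits_covid_2023rvr" not in retired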
2 changes: 1 addition & 1 deletion changehc/version.cfg
@@ -1 +1 @@
-current_version = 0.3.60
+current_version = 0.3.61
2 changes: 1 addition & 1 deletion claims_hosp/version.cfg
@@ -1 +1 @@
-current_version = 0.3.60
+current_version = 0.3.61
2 changes: 1 addition & 1 deletion doctor_visits/version.cfg
@@ -1 +1 @@
-current_version = 0.3.60
+current_version = 0.3.61
2 changes: 1 addition & 1 deletion google_symptoms/version.cfg
@@ -1 +1 @@
-current_version = 0.3.60
+current_version = 0.3.61
2 changes: 1 addition & 1 deletion hhs_hosp/version.cfg
@@ -1 +1 @@
-current_version = 0.3.60
+current_version = 0.3.61
2 changes: 1 addition & 1 deletion nchs_mortality/version.cfg
@@ -1 +1 @@
-current_version = 0.3.60
+current_version = 0.3.61
50 changes: 40 additions & 10 deletions nhsn/delphi_nhsn/constants.py
@@ -6,29 +6,59 @@
 PRELIM_DATASET_ID = "mpgq-jmmr"
 
 # column name from socrata
-TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
-TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"
+TOTAL_ADMISSION_COVID_COL = "totalconfc19newadm"
+TOTAL_ADMISSION_FLU_COL = "totalconfflunewadm"
+TOTAL_ADMISSION_RSV_COL = "totalconfrsvnewadm"
+NUM_HOSP_REPORTING_COVID_COL = "totalconfc19newadmhosprep"
+NUM_HOSP_REPORTING_FLU_COL = "totalconfflunewadmhosprep"
+NUM_HOSP_REPORTING_RSV_COL = "totalconfrsvnewadmhosprep"
+# signal name
+TOTAL_ADMISSION_COVID = "confirmed_admissions_covid_ew"
+TOTAL_ADMISSION_FLU = "confirmed_admissions_flu_ew"
+TOTAL_ADMISSION_RSV = "confirmed_admissions_rsv_ew"
+NUM_HOSP_REPORTING_COVID = "hosprep_confirmed_admissions_covid_ew"
+NUM_HOSP_REPORTING_FLU = "hosprep_confirmed_admissions_flu_ew"
+NUM_HOSP_REPORTING_RSV = "hosprep_confirmed_admissions_rsv_ew"
 
 SIGNALS_MAP = {
-    "confirmed_admissions_covid_ew": TOTAL_ADMISSION_COVID_API,
-    "confirmed_admissions_flu_ew": TOTAL_ADMISSION_FLU_API,
+    TOTAL_ADMISSION_COVID: TOTAL_ADMISSION_COVID_COL,
+    TOTAL_ADMISSION_FLU: TOTAL_ADMISSION_FLU_COL,
+    TOTAL_ADMISSION_RSV: TOTAL_ADMISSION_RSV_COL,
+    NUM_HOSP_REPORTING_COVID: NUM_HOSP_REPORTING_COVID_COL,
+    NUM_HOSP_REPORTING_FLU: NUM_HOSP_REPORTING_FLU_COL,
+    NUM_HOSP_REPORTING_RSV: NUM_HOSP_REPORTING_RSV_COL,
 }
 
 TYPE_DICT = {
     "timestamp": "datetime64[ns]",
     "geo_id": str,
-    "confirmed_admissions_covid_ew": float,
-    "confirmed_admissions_flu_ew": float,
+    TOTAL_ADMISSION_COVID: float,
+    TOTAL_ADMISSION_FLU: float,
+    TOTAL_ADMISSION_RSV: float,
+    NUM_HOSP_REPORTING_COVID: float,
+    NUM_HOSP_REPORTING_FLU: float,
+    NUM_HOSP_REPORTING_RSV: float,
 }
 
 # signal mapping for secondary, preliminary source
+# made a copy in case things diverge
+
 PRELIM_SIGNALS_MAP = {
-    "confirmed_admissions_covid_ew_prelim": TOTAL_ADMISSION_COVID_API,
-    "confirmed_admissions_flu_ew_prelim": TOTAL_ADMISSION_FLU_API,
+    f"{TOTAL_ADMISSION_COVID}_prelim": TOTAL_ADMISSION_COVID_COL,
+    f"{TOTAL_ADMISSION_FLU}_prelim": TOTAL_ADMISSION_FLU_COL,
+    f"{TOTAL_ADMISSION_RSV}_prelim": TOTAL_ADMISSION_RSV_COL,
+    f"{NUM_HOSP_REPORTING_COVID}_prelim": NUM_HOSP_REPORTING_COVID_COL,
+    f"{NUM_HOSP_REPORTING_FLU}_prelim": NUM_HOSP_REPORTING_FLU_COL,
+    f"{NUM_HOSP_REPORTING_RSV}_prelim": NUM_HOSP_REPORTING_RSV_COL,
 }
 
 PRELIM_TYPE_DICT = {
     "timestamp": "datetime64[ns]",
     "geo_id": str,
-    "confirmed_admissions_covid_ew_prelim": float,
-    "confirmed_admissions_flu_ew_prelim": float,
+    f"{TOTAL_ADMISSION_COVID}_prelim": float,
+    f"{TOTAL_ADMISSION_FLU}_prelim": float,
+    f"{TOTAL_ADMISSION_RSV}_prelim": float,
+    f"{NUM_HOSP_REPORTING_COVID}_prelim": float,
+    f"{NUM_HOSP_REPORTING_FLU}_prelim": float,
+    f"{NUM_HOSP_REPORTING_RSV}_prelim": float,
 }
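
Since every preliminary key is built with an f-string from its main counterpart, the two maps stay aligned by construction. A small sanity-check sketch (not in the PR) that a reviewer could run:

from delphi_nhsn.constants import PRELIM_SIGNALS_MAP, SIGNALS_MAP

# each main signal maps to the same Socrata column as its "_prelim" twin
for signal, column in SIGNALS_MAP.items():
    assert PRELIM_SIGNALS_MAP[f"{signal}_prelim"] == column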
159 changes: 91 additions & 68 deletions nhsn/delphi_nhsn/pull.py
@@ -1,8 +1,13 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NHSN data."""
+import copy
 import logging
+import random
+import time
+from datetime import datetime, timedelta
 from pathlib import Path
 from typing import Optional
+from urllib.error import HTTPError
 
 import pandas as pd
 from delphi_utils import create_backup_csv
@@ -11,20 +16,77 @@
 from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
 
 
-def pull_data(socrata_token: str, dataset_id: str):
+def check_last_updated(socrata_token, dataset_id, logger):
+    """
+    Check the last-updated timestamp to determine whether data should be pulled.
+
+    Note -- if the metadata call to the API fails, the data is treated as
+    recently updated and pulled anyway, since possibly having duplicates is
+    preferable to missing data.
+
+    Parameters
+    ----------
+    socrata_token
+    dataset_id
+    logger
+
+    Returns
+    -------
+    bool
+    """
+    recently_updated_source = True
+    try:
+        client = Socrata("data.cdc.gov", socrata_token)
+        response = client.get_metadata(dataset_id)
+
+        updated_timestamp = datetime.utcfromtimestamp(int(response["rowsUpdatedAt"]))
+        now = datetime.utcnow()
+        recently_updated_source = (now - updated_timestamp) < timedelta(days=1)
+
+        prelim_prefix = "Preliminary " if dataset_id == PRELIM_DATASET_ID else ""
+        if recently_updated_source:
+            logger.info(
+                f"{prelim_prefix}NHSN data was recently updated; Pulling data", updated_timestamp=updated_timestamp
+            )
+        else:
+            logger.info(f"{prelim_prefix}NHSN data is stale; Skipping", updated_timestamp=updated_timestamp)
+    # pylint: disable=W0703
+    except Exception as e:
+        logger.info("error while processing socrata metadata; treating data as recently updated", error=str(e))
+    return recently_updated_source
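
A minimal usage sketch for local testing (assumes delphi_utils' structured logger, which accepts keyword fields, and a placeholder Socrata app token):

from delphi_utils import get_structured_logger
from delphi_nhsn.constants import MAIN_DATASET_ID
from delphi_nhsn.pull import check_last_updated

logger = get_structured_logger(__name__)
# True if data.cdc.gov reports an update within the last day;
# also True (pull anyway) when the metadata call itself fails.
fresh = check_last_updated("MY_APP_TOKEN", MAIN_DATASET_ID, logger)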


+def pull_data(socrata_token: str, dataset_id: str, backup_dir: str, logger):
     """Pull data from Socrata API."""
     client = Socrata("data.cdc.gov", socrata_token)
+    logger.info(
+        f"Pulling {'main' if dataset_id == MAIN_DATASET_ID else 'preliminary'} data from Socrata API",
+        dataset_id=dataset_id,
+    )
     results = []
     offset = 0
     limit = 50000  # maximum limit allowed by SODA 2.0
-    page = client.get(dataset_id, limit=limit, offset=offset)
+    while True:
+        # retry logic for 503 errors
+        try:
+            page = client.get(dataset_id, limit=limit, offset=offset)
+            if not page:
+                break  # exit the loop if no more results
+        except HTTPError as err:
+            if err.code == 503:
+                time.sleep(2 + random.randint(0, 1000) / 1000.0)
+                page = client.get(dataset_id, limit=limit, offset=offset)
+            else:
+                logger.info("Error pulling data from Socrata API", error=str(err))
+                raise err
 
-    while len(page) > 0:
         results.extend(page)
         offset += limit
-        page = client.get(dataset_id, limit=limit, offset=offset)
 
-    df = pd.DataFrame.from_records(results)
+    if results:
+        df = pd.DataFrame.from_records(results)
+        sensor = "prelim" if dataset_id == PRELIM_DATASET_ID else None
+        create_backup_csv(df, backup_dir, False, sensor=sensor, logger=logger)
+    else:
+        df = pd.DataFrame()
     return df
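
The pagination-plus-retry pattern above, distilled into a standalone sketch (hedged; fetch_page is a stand-in for client.get, and only a single retry is attempted on 503, mirroring the code):

import random
import time
from urllib.error import HTTPError

def paged_fetch(fetch_page, limit=50000):
    """Collect all rows from a SODA-style endpoint, retrying once on HTTP 503."""
    results, offset = [], 0
    while True:
        try:
            page = fetch_page(limit=limit, offset=offset)
        except HTTPError as err:
            if err.code != 503:
                raise
            time.sleep(2 + random.random())  # short jittered backoff
            page = fetch_page(limit=limit, offset=offset)
        if not page:
            return results  # no more pages
        results.extend(page)
        offset += limit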


@@ -62,6 +124,7 @@ def pull_nhsn_data(
     backup_dir: str,
     custom_run: bool,
     issue_date: Optional[str],
+    preliminary: bool = False,
     logger: Optional[logging.Logger] = None,
 ):
     """Pull the latest NHSN hospital admission data, and conform it into a dataset.
@@ -79,6 +142,10 @@
         Directory to which to save raw backup data
     custom_run: bool
         Flag indicating if the current run is a patch. If so, don't save any data to disk
+    preliminary: bool
+        Flag indicating whether to pull main or preliminary data
+    issue_date:
+        Date to indicate which backup file to pull for patching
     logger: Optional[logging.Logger]
         logger object
@@ -87,83 +154,39 @@
     pd.DataFrame
         Dataframe as described above.
     """
+    dataset_id = PRELIM_DATASET_ID if preliminary else MAIN_DATASET_ID
     # Pull data from Socrata API
     df = (
-        pull_data(socrata_token, dataset_id=MAIN_DATASET_ID)
+        pull_data(socrata_token, dataset_id, backup_dir, logger)
         if not custom_run
-        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
+        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=preliminary)
     )
 
-    keep_columns = list(TYPE_DICT.keys())
+    recently_updated = True if custom_run else check_last_updated(socrata_token, dataset_id, logger)
 
-    if not df.empty:
-        create_backup_csv(df, backup_dir, custom_run, logger=logger)
-
-        df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
-
-        for signal, col_name in SIGNALS_MAP.items():
-            df[signal] = df[col_name]
-
-        df = df[keep_columns]
-        df["geo_id"] = df["geo_id"].str.lower()
-        df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
-        df = df.astype(TYPE_DICT)
-    else:
-        df = pd.DataFrame(columns=keep_columns)
-
-    return df
-
-
-def pull_preliminary_nhsn_data(
-    socrata_token: str,
-    backup_dir: str,
-    custom_run: bool,
-    issue_date: Optional[str],
-    logger: Optional[logging.Logger] = None,
-):
-    """Pull the latest preliminary NHSN hospital admission data, and conforms it into a dataset.
-
-    The output dataset has:
-
-    - Each row corresponds to a single observation
-    - Each row additionally has columns for the signals in SIGNALS
-
-    Parameters
-    ----------
-    socrata_token: str
-        My App Token for pulling the NHSN data
-    backup_dir: str
-        Directory to which to save raw backup data
-    custom_run: bool
-        Flag indicating if the current run is a patch. If so, don't save any data to disk
-    logger: Optional[logging.Logger]
-        logger object
-
-    Returns
-    -------
-    pd.DataFrame
-        Dataframe as described above.
-    """
-    df = (
-        pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
-        if not custom_run
-        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
-    )
-
-    keep_columns = list(PRELIM_TYPE_DICT.keys())
-
-    if not df.empty:
-        create_backup_csv(df, backup_dir, custom_run, sensor="prelim", logger=logger)
+    type_dict = PRELIM_TYPE_DICT if preliminary else TYPE_DICT
+    keep_columns = list(type_dict.keys())
+    filtered_type_dict = copy.deepcopy(type_dict)
 
-        df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
+    signal_map = PRELIM_SIGNALS_MAP if preliminary else SIGNALS_MAP
 
-        for signal, col_name in PRELIM_SIGNALS_MAP.items():
-            df[signal] = df[col_name]
+    if not df.empty and recently_updated:
+        df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
+
+        for signal, col_name in signal_map.items():
+            # older backups don't have certain columns
+            try:
+                df[signal] = df[col_name]
+            except KeyError:
+                logger.info("column not available in data", col_name=col_name)
+                keep_columns.remove(signal)
+                del filtered_type_dict[signal]
 
         df = df[keep_columns]
-        df = df.astype(PRELIM_TYPE_DICT)
         df["geo_id"] = df["geo_id"].str.lower()
         df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
 
+        df = df.astype(filtered_type_dict)
     else:
         df = pd.DataFrame(columns=keep_columns)