2085 add proportions nhsn #2111
Changes from 24 commits
@@ -1,8 +1,13 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NSSP ER data."""
+import copy
 import logging
+import random
+import time
+from datetime import datetime, timedelta
 from pathlib import Path
 from typing import Optional
+from urllib.error import HTTPError

 import pandas as pd
 from delphi_utils import create_backup_csv
@@ -11,20 +16,73 @@
 from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT


-def pull_data(socrata_token: str, dataset_id: str):
+def check_last_updated(socrata_token, dataset_id, logger):
+    """
+    Check the dataset's last-updated timestamp to determine whether data should be pulled.
+
+    Note -- on an API failure the data is treated as recently updated (pulled anyway),
+    since a possible duplicate is preferable to possibly missing data.
+
+    Parameters
+    ----------
+    socrata_token
+    dataset_id
+    logger
+
+    Returns
+    -------
+    bool
+    """
+    recently_updated_source = True
+    try:
+        client = Socrata("data.cdc.gov", socrata_token)
+        response = client.get_metadata(dataset_id)
+
+        updated_timestamp = datetime.utcfromtimestamp(int(response["rowsUpdatedAt"]))
+        now = datetime.utcnow()
+        recently_updated_source = (now - updated_timestamp) < timedelta(days=1)
+
+        prelim_prefix = "Preliminary " if dataset_id == PRELIM_DATASET_ID else ""
+        if recently_updated_source:
+            logger.info(
+                f"{prelim_prefix}NHSN data was recently updated; Pulling data", updated_timestamp=updated_timestamp
+            )
+        else:
+            logger.info(f"{prelim_prefix}NHSN data is stale; Skipping", updated_timestamp=updated_timestamp)
+    # pylint: disable=W0703
+    except Exception as e:
+        logger.info("error while processing socrata metadata; treating data as recently updated", error=str(e))
+    return recently_updated_source
+
+
+def pull_data(socrata_token: str, dataset_id: str, backup_dir: str, logger):
     """Pull data from Socrata API."""
     client = Socrata("data.cdc.gov", socrata_token)
+    logger.info("Pulling data from Socrata API")
     results = []
     offset = 0
     limit = 50000  # maximum limit allowed by SODA 2.0
-    page = client.get(dataset_id, limit=limit, offset=offset)
-
-    while len(page) > 0:
+    while True:
+        # retry logic for 503 errors
+        try:
+            page = client.get(dataset_id, limit=limit, offset=offset)
+            if not page:
+                break  # exit the loop if no more results
+        except HTTPError as err:
+            if err.code == 503:
+                time.sleep(2 + random.randint(0, 1000) / 1000.0)
+                page = client.get(dataset_id, limit=limit, offset=offset)
+            else:
+                logger.info("Error pulling data from Socrata API", error=str(err))
+                raise err
         results.extend(page)
         offset += limit
-        page = client.get(dataset_id, limit=limit, offset=offset)

-    df = pd.DataFrame.from_records(results)
+    if results:
+        df = pd.DataFrame.from_records(results)
+        create_backup_csv(df, backup_dir, False, logger=logger)
+    else:
+        df = pd.DataFrame()
     return df
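For orientation, here is a self-contained sketch of the freshness check this hunk introduces. The helper name, token, and threshold default are illustrative stand-ins, not the PR's API; the `rowsUpdatedAt` metadata field, the one-day window, and the fail-open behavior come straight from the diff above.

    from datetime import datetime, timedelta

    from sodapy import Socrata

    def is_recently_updated(token: str, dataset_id: str, max_age=timedelta(days=1)) -> bool:
        """Return True if the Socrata dataset changed within max_age.

        Mirrors check_last_updated above: on any metadata failure, treat
        the data as fresh, so the pipeline pulls (risking duplicates)
        rather than skips (risking missing data).
        """
        try:
            client = Socrata("data.cdc.gov", token)
            meta = client.get_metadata(dataset_id)
            updated = datetime.utcfromtimestamp(int(meta["rowsUpdatedAt"]))
            return datetime.utcnow() - updated < max_age
        except Exception:  # pylint: disable=broad-except
            return True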
@@ -89,25 +147,33 @@ def pull_nhsn_data(
     """
     # Pull data from Socrata API
     df = (
-        pull_data(socrata_token, dataset_id=MAIN_DATASET_ID)
+        pull_data(socrata_token, MAIN_DATASET_ID, backup_dir, logger)
         if not custom_run
         else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
     )

-    keep_columns = list(TYPE_DICT.keys())
+    recently_updated = True if custom_run else check_last_updated(socrata_token, MAIN_DATASET_ID, logger)
(inline review thread on the line above)
Comment: suggestion: if we put this before the …
Reply: that was the original thought, but previously you brought up the case of multiple failures in a row; the solution was to squirrel the data away for now, but this at least avoids duplicating the backup for both raw and processed data.

-    if not df.empty:
-        create_backup_csv(df, backup_dir, custom_run, logger=logger)
+    keep_columns = list(TYPE_DICT.keys())
+
+    if not df.empty and recently_updated:
         df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
+        filtered_type_dict = copy.deepcopy(TYPE_DICT)

         for signal, col_name in SIGNALS_MAP.items():
-            df[signal] = df[col_name]
+            # older backups don't have certain columns
+            try:
+                df[signal] = df[col_name]
+            except KeyError:
+                logger.info("column not available in data", col_name=col_name)
+                keep_columns.remove(signal)
+                del filtered_type_dict[signal]

         df = df[keep_columns]
         df["geo_id"] = df["geo_id"].str.lower()
         df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
-        df = df.astype(TYPE_DICT)
+        df = df.astype(filtered_type_dict)
     else:
         df = pd.DataFrame(columns=keep_columns)
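The try/except added here is what lets older backup files, which predate some columns, flow through the same code path. A minimal sketch of the pattern with toy constants (the signal map, dtype dict, and column names below are illustrative, not the real values from `.constants`):

    import copy

    import pandas as pd

    # Toy stand-ins for SIGNALS_MAP / TYPE_DICT.
    SIGNALS_MAP = {"confirmed_admissions_flu": "totalconfflunewadm"}
    TYPE_DICT = {"timestamp": str, "geo_id": str, "confirmed_admissions_flu": float}

    # An "older backup" frame that lacks the signal's source column.
    df = pd.DataFrame({"timestamp": ["2024-11-09"], "geo_id": ["usa"]})

    keep_columns = list(TYPE_DICT.keys())
    filtered_type_dict = copy.deepcopy(TYPE_DICT)
    for signal, col_name in SIGNALS_MAP.items():
        try:
            df[signal] = df[col_name]
        except KeyError:
            # Drop the signal everywhere so df[keep_columns] and astype()
            # stay consistent with what the data actually contains.
            keep_columns.remove(signal)
            del filtered_type_dict[signal]

    df = df[keep_columns].astype(filtered_type_dict)  # no KeyError despite the missing column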
@@ -144,24 +210,31 @@ def pull_preliminary_nhsn_data(
     pd.DataFrame
         Dataframe as described above.
     """
     # Pull data from Socrata API
(inline review thread on the line above)
Reply: I know they're similar; I thought about it and went back and forth, but I kept them separate on the thought that something different might be going on here in the future. I'm not too concerned about this, since we'll be slowly deprecating this codebase.
     df = (
-        pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
+        pull_data(socrata_token, PRELIM_DATASET_ID, backup_dir, logger)
         if not custom_run
         else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
     )

     keep_columns = list(PRELIM_TYPE_DICT.keys())
+    recently_updated = True if custom_run else check_last_updated(socrata_token, PRELIM_DATASET_ID, logger)

-    if not df.empty:
-        create_backup_csv(df, backup_dir, custom_run, sensor="prelim", logger=logger)
+    if not df.empty and recently_updated:
         df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
+        filtered_type_dict = copy.deepcopy(PRELIM_TYPE_DICT)

         for signal, col_name in PRELIM_SIGNALS_MAP.items():
-            df[signal] = df[col_name]
+            try:
+                df[signal] = df[col_name]
+            except KeyError:
+                logger.info("column not available in data", col_name=col_name, signal=signal)
+                keep_columns.remove(signal)
+                del filtered_type_dict[signal]

         df = df[keep_columns]
-        df = df.astype(PRELIM_TYPE_DICT)
+        df = df.astype(filtered_type_dict)

         df["geo_id"] = df["geo_id"].str.lower()
         df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
     else:
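Both entry points now delegate paging and backup to pull_data. As a condensed, standalone view of that pagination-plus-retry loop (the helper name and the commented-out usage are hypothetical; the 50000-row page size, the jittered 2-3 second sleep, and the catch of `urllib.error.HTTPError` all mirror the diff):

    import random
    import time
    from urllib.error import HTTPError

    from sodapy import Socrata

    def paged_get(client: Socrata, dataset_id: str, limit: int = 50000) -> list:
        """Fetch every row of a Socrata dataset in pages of `limit` rows.

        One jittered retry on HTTP 503, as in pull_data; any other
        HTTPError propagates to the caller.
        """
        results, offset = [], 0
        while True:
            try:
                page = client.get(dataset_id, limit=limit, offset=offset)
                if not page:
                    break  # no more rows
            except HTTPError as err:
                if err.code != 503:
                    raise
                time.sleep(2 + random.randint(0, 1000) / 1000.0)  # 2-3 s backoff
                page = client.get(dataset_id, limit=limit, offset=offset)
            results.extend(page)
            offset += limit
        return results

    # Hypothetical usage; the app token and dataset id are placeholders.
    # rows = paged_get(Socrata("data.cdc.gov", "MY_APP_TOKEN"), "xxxx-xxxx")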