From 2cb25535e5a89b8aa837b00c92ccda44a41bd178 Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Tue, 1 Dec 2020 15:24:32 -0500 Subject: [PATCH 1/7] Add constants for cli --- changehc/delphi_changehc/config.py | 20 +++++++++++++++++--- changehc/delphi_changehc/constants.py | 4 +++- changehc/params.json.template | 4 ++++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/changehc/delphi_changehc/config.py b/changehc/delphi_changehc/config.py index 07f3dde8b..1cabb1eb4 100644 --- a/changehc/delphi_changehc/config.py +++ b/changehc/delphi_changehc/config.py @@ -25,15 +25,29 @@ class Config: ## data columns COVID_COL = "COVID" DENOM_COL = "Denominator" - COUNT_COLS = ["COVID"] + ["Denominator"] + FLU_COL = "Flu" + MIXED_COL = "Mixed" + FLU_LIKE_COL = "Flu-like" + COVID_LIKE_COL = "Covid-like" + COUNT_COLS = [COVID_COL,DENOM_COL,FLU_COL,MIXED_COL,FLU_LIKE_COL,COVID_LIKE_COL] DATE_COL = "date" GEO_COL = "fips" ID_COLS = [DATE_COL] + [GEO_COL] FILT_COLS = ID_COLS + COUNT_COLS + DENOM_COLS = [GEO_COL, DATE_COL, DENOM_COL] COVID_COLS = [GEO_COL, DATE_COL, COVID_COL] - DENOM_DTYPES = {"date": str, "Denominator": str, "fips": str} - COVID_DTYPES = {"date": str, "COVID": str, "fips": str} + FLU_COLS = [GEO_COL, DATE_COL, FLU_COL] + MIXED_COLS = [GEO_COL, DATE_COL, MIXED_COL] + FLU_LIKE_COLS = [GEO_COL, DATE_COL, FLU_LIKE_COL] + COVID_LIKE_COLS = [GEO_COL, DATE_COL, COVID_LIKE_COL] + + DENOM_DTYPES = {DATE_COL: str, DENOM_COL: str, GEO_COL: str} + COVID_DTYPES = {DATE_COL: str, COVID_COL: str, GEO_COL: str} + FLU_DTYPES = {DATE_COL: str, FLU_COL: str, GEO_COL: str} + MIXED_DTYPES = {DATE_COL: str, MIXED_COL: str, GEO_COL: str} + FLU_LIKE_DTYPES = {DATE_COL: str, FLU_LIKE_COL: str, GEO_COL: str} + COVID_LIKE_DTYPES = {DATE_COL: str, COVID_LIKE_COL: str, GEO_COL: str} SMOOTHER_BANDWIDTH = 100 # bandwidth for the linear left Gaussian filter MIN_DEN = 100 # number of total visits needed to produce a sensor diff --git a/changehc/delphi_changehc/constants.py b/changehc/delphi_changehc/constants.py index 645cf3d3b..144679ebb 100644 --- a/changehc/delphi_changehc/constants.py +++ b/changehc/delphi_changehc/constants.py @@ -1,7 +1,9 @@ """Registry for signal names and geo types""" SMOOTHED = "smoothed_covid" SMOOTHED_ADJ = "smoothed_adj_covid" -SIGNALS = [SMOOTHED, SMOOTHED_ADJ] +SMOOTHED_CLI = "smoothed_cli" +SMOOTHED_ADJ_CLI = "smoothed_adj_cli" +SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI] NA = "NA" HRR = "hrr" FIPS = "fips" diff --git a/changehc/params.json.template b/changehc/params.json.template index 54467564b..5598cbdcd 100644 --- a/changehc/params.json.template +++ b/changehc/params.json.template @@ -4,6 +4,10 @@ "cache_dir": "./cache", "input_denom_file": null, "input_covid_file": null, + "input_flu_file": null, + "input_mixed_file": null, + "input_flu_like_file": null, + "input_covid_like_file": null, "start_date": "2020-02-01", "end_date": null, "drop_date": null, From 5e22295fc17bdd755ff676adac22568497d995ba Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Tue, 1 Dec 2020 15:25:06 -0500 Subject: [PATCH 2/7] Load files for cli --- .../delphi_changehc/download_ftp_files.py | 43 ++++- changehc/delphi_changehc/load_data.py | 155 ++++++++++-------- 2 files changed, 124 insertions(+), 74 deletions(-) diff --git a/changehc/delphi_changehc/download_ftp_files.py b/changehc/delphi_changehc/download_ftp_files.py index 576492615..3ff6ad85f 100644 --- a/changehc/delphi_changehc/download_ftp_files.py +++ b/changehc/delphi_changehc/download_ftp_files.py @@ -44,8 +44,8 @@ def 
get_files_from_dir(sftp, out_path): sftp.get(infile, outfile, callback=callback_for_filename) -def download(out_path, ftp_conn): - """Downloads files necessary to create CHC signal from ftp server. +def download_covid(out_path, ftp_conn): + """Downloads files necessary to create chng-covid signal from ftp server. Args: out_path: Path to local directory into which to download the files ftp_conn: Dict containing login credentials to ftp server @@ -71,3 +71,42 @@ def download(out_path, ftp_conn): finally: if client: client.close() + + +def download_cli(out_path, ftp_conn): + """Downloads files necessary to create chng-cli signal from ftp server. + Args: + out_path: Path to local directory into which to download the files + ftp_conn: Dict containing login credentials to ftp server + """ + + # open client + try: + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + client.connect(ftp_conn["host"], username=ftp_conn["user"], + password=ftp_conn["pass"], + port=ftp_conn["port"], + allow_agent=False, look_for_keys=False) + sftp = client.open_sftp() + + sftp.chdir('/dailycounts/All_Outpatients_By_County') + get_files_from_dir(sftp, out_path) + + sftp.chdir('/dailycounts/Flu_Patient_Count_By_County') + get_files_from_dir(sftp, out_path) + + sftp.chdir('/dailycounts/Mixed_Patient_Count_By_County') + get_files_from_dir(sftp, out_path) + + sftp.chdir('/dailycounts/Flu_Like_Patient_Count_By_County') + get_files_from_dir(sftp, out_path) + + sftp.chdir('/dailycounts/Covid_Like_Patient_Count_By_County') + get_files_from_dir(sftp, out_path) + + finally: + if client: + client.close() + diff --git a/changehc/delphi_changehc/load_data.py b/changehc/delphi_changehc/load_data.py index bf5509beb..0e9d91100 100644 --- a/changehc/delphi_changehc/load_data.py +++ b/changehc/delphi_changehc/load_data.py @@ -12,116 +12,114 @@ from .config import Config -def load_denom_data(denom_filepath, dropdate, base_geo): - """Load in and set up denominator data. +def load_chng_data(filepath, dropdate, base_geo, + col_names, col_types, counts_col): + """Load in and set up daily count data from Change. 
Args: - denom_filepath: path to the aggregated denominator data + filepath: path to aggregated data dropdate: data drop date (datetime object) base_geo: base geographic unit before aggregation ('fips') + col_names: column names of data + col_types: column types of data + counts_col: name of column containing counts Returns: - cleaned denominator dataframe + cleaned dataframe """ - assert base_geo == "fips", "base unit must be 'fips'" - - denom_suffix = denom_filepath.split("/")[-1].split(".")[0][9:] - assert denom_suffix == "All_Outpatients_By_County" - denom_filetype = denom_filepath.split("/")[-1].split(".")[1] - assert denom_filetype == "dat" - denom_data = pd.read_csv( - denom_filepath, + assert base_geo == "fips", "base unit must be 'fips'" + count_flag = False + date_flag = False + geo_flag = False + for n in col_names: + if n == counts_col: + count_flag = True + elif n == Config.DATE_COL: + date_flag = True + elif n == "fips": + geo_flag = True + assert count_flag, "counts_col must be present in col_names" + assert date_flag, "'%s' must be present in col_names"%(CONFIG.DATE_COL) + assert geo_flag, "'fips' must be present in col_names" + + data = pd.read_csv( + filepath, sep="|", header=None, - names=Config.DENOM_COLS, - dtype=Config.DENOM_DTYPES, + names=col_names, + dtype=col_types, ) - denom_data[Config.DATE_COL] = \ - pd.to_datetime(denom_data[Config.DATE_COL],errors="coerce") + data[Config.DATE_COL] = \ + pd.to_datetime(data[Config.DATE_COL],errors="coerce") # restrict to start and end date - denom_data = denom_data[ - (denom_data[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & - (denom_data[Config.DATE_COL] < dropdate) + data = data[ + (data[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & + (data[Config.DATE_COL] < dropdate) ] # counts between 1 and 3 are coded as "3 or less", we convert to 1 - denom_data[Config.DENOM_COL][ - denom_data[Config.DENOM_COL] == "3 or less" + data[counts_col][ + data[counts_col] == "3 or less" ] = "1" - denom_data[Config.DENOM_COL] = denom_data[Config.DENOM_COL].astype(int) + data[counts_col] = data[counts_col].astype(int) assert ( - (denom_data[Config.DENOM_COL] >= 0).all().all() - ), "Denominator counts must be nonnegative" + (data[counts_col] >= 0).all().all() + ), "Counts must be nonnegative" # aggregate age groups (so data is unique by date and base geography) - denom_data = denom_data.groupby([base_geo, Config.DATE_COL]).sum() - denom_data.dropna(inplace=True) # drop rows with any missing entries + data = data.groupby([base_geo, Config.DATE_COL]).sum() + data.dropna(inplace=True) # drop rows with any missing entries + + return data - return denom_data -def load_covid_data(covid_filepath, dropdate, base_geo): - """Load in and set up denominator data. +def load_combined_data(denom_filepath, covid_filepath, dropdate, base_geo): + """Load in denominator and covid data, and combine them. 
Args: + denom_filepath: path to the aggregated denominator data covid_filepath: path to the aggregated covid data dropdate: data drop date (datetime object) base_geo: base geographic unit before aggregation ('fips') Returns: - cleaned denominator dataframe + combined multiindexed dataframe, index 0 is geo_base, index 1 is date """ assert base_geo == "fips", "base unit must be 'fips'" - covid_suffix = covid_filepath.split("/")[-1].split(".")[0][9:] - assert covid_suffix == "Covid_Outpatients_By_County" - covid_filetype = covid_filepath.split("/")[-1].split(".")[1] - assert covid_filetype == "dat" - - covid_data = pd.read_csv( - covid_filepath, - sep="|", - header=None, - names=Config.COVID_COLS, - dtype=Config.COVID_DTYPES, - parse_dates=[Config.DATE_COL] - ) - - covid_data[Config.DATE_COL] = \ - pd.to_datetime(covid_data[Config.DATE_COL],errors="coerce") - - # restrict to start and end date - covid_data = covid_data[ - (covid_data[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & - (covid_data[Config.DATE_COL] < dropdate) - ] - - # counts between 1 and 3 are coded as "3 or less", we convert to 1 - covid_data[Config.COVID_COL][ - covid_data[Config.COVID_COL] == "3 or less" - ] = "1" - covid_data[Config.COVID_COL] = covid_data[Config.COVID_COL].astype(int) + # load each data stream + denom_data = load_chng_data(denom_filepath, dropdate, base_geo, + Config.DENOM_COLS, Config.DENOM_DTYPES, Config.DENOM_COL) + covid_data = load_chng_data(covid_filepath, dropdate, base_geo, + Config.COVID_COLS, Config.COVID_DTYPES, Config.COVID_COL) - assert ( - (covid_data[Config.COVID_COL] >= 0).all().all() - ), "COVID counts must be nonnegative" + # merge data + data = denom_data.merge(covid_data, how="outer", left_index=True, right_index=True) + assert data.isna().all(axis=1).sum() == 0, "entire row is NA after merge" - # aggregate age groups (so data is unique by date and base geography) - covid_data = covid_data.groupby([base_geo, Config.DATE_COL]).sum() - covid_data.dropna(inplace=True) # drop rows with any missing entries + # calculate combined numerator and denominator + data.fillna(0, inplace=True) + data["num"] = data[Config.COVID_COL] + data["den"] = data[Config.DENOM_COL] + data = data[["num", "den"]] - return covid_data + return data -def load_combined_data(denom_filepath, covid_filepath, dropdate, base_geo): - """Load in denominator and covid data, and combine them. +def load_cli_data(denom_filepath, flu_filepath, mixed_filepath, flu_like_filepath, + covid_like_filepath, dropdate, base_geo): + """Load in denominator and covid-like data, and combine them. 
Args: denom_filepath: path to the aggregated denominator data - covid_filepath: path to the aggregated covid data + flu_filepath: path to the aggregated flu data + mixed_filepath: path to the aggregated mixed data + flu_like_filepath: path to the aggregated flu-like data + covid_like_filepath: path to the aggregated covid-like data dropdate: data drop date (datetime object) base_geo: base geographic unit before aggregation ('fips') @@ -131,16 +129,29 @@ def load_combined_data(denom_filepath, covid_filepath, dropdate, base_geo): assert base_geo == "fips", "base unit must be 'fips'" # load each data stream - denom_data = load_denom_data(denom_filepath, dropdate, base_geo) - covid_data = load_covid_data(covid_filepath, dropdate, base_geo) + denom_data = load_chng_data(denom_filepath, dropdate, base_geo, + Config.DENOM_COLS, Config.DENOM_DTYPES, Config.DENOM_COL) + flu_data = load_chng_data(flu_filepath, dropdate, base_geo, + Config.FLU_COLS, Config.FLU_DTYPES, Config.FLU_COL) + mixed_data = load_chng_data(mixed_filepath, dropdate, base_geo, + Config.MIXED_COLS, Config.MIXED_DTYPES, Config.MIXED_COL) + flu_like_data = load_chng_data(flu_like_filepath, dropdate, base_geo, + Config.FLU_LIKE_COLS, Config.FLU_LIKE_DTYPES, Config.FLU_LIKE_COL) + covid_like_data = load_chng_data(covid_like_filepath, dropdate, base_geo, + Config.COVID_LIKE_COLS, Config.COVID_LIKE_DTYPES, Config.COVID_LIKE_COL) # merge data - data = denom_data.merge(covid_data, how="outer", left_index=True, right_index=True) + data = denom_data.merge(flu_data, how="outer", left_index=True, right_index=True) + data = data.merge(mixed_data, how="outer", left_index=True, right_index=True) + data = data.merge(flu_like_data, how="outer", left_index=True, right_index=True) + data = data.merge(covid_like_data, how="outer", left_index=True, right_index=True) assert data.isna().all(axis=1).sum() == 0, "entire row is NA after merge" # calculate combined numerator and denominator data.fillna(0, inplace=True) - data["num"] = data[Config.COVID_COL] + data["num"] = -data[Config.FLU_COL] + data[Config.MIXED_COL] + data[Config.FLU_LIKE_COL] + data["num"] = data["num"].clip(lower=0) + data["num"] = data["num"] + data[Config.COVID_LIKE_COL] data["den"] = data[Config.DENOM_COL] data = data[["num", "den"]] From 595e1dd9ae1a18bf40bfdd4d640fa58af27ab10a Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Tue, 1 Dec 2020 15:25:27 -0500 Subject: [PATCH 3/7] Adjust pipeline --- changehc/delphi_changehc/run.py | 64 +++++++++++++++-------- changehc/delphi_changehc/update_sensor.py | 21 ++++---- 2 files changed, 52 insertions(+), 33 deletions(-) diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index 168602f56..5506b1c67 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -14,7 +14,8 @@ from delphi_utils import read_params # first party -from .download_ftp_files import download +from .download_ftp_files import download_covid, download_cli +from .load_data import load_combined_data, load_cli_data from .update_sensor import CHCSensorUpdator @@ -45,13 +46,24 @@ def run_module(): ## download recent files from FTP server logging.info("downloading recent files through SFTP") - download(params["cache_dir"], params["ftp_conn"]) + if "covid" in params["types"]: + download_covid(params["cache_dir"], params["ftp_conn"]) + if "cli" in params["types"]: + download_cli(params["cache_dir"], params["ftp_conn"]) input_denom_file = "%s/%s_All_Outpatients_By_County.dat.gz" % (params["cache_dir"],filedate) input_covid_file 
= "%s/%s_Covid_Outpatients_By_County.dat.gz" % (params["cache_dir"],filedate) + input_flu_file = "%s/%s_Flu_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) + input_mixed_file = "%s/%s_Mixed_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) + input_flu_like_file = "%s/%s_Flu_Like_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) + input_covid_like_file = "%s/%s_Covid_Like_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) else: input_denom_file = params["input_denom_file"] input_covid_file = params["input_covid_file"] + input_flu_file = params["input_flu_file"] + input_mixed_file = params["input_mixed_file"] + input_flu_like_file = params["input_flu_like_file"] + input_covid_like_file = params["input_covid_like_file"] dropdate = str(dropdate_dt.date()) @@ -80,29 +92,37 @@ def run_module(): logging.info("outpath:\t\t%s", params["export_dir"]) logging.info("parallel:\t\t%s", params["parallel"]) logging.info("weekday:\t\t%s", params["weekday"]) + logging.info("types:\t\t%s", params["types"]) logging.info("se:\t\t\t%s", params["se"]) ## start generating for geo in params["geos"]: - for weekday in params["weekday"]: - if weekday: - logging.info("starting %s, weekday adj", geo) - else: - logging.info("starting %s, no adj", geo) - su_inst = CHCSensorUpdator( - startdate, - enddate, - dropdate, - geo, - params["parallel"], - weekday, - params["se"] - ) - su_inst.update_sensor( - input_denom_file, - input_covid_file, - params["export_dir"] - ) - logging.info("finished %s", geo) + for numtype in params["types"]: + for weekday in params["weekday"]: + if weekday: + logging.info("starting %s, %s, weekday adj", geo, numtype) + else: + logging.info("starting %s, %s, no adj", geo, numtype) + su_inst = CHCSensorUpdator( + startdate, + enddate, + dropdate, + geo, + params["parallel"], + weekday, + numtype, + params["se"] + ) + if numtype == "covid": + data = load_combined_data(input_denom_file, + input_covid_file,dropdate_dt,"fips") + elif numtype == "cli": + data = load_cli_data(input_denom_file,input_flu_file,input_mixed_file, + input_flu_like_file,input_covid_like_file,dropdate_dt,"fips") + su_inst.update_sensor( + data, + params["export_dir"] + ) + logging.info("finished %s", geo) logging.info("finished all") diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py index 58d3fe2d8..cfaf2214e 100644 --- a/changehc/delphi_changehc/update_sensor.py +++ b/changehc/delphi_changehc/update_sensor.py @@ -14,8 +14,7 @@ # first party from .config import Config, Constants -from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, NA -from .load_data import load_combined_data +from .constants import * from .sensor import CHCSensor from .weekday import Weekday @@ -83,6 +82,7 @@ def __init__(self, geo, parallel, weekday, + numtype, se): """Init Sensor Updator Args: @@ -92,6 +92,7 @@ def __init__(self, geo: geographic resolution, one of ["county", "state", "msa", "hrr"] parallel: boolean to run the sensor update in parallel weekday: boolean to adjust for weekday effects + numtype: type of count data used, one of ["covid", "cli"] se: boolean to write out standard errors, if true, use an obfuscated name """ self.startdate, self.enddate, self.dropdate = [ @@ -103,11 +104,13 @@ def __init__(self, assert self.enddate <= self.dropdate, "end date > drop date" assert geo in ['county', 'state', 'msa', 'hrr'],\ f"{geo} is invalid, pick one of 'county', 'state', 'msa', 'hrr'" - self.geo, self.parallel, self.weekday, self.se = 
geo.lower(), parallel, weekday, se + self.geo, self.parallel, self.weekday, self.numtype, self.se = geo.lower(), parallel, weekday, numtype, se # output file naming - signals = SIGNALS.copy() - signals.remove(SMOOTHED if self.weekday else SMOOTHED_ADJ) + if self.numtype == "covid": + signals = [SMOOTHED_ADJ if self.weekday else SMOOTHED] + elif self.numtype == "cli": + signals = [SMOOTHED_ADJ_CLI if self.weekday else SMOOTHED_CLI] signal_names = add_prefix( signals, wip_signal=read_params()["wip_signal"]) @@ -171,15 +174,12 @@ def geo_reindex(self, data): return data_frame - def update_sensor(self, - denom_filepath, - covid_filepath, + data, outpath): """Generate sensor values, and write to csv format. Args: - denom_filepath: path to the aggregated denominator data - covid_filepath: path to the aggregated covid data + data: pd.DataFrame with columns num and den outpath: output path for the csv results """ self.shift_dates() @@ -188,7 +188,6 @@ def update_sensor(self, # load data base_geo = "fips" - data = load_combined_data(denom_filepath, covid_filepath, self.dropdate, base_geo) data.reset_index(inplace=True) data_frame = self.geo_reindex(data) From d0db10687908195593668e800df9bcbe1f2d3ac0 Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Tue, 1 Dec 2020 15:42:14 -0500 Subject: [PATCH 4/7] Fix linting --- changehc/delphi_changehc/download_ftp_files.py | 1 - changehc/delphi_changehc/load_data.py | 2 +- changehc/delphi_changehc/run.py | 2 +- changehc/delphi_changehc/update_sensor.py | 4 +--- 4 files changed, 3 insertions(+), 6 deletions(-) diff --git a/changehc/delphi_changehc/download_ftp_files.py b/changehc/delphi_changehc/download_ftp_files.py index 3ff6ad85f..eee83366b 100644 --- a/changehc/delphi_changehc/download_ftp_files.py +++ b/changehc/delphi_changehc/download_ftp_files.py @@ -109,4 +109,3 @@ def download_cli(out_path, ftp_conn): finally: if client: client.close() - diff --git a/changehc/delphi_changehc/load_data.py b/changehc/delphi_changehc/load_data.py index 0e9d91100..28ad8c4dd 100644 --- a/changehc/delphi_changehc/load_data.py +++ b/changehc/delphi_changehc/load_data.py @@ -40,7 +40,7 @@ def load_chng_data(filepath, dropdate, base_geo, elif n == "fips": geo_flag = True assert count_flag, "counts_col must be present in col_names" - assert date_flag, "'%s' must be present in col_names"%(CONFIG.DATE_COL) + assert date_flag, "'%s' must be present in col_names"%(Config.DATE_COL) assert geo_flag, "'fips' must be present in col_names" data = pd.read_csv( diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index 5506b1c67..97a64fb31 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -19,7 +19,7 @@ from .update_sensor import CHCSensorUpdator -def run_module(): +def run_module(): # pylint: disable=too-many-branches,too-many-statements """Run the delphi_changehc module. 
""" diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py index cfaf2214e..60730aa0f 100644 --- a/changehc/delphi_changehc/update_sensor.py +++ b/changehc/delphi_changehc/update_sensor.py @@ -14,7 +14,7 @@ # first party from .config import Config, Constants -from .constants import * +from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, NA from .sensor import CHCSensor from .weekday import Weekday @@ -187,8 +187,6 @@ def update_sensor(self, (self.burn_in_dates <= self.enddate) # load data - base_geo = "fips" - data.reset_index(inplace=True) data_frame = self.geo_reindex(data) # handle if we need to adjust by weekday From b7f213fbce232b147bd0d62f6cc11d5ec3c5a03d Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Tue, 1 Dec 2020 16:19:49 -0500 Subject: [PATCH 5/7] Update tests --- changehc/tests/test_load_data.py | 18 ++++++++---------- changehc/tests/test_update_sensor.py | 7 +++++-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/changehc/tests/test_load_data.py b/changehc/tests/test_load_data.py index 3f09d9ec2..52b119e59 100644 --- a/changehc/tests/test_load_data.py +++ b/changehc/tests/test_load_data.py @@ -18,23 +18,21 @@ class TestLoadData: - denom_data = load_denom_data(DENOM_FILEPATH, DROP_DATE, "fips") - covid_data = load_covid_data(COVID_FILEPATH, DROP_DATE, "fips") + denom_data = load_chng_data(DENOM_FILEPATH, DROP_DATE, "fips", + Config.DENOM_COLS, Config.DENOM_DTYPES, Config.DENOM_COL) + covid_data = load_chng_data(COVID_FILEPATH, DROP_DATE, "fips", + Config.COVID_COLS, Config.COVID_DTYPES, Config.COVID_COL) combined_data = load_combined_data(DENOM_FILEPATH, COVID_FILEPATH, DROP_DATE, "fips") def test_base_unit(self): with pytest.raises(AssertionError): - load_denom_data(DENOM_FILEPATH, DROP_DATE, "foo") + load_chng_data(DENOM_FILEPATH, DROP_DATE, "foo", + Config.DENOM_COLS, Config.DENOM_DTYPES, Config.DENOM_COL) with pytest.raises(AssertionError): - load_denom_data("test_data/20200101_foo.dat", DROP_DATE, "fips") - - with pytest.raises(AssertionError): - load_covid_data(COVID_FILEPATH, DROP_DATE, "foo") - - with pytest.raises(AssertionError): - load_covid_data("test_data/20200101_foo.dat", DROP_DATE, "fips") + load_chng_data(DENOM_FILEPATH, DROP_DATE, "fips", + Config.DENOM_COLS, Config.DENOM_DTYPES, Config.COVID_COL) with pytest.raises(AssertionError): load_combined_data(DENOM_FILEPATH, COVID_FILEPATH, DROP_DATE, "foo") diff --git a/changehc/tests/test_update_sensor.py b/changehc/tests/test_update_sensor.py index bee2789fa..642274b96 100644 --- a/changehc/tests/test_update_sensor.py +++ b/changehc/tests/test_update_sensor.py @@ -31,6 +31,7 @@ class TestCHCSensorUpdator: geo = "county" parallel = False weekday = False + numtype = "covid" se = False prefix = "foo" small_test_data = pd.DataFrame({ @@ -48,6 +49,7 @@ def test_shift_dates(self): self.geo, self.parallel, self.weekday, + self.numtype, self.se ) ## Test init @@ -69,6 +71,7 @@ def test_geo_reindex(self): 'county', self.parallel, self.weekday, + self.numtype, self.se ) su_inst.shift_dates() @@ -87,6 +90,7 @@ def test_update_sensor(self): geo, self.parallel, self.weekday, + self.numtype, self.se ) @@ -97,8 +101,7 @@ def test_update_sensor(self): s3_client = Session(**aws_credentials).client("s3") s3_client.create_bucket(Bucket=params["bucket_name"]) su_inst.update_sensor( - DENOM_FILEPATH, - COVID_FILEPATH, + self.small_test_data, td.name) assert len(os.listdir(td.name)) == len(su_inst.sensor_dates),\ From 
f00586e7c93663d81abc1e8e4166d728e376779f Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Tue, 1 Dec 2020 17:04:42 -0500 Subject: [PATCH 6/7] Modularized run --- changehc/delphi_changehc/run.py | 97 ++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 33 deletions(-) diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index 97a64fb31..9a0eb8536 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -18,8 +18,65 @@ from .load_data import load_combined_data, load_cli_data from .update_sensor import CHCSensorUpdator +def retrieve_files(params, filedate): + """Return filenames of relevant files, downloading them if necessary + """ + if params["input_denom_file"] is None: -def run_module(): # pylint: disable=too-many-branches,too-many-statements + ## download recent files from FTP server + logging.info("downloading recent files through SFTP") + if "covid" in params["types"]: + download_covid(params["cache_dir"], params["ftp_conn"]) + if "cli" in params["types"]: + download_cli(params["cache_dir"], params["ftp_conn"]) + + denom_file = "%s/%s_All_Outpatients_By_County.dat.gz" % (params["cache_dir"],filedate) + covid_file = "%s/%s_Covid_Outpatients_By_County.dat.gz" % (params["cache_dir"],filedate) + flu_file = "%s/%s_Flu_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) + mixed_file = "%s/%s_Mixed_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) + flu_like_file = "%s/%s_Flu_Like_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) + covid_like_file = "%s/%s_Covid_Like_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) + else: + denom_file = params["input_denom_file"] + covid_file = params["input_covid_file"] + flu_file = params["input_flu_file"] + mixed_file = params["input_mixed_file"] + flu_like_file = params["input_flu_like_file"] + covid_like_file = params["input_covid_like_file"] + + file_dict = {"denom": denom_file} + if "covid" in params["types"]: + file_dict["covid"] = covid_file + if "cli" in params["types"]: + file_dict["flu"] = flu_file + file_dict["mixed"] = mixed_file + file_dict["flu_like"] = flu_like_file + file_dict["covid_like"] = covid_like_file + return file_dict + + +def make_asserts(params): + """Assert that for each type, filenames are either all present or all absent + """ + if "covid" in params["types"]: + assert (params["input_denom_file"] is None) == (params["input_covid_file"] is None), \ + "exactly one of denom and covid files are provided" + if "cli" in params["types"]: + if params["input_denom_file"] is None: + assert params["input_flu_file"] is None and \ + params["input_mixed_file"] is None and \ + params["input_flu_like_file"] is None and \ + params["input_covid_like_file"] is None,\ + "files must be all present or all absent" + else: + assert params["input_flu_file"] is not None and \ + params["input_mixed_file"] is not None and \ + params["input_flu_like_file"] is not None and \ + params["input_covid_like_file"] is not None,\ + "files must be all present or all absent" + + +def run_module(): """Run the delphi_changehc module. 
""" @@ -27,12 +84,7 @@ def run_module(): # pylint: disable=too-many-branches,too-many-statements logging.basicConfig(level=logging.DEBUG) - # the filenames are expected to be in the format: - # Denominator: "YYYYMMDD_All_Outpatients_By_County.dat.gz" - # Numerator: "YYYYMMDD_Covid_Outpatients_By_County.dat.gz" - - assert (params["input_denom_file"] is None) == (params["input_covid_file"] is None), \ - "exactly one of denom and covid files are provided" + make_asserts(params) if params["drop_date"] is None: # files are dropped about 8pm the day after the issue date @@ -42,28 +94,7 @@ def run_module(): # pylint: disable=too-many-branches,too-many-statements dropdate_dt = datetime.strptime(params["drop_date"], "%Y-%m-%d") filedate = dropdate_dt.strftime("%Y%m%d") - if params["input_denom_file"] is None: - - ## download recent files from FTP server - logging.info("downloading recent files through SFTP") - if "covid" in params["types"]: - download_covid(params["cache_dir"], params["ftp_conn"]) - if "cli" in params["types"]: - download_cli(params["cache_dir"], params["ftp_conn"]) - - input_denom_file = "%s/%s_All_Outpatients_By_County.dat.gz" % (params["cache_dir"],filedate) - input_covid_file = "%s/%s_Covid_Outpatients_By_County.dat.gz" % (params["cache_dir"],filedate) - input_flu_file = "%s/%s_Flu_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) - input_mixed_file = "%s/%s_Mixed_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) - input_flu_like_file = "%s/%s_Flu_Like_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) - input_covid_like_file = "%s/%s_Covid_Like_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) - else: - input_denom_file = params["input_denom_file"] - input_covid_file = params["input_covid_file"] - input_flu_file = params["input_flu_file"] - input_mixed_file = params["input_mixed_file"] - input_flu_like_file = params["input_flu_like_file"] - input_covid_like_file = params["input_covid_like_file"] + file_dict = retrieve_files(params, filedate) dropdate = str(dropdate_dt.date()) @@ -114,11 +145,11 @@ def run_module(): # pylint: disable=too-many-branches,too-many-statements params["se"] ) if numtype == "covid": - data = load_combined_data(input_denom_file, - input_covid_file,dropdate_dt,"fips") + data = load_combined_data(file_dict["denom"], + file_dict["covid"],dropdate_dt,"fips") elif numtype == "cli": - data = load_cli_data(input_denom_file,input_flu_file,input_mixed_file, - input_flu_like_file,input_covid_like_file,dropdate_dt,"fips") + data = load_cli_data(file_dict["denom"],file_dict["flu"],file_dict["mixed"], + file_dict["flu_like"],file_dict["covid_like"],dropdate_dt,"fips") su_inst.update_sensor( data, params["export_dir"] From 41e15843e521a1697ba1bd67fc24deab9be5c4b1 Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Tue, 1 Dec 2020 17:48:30 -0500 Subject: [PATCH 7/7] params refactor --- changehc/delphi_changehc/run.py | 36 +++++++++++++++++---------------- changehc/params.json.template | 14 +++++++------ 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index 9a0eb8536..57a82d4f1 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -21,7 +21,8 @@ def retrieve_files(params, filedate): """Return filenames of relevant files, downloading them if necessary """ - if params["input_denom_file"] is None: + files = params["input_files"] + if files["denom"] is None: ## download recent files from FTP server 
logging.info("downloading recent files through SFTP") @@ -37,12 +38,12 @@ def retrieve_files(params, filedate): flu_like_file = "%s/%s_Flu_Like_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) covid_like_file = "%s/%s_Covid_Like_Patient_Count_By_County.dat.gz" % (params["cache_dir"],filedate) else: - denom_file = params["input_denom_file"] - covid_file = params["input_covid_file"] - flu_file = params["input_flu_file"] - mixed_file = params["input_mixed_file"] - flu_like_file = params["input_flu_like_file"] - covid_like_file = params["input_covid_like_file"] + denom_file = files["denom"] + covid_file = files["covid"] + flu_file = files["flu"] + mixed_file = files["mixed"] + flu_like_file = files["flu_like"] + covid_like_file = files["covid_like"] file_dict = {"denom": denom_file} if "covid" in params["types"]: @@ -58,21 +59,22 @@ def retrieve_files(params, filedate): def make_asserts(params): """Assert that for each type, filenames are either all present or all absent """ + files = params["input_files"] if "covid" in params["types"]: - assert (params["input_denom_file"] is None) == (params["input_covid_file"] is None), \ + assert (files["denom"] is None) == (files["covid"] is None), \ "exactly one of denom and covid files are provided" if "cli" in params["types"]: - if params["input_denom_file"] is None: - assert params["input_flu_file"] is None and \ - params["input_mixed_file"] is None and \ - params["input_flu_like_file"] is None and \ - params["input_covid_like_file"] is None,\ + if files["denom"] is None: + assert files["flu"] is None and \ + files["mixed"] is None and \ + files["flu_like"] is None and \ + files["covid_like"] is None,\ "files must be all present or all absent" else: - assert params["input_flu_file"] is not None and \ - params["input_mixed_file"] is not None and \ - params["input_flu_like_file"] is not None and \ - params["input_covid_like_file"] is not None,\ + assert files["flu"] is not None and \ + files["mixed"] is not None and \ + files["flu_like"] is not None and \ + files["covid_like"] is not None,\ "files must be all present or all absent" diff --git a/changehc/params.json.template b/changehc/params.json.template index 5598cbdcd..093fbbe52 100644 --- a/changehc/params.json.template +++ b/changehc/params.json.template @@ -2,12 +2,14 @@ "static_file_dir": "./static", "export_dir": "./receiving", "cache_dir": "./cache", - "input_denom_file": null, - "input_covid_file": null, - "input_flu_file": null, - "input_mixed_file": null, - "input_flu_like_file": null, - "input_covid_like_file": null, + "input_files": { + "denom": null, + "covid": null, + "flu": null, + "mixed": null, + "flu_like": null, + "covid_like": null + }, "start_date": "2020-02-01", "end_date": null, "drop_date": null,