From 17bc7ab9f75b09380e6f65cd1a65c7d09188b475 Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Wed, 21 Oct 2020 17:00:21 -0400 Subject: [PATCH 1/9] Download CHC files via sftp --- .../delphi_changehc/download_ftp_files.py | 71 +++++++++++++++++++ changehc/delphi_changehc/run.py | 5 ++ changehc/params.json.template | 10 ++- changehc/setup.py | 3 +- 4 files changed, 86 insertions(+), 3 deletions(-) create mode 100644 changehc/delphi_changehc/download_ftp_files.py diff --git a/changehc/delphi_changehc/download_ftp_files.py b/changehc/delphi_changehc/download_ftp_files.py new file mode 100644 index 000000000..7ff2c4c66 --- /dev/null +++ b/changehc/delphi_changehc/download_ftp_files.py @@ -0,0 +1,71 @@ +""" +Downloads files modified in the last 24 hours from the delphi ftp server.""" + +# standard +import datetime +import functools +import sys +from os import path + +# third party +import paramiko + +class AllowAnythingPolicy(paramiko.MissingHostKeyPolicy): + def missing_host_key(self, client, hostname, key): + return + + +def print_callback(filename, bytes_so_far, bytes_total): + rough_percent_transferred = int(100 * (bytes_so_far / bytes_total)) + if (rough_percent_transferred % 25) == 0: + print(f'{filename} transfer: {rough_percent_transferred}%') + + +def get_files_from_dir(sftp, out_path): + current_time = datetime.datetime.now() + seconds_in_day = 24 * 60 * 60 + + # go through files in recieving dir + files_to_download = [] + for fileattr in sftp.listdir_attr(): + file_time = datetime.datetime.fromtimestamp(fileattr.st_mtime) + time_diff_to_current_time = current_time - file_time + if time_diff_to_current_time.total_seconds() <= seconds_in_day: + files_to_download.append(fileattr.filename) + + filepaths_to_download = {} + for file in files_to_download: + full_path = path.join(out_path, file) + if path.exists(full_path): + print(f"{file} exists, skipping") + else: + filepaths_to_download[file] = full_path + + # make sure we don't download more than 2 files per day + assert len(files_to_download) <= 2, "more files dropped than expected" + + # download! + for infile, outfile in filepaths_to_download.items(): + callback_for_filename = functools.partial(print_callback, infile) + sftp.get(infile, outfile, callback=callback_for_filename) + + +def download(out_path, ftp_conn): + + # open client + client = paramiko.SSHClient() + client.set_missing_host_key_policy(AllowAnythingPolicy()) + + client.connect(ftp_conn["host"], username=ftp_conn["user"], + password=ftp_conn["pass"][1:] + ftp_conn["pass"][0], + port=ftp_conn["port"], + allow_agent=False, look_for_keys=False) + sftp = client.open_sftp() + + sftp.chdir('/dailycounts/All_Outpatients_By_County') + get_files_from_dir(sftp, out_path) + + sftp.chdir('/dailycounts/Covid_Outpatients_By_County') + get_files_from_dir(sftp, out_path) + + client.close() diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index a85a9cf3b..01afc01bd 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -14,6 +14,7 @@ from delphi_utils import read_params # first party +from .download_ftp_files import download from .update_sensor import CHCSensorUpdator @@ -25,6 +26,10 @@ def run_module(): logging.basicConfig(level=logging.DEBUG) + ## download recent files from FTP server + logging.info("downloading recent files through SFTP") + download(params["cache_dir"], params["ftp_conn"]) + ## get end date from input file # the filenames are expected to be in the format: # Denominator: "YYYYMMDD_All_Outpatients_By_County.dat.gz" diff --git a/changehc/params.json.template b/changehc/params.json.template index 68f806fec..d2aae255b 100644 --- a/changehc/params.json.template +++ b/changehc/params.json.template @@ -18,5 +18,11 @@ "aws_access_key_id": "", "aws_secret_access_key": "" }, - "bucket_name": "" -} \ No newline at end of file + "bucket_name": "", + "ftp_conn": { + "host": "", + "user": "", + "pass": "", + "port": 0 + } +} diff --git a/changehc/setup.py b/changehc/setup.py index 3a8521fd7..17302c243 100644 --- a/changehc/setup.py +++ b/changehc/setup.py @@ -11,7 +11,8 @@ "delphi-utils", "covidcast", "boto3", - "moto" + "moto", + "paramiko" ] setup( From e4343fc53162a0b5831dfa4c52a5ebcea457aab6 Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Wed, 21 Oct 2020 17:00:31 -0400 Subject: [PATCH 2/9] Modify signal name --- changehc/delphi_changehc/constants.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/changehc/delphi_changehc/constants.py b/changehc/delphi_changehc/constants.py index 107fd49d3..9a2cb29d4 100644 --- a/changehc/delphi_changehc/constants.py +++ b/changehc/delphi_changehc/constants.py @@ -1,6 +1,6 @@ """Registry for signal names and geo types""" -SMOOTHED = "smoothed_chc" -SMOOTHED_ADJ = "smoothed_adj_chc" +SMOOTHED = "smoothed_cli" +SMOOTHED_ADJ = "smoothed_adj_cli" SIGNALS = [SMOOTHED, SMOOTHED_ADJ] NA = "NA" HRR = "hrr" From 638c578ec6cde87654014ce5baf8b4437dfd1e69 Mon Sep 17 00:00:00 2001 From: Aaron Rumack Date: Thu, 22 Oct 2020 17:31:21 -0400 Subject: [PATCH 3/9] Update comment in download_ftp_files.py Co-authored-by: krivard --- changehc/delphi_changehc/download_ftp_files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changehc/delphi_changehc/download_ftp_files.py b/changehc/delphi_changehc/download_ftp_files.py index 7ff2c4c66..7d7b5f904 100644 --- a/changehc/delphi_changehc/download_ftp_files.py +++ b/changehc/delphi_changehc/download_ftp_files.py @@ -1,5 +1,5 @@ """ -Downloads files modified in the last 24 hours from the delphi ftp server.""" +Downloads files modified in the last 24 hours from the specified ftp server.""" # standard import datetime From 571d51d3900aa6ca53e4d14a47cc089e9256b0d6 Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Fri, 23 Oct 2020 14:25:27 -0400 Subject: [PATCH 4/9] New signal name --- changehc/tests/test_update_sensor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/changehc/tests/test_update_sensor.py b/changehc/tests/test_update_sensor.py index 1ca3e7880..b1329543f 100644 --- a/changehc/tests/test_update_sensor.py +++ b/changehc/tests/test_update_sensor.py @@ -268,7 +268,6 @@ def test_handle_wip_signal(self): assert signal_names[0].startswith("wip_") assert all(not s.startswith("wip_") for s in signal_names[1:]) # Test wip_signal = False (only unpublished signals should receive prefix) - # No CHC signal is published now, so both should get prefix signal_names = add_prefix(["xyzzy", SIGNALS[0]], False) assert signal_names[0].startswith("wip_") - assert all(s.startswith("wip_") for s in signal_names[1:]) + assert all(not s.startswith("wip_") for s in signal_names[1:]) From f115859c2a43ec1a1dd94ae7e742be653c2e837b Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Fri, 23 Oct 2020 14:25:56 -0400 Subject: [PATCH 5/9] Code review suggestions --- .../delphi_changehc/download_ftp_files.py | 71 ++++++++++--------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/changehc/delphi_changehc/download_ftp_files.py b/changehc/delphi_changehc/download_ftp_files.py index 7ff2c4c66..dc4c7a58a 100644 --- a/changehc/delphi_changehc/download_ftp_files.py +++ b/changehc/delphi_changehc/download_ftp_files.py @@ -10,39 +10,34 @@ # third party import paramiko -class AllowAnythingPolicy(paramiko.MissingHostKeyPolicy): - def missing_host_key(self, client, hostname, key): - return - def print_callback(filename, bytes_so_far, bytes_total): + """Log file transfer progress""" rough_percent_transferred = int(100 * (bytes_so_far / bytes_total)) if (rough_percent_transferred % 25) == 0: print(f'{filename} transfer: {rough_percent_transferred}%') def get_files_from_dir(sftp, out_path): + """Download files from sftp server that have been uploaded in last day + Args: + sftp: SFTP Session from Paramiko client + out_path: Path to local directory into which to download the files + """ + current_time = datetime.datetime.now() - seconds_in_day = 24 * 60 * 60 # go through files in recieving dir - files_to_download = [] + filepaths_to_download = {} for fileattr in sftp.listdir_attr(): file_time = datetime.datetime.fromtimestamp(fileattr.st_mtime) - time_diff_to_current_time = current_time - file_time - if time_diff_to_current_time.total_seconds() <= seconds_in_day: - files_to_download.append(fileattr.filename) - - filepaths_to_download = {} - for file in files_to_download: - full_path = path.join(out_path, file) - if path.exists(full_path): - print(f"{file} exists, skipping") - else: - filepaths_to_download[file] = full_path + filename = fileattr.filename + if current_time - file_time < datetime.timedelta(days=1) and \ + not path.exists(filename): + filepaths_to_download[filename] = path.join(out_path, filename) # make sure we don't download more than 2 files per day - assert len(files_to_download) <= 2, "more files dropped than expected" + assert len(filepaths_to_download) <= 2, "more files dropped than expected" # download! for infile, outfile in filepaths_to_download.items(): @@ -51,21 +46,29 @@ def get_files_from_dir(sftp, out_path): def download(out_path, ftp_conn): + """Downloads files necessary to create CHC signal from ftp server. + Args: + out_path: Path to local directory into which to download the files + ftp_conn: Dict containing login credentials to ftp server + """ # open client - client = paramiko.SSHClient() - client.set_missing_host_key_policy(AllowAnythingPolicy()) - - client.connect(ftp_conn["host"], username=ftp_conn["user"], - password=ftp_conn["pass"][1:] + ftp_conn["pass"][0], - port=ftp_conn["port"], - allow_agent=False, look_for_keys=False) - sftp = client.open_sftp() - - sftp.chdir('/dailycounts/All_Outpatients_By_County') - get_files_from_dir(sftp, out_path) - - sftp.chdir('/dailycounts/Covid_Outpatients_By_County') - get_files_from_dir(sftp, out_path) - - client.close() + try: + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + client.connect(ftp_conn["host"], username=ftp_conn["user"], + password=ftp_conn["pass"][1:] + ftp_conn["pass"][0], + port=ftp_conn["port"], + allow_agent=False, look_for_keys=False) + sftp = client.open_sftp() + + sftp.chdir('/dailycounts/All_Outpatients_By_County') + get_files_from_dir(sftp, out_path) + + sftp.chdir('/dailycounts/Covid_Outpatients_By_County') + get_files_from_dir(sftp, out_path) + + finally: + if client: + client.close() From b310fd95916bfaad7044283f645edf9dae1f8ddf Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Fri, 23 Oct 2020 14:35:34 -0400 Subject: [PATCH 6/9] Corrected path to check for previous download --- changehc/delphi_changehc/download_ftp_files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changehc/delphi_changehc/download_ftp_files.py b/changehc/delphi_changehc/download_ftp_files.py index bdeb25abd..a1a1820dd 100644 --- a/changehc/delphi_changehc/download_ftp_files.py +++ b/changehc/delphi_changehc/download_ftp_files.py @@ -33,7 +33,7 @@ def get_files_from_dir(sftp, out_path): file_time = datetime.datetime.fromtimestamp(fileattr.st_mtime) filename = fileattr.filename if current_time - file_time < datetime.timedelta(days=1) and \ - not path.exists(filename): + not path.exists(path.join(out_path, filename)): filepaths_to_download[filename] = path.join(out_path, filename) # make sure we don't download more than 2 files per day From b27f6da16bb059e30b7bef131048d6c19c074254 Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Mon, 26 Oct 2020 14:29:01 -0400 Subject: [PATCH 7/9] Add download tests --- changehc/tests/test_download_ftp_files.py | 63 +++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 changehc/tests/test_download_ftp_files.py diff --git a/changehc/tests/test_download_ftp_files.py b/changehc/tests/test_download_ftp_files.py new file mode 100644 index 000000000..5808b4882 --- /dev/null +++ b/changehc/tests/test_download_ftp_files.py @@ -0,0 +1,63 @@ +# standard +import pytest +import mock +from datetime import datetime as dt +from datetime import timedelta + +# first party +from delphi_changehc.download_ftp_files import * + +class TestDownloadFTPFiles: + + class MockSFTP: + + # Mocks an SFTP connection + def __init__(self, attrs): + self.attrs = attrs + self.num_gets = 0 + + # Attrs are modified time and filename + def listdir_attr(self): + return self.attrs + + # Don't download anything, just note that method was called + def get(self, infile, outfile, callback=None): + self.num_gets += 1 + return + + + class FileAttr: + + def __init__(self, time, name): + self.st_mtime = time + self.filename = name + + + @mock.patch("os.path") + def test_get_files(self, mock_path): + + # When one new file is present, one file is downloaded + one_new = self.MockSFTP([self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo")]) + get_files_from_dir(one_new, "") + assert one_new.num_gets == 1 + + # When one new file and one old file are present, one file is downloaded + one_new_one_old = self.MockSFTP([self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo"), + self.FileAttr(dt.timestamp(dt.now()-timedelta(days=10)),"foo")]) + get_files_from_dir(one_new_one_old, "") + assert one_new_one_old.num_gets == 1 + + # When three new files are present, AssertionError + new_file1 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo1") + new_file2 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo2") + new_file3 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo3") + three_new = self.MockSFTP([new_file1, new_file2, new_file3]) + with pytest.raises(AssertionError): + get_files_from_dir(three_new,"") + + # When the file already exists, no files are downloaded + mock_path.exists.return_value = True + one_exists = self.MockSFTP([new_file1]) + get_files_from_dir(one_new, "") + assert one_exists.num_gets == 0 + From c8fe35f257f5680f0892d013dabbb0458529ca5a Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Mon, 26 Oct 2020 14:30:40 -0400 Subject: [PATCH 8/9] Modify test params --- changehc/tests/params.json.template | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/changehc/tests/params.json.template b/changehc/tests/params.json.template index 3702a8a5e..68c16e4be 100644 --- a/changehc/tests/params.json.template +++ b/changehc/tests/params.json.template @@ -2,8 +2,8 @@ "static_file_dir": "../static", "export_dir": "./receiving", "cache_dir": "./cache", - "input_emr_file": "test_data/SYNICUE_CMB_INPATIENT_11062020.csv.gz", - "input_claims_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz", + "input_denom_file": "test_data/20200601_All_Outpatients_By_County.dat", + "input_covid_file": "test_data/20200601_Covid_Outpatients_By_County.dat", "start_date": "2020-02-01", "end_date": "2020-02-02", "drop_date": "2020-02-02", @@ -19,4 +19,4 @@ "aws_secret_access_key": "FAKE_TEST_SECRET_ACCESS_KEY" }, "bucket_name": "test_bucket" -} \ No newline at end of file +} From 7248fb4eed7d21502eb43cc8749426eab81c9b71 Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Mon, 26 Oct 2020 21:50:38 -0400 Subject: [PATCH 9/9] Improve linting --- changehc/delphi_changehc/config.py | 1 - changehc/delphi_changehc/download_ftp_files.py | 3 +-- changehc/delphi_changehc/run.py | 2 +- changehc/delphi_changehc/sensor.py | 2 +- changehc/delphi_changehc/update_sensor.py | 10 +++++----- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/changehc/delphi_changehc/config.py b/changehc/delphi_changehc/config.py index 217e81daa..07f3dde8b 100644 --- a/changehc/delphi_changehc/config.py +++ b/changehc/delphi_changehc/config.py @@ -6,7 +6,6 @@ """ from datetime import datetime, timedelta -import numpy as np class Config: diff --git a/changehc/delphi_changehc/download_ftp_files.py b/changehc/delphi_changehc/download_ftp_files.py index a1a1820dd..ec2382004 100644 --- a/changehc/delphi_changehc/download_ftp_files.py +++ b/changehc/delphi_changehc/download_ftp_files.py @@ -4,7 +4,6 @@ # standard import datetime import functools -import sys from os import path # third party @@ -68,7 +67,7 @@ def download(out_path, ftp_conn): sftp.chdir('/dailycounts/Covid_Outpatients_By_County') get_files_from_dir(sftp, out_path) - + finally: if client: client.close() diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index 01afc01bd..97dce3f6c 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -34,7 +34,7 @@ def run_module(): # the filenames are expected to be in the format: # Denominator: "YYYYMMDD_All_Outpatients_By_County.dat.gz" # Numerator: "YYYYMMDD_Covid_Outpatients_By_County.dat.gz" - + if params["drop_date"] is None: dropdate_denom = datetime.strptime( Path(params["input_denom_file"]).name.split("_")[0], "%Y%m%d" diff --git a/changehc/delphi_changehc/sensor.py b/changehc/delphi_changehc/sensor.py index bef8869e7..7a625e4fe 100644 --- a/changehc/delphi_changehc/sensor.py +++ b/changehc/delphi_changehc/sensor.py @@ -129,7 +129,7 @@ def fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"): # checks - due to the smoother, the first value will be NA assert ( - np.sum(np.isnan(smoothed_total_rates[1:]) == True) == 0 + np.sum(np.isnan(smoothed_total_rates[1:])) == 0 ), "NAs in rate calculation" assert ( np.sum(smoothed_total_rates[1:] <= 0) == 0 diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py index 2f2efb169..91b44b36e 100644 --- a/changehc/delphi_changehc/update_sensor.py +++ b/changehc/delphi_changehc/update_sensor.py @@ -5,7 +5,6 @@ """ # standard packages import logging -from datetime import timedelta from multiprocessing import Pool, cpu_count import covidcast from delphi_utils import GeoMapper, S3ArchiveDiffer, read_params @@ -18,7 +17,7 @@ from .load_data import load_combined_data from .sensor import CHCSensor from .weekday import Weekday -from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, HRR, NA, FIPS +from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, NA def write_to_csv(output_dict, write_se, out_name, output_path="."): @@ -123,6 +122,8 @@ def public_signal(signal_): class CHCSensorUpdator: + """Contains methods to update sensor and write results to csv + """ def __init__(self, startdate, @@ -181,10 +182,10 @@ def geo_reindex(self, data): # get right geography geo = self.geo gmpr = GeoMapper() - if geo not in {"county", "state", "msa", "hrr"}: + if geo not in {"county", "state", "msa", "hrr"}: logging.error(f"{geo} is invalid, pick one of 'county', 'state', 'msa', 'hrr'") return False - elif geo == "county": + if geo == "county": data_frame = gmpr.fips_to_megacounty(data,Config.MIN_DEN,Config.MAX_BACKFILL_WINDOW,thr_col="den",mega_col=geo) elif geo == "state": data_frame = gmpr.replace_geocode(data, "fips", "state_id", new_col="state") @@ -305,4 +306,3 @@ def update_sensor(self, for exported_file in fails: print(f"Failed to archive '{exported_file}'") ''' - return