Skip to content

Commit d947887

Browse files
authored
Merge pull request #352 from cmu-delphi/changehc
CHC SFTP Downloads
2 parents 38a5a10 + 7248fb4 commit d947887

File tree

11 files changed

+164
-18
lines changed

11 files changed

+164
-18
lines changed

changehc/delphi_changehc/config.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
"""
77

88
from datetime import datetime, timedelta
9-
import numpy as np
109

1110

1211
class Config:

changehc/delphi_changehc/constants.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Registry for signal names and geo types"""
2-
SMOOTHED = "smoothed_chc"
3-
SMOOTHED_ADJ = "smoothed_adj_chc"
2+
SMOOTHED = "smoothed_cli"
3+
SMOOTHED_ADJ = "smoothed_adj_cli"
44
SIGNALS = [SMOOTHED, SMOOTHED_ADJ]
55
NA = "NA"
66
HRR = "hrr"
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""
2+
Downloads files modified in the last 24 hours from the specified ftp server."""
3+
4+
# standard
5+
import datetime
6+
import functools
7+
from os import path
8+
9+
# third party
10+
import paramiko
11+
12+
13+
def print_callback(filename, bytes_so_far, bytes_total):
14+
"""Log file transfer progress"""
15+
rough_percent_transferred = int(100 * (bytes_so_far / bytes_total))
16+
if (rough_percent_transferred % 25) == 0:
17+
print(f'{filename} transfer: {rough_percent_transferred}%')
18+
19+
20+
def get_files_from_dir(sftp, out_path):
21+
"""Download files from sftp server that have been uploaded in last day
22+
Args:
23+
sftp: SFTP Session from Paramiko client
24+
out_path: Path to local directory into which to download the files
25+
"""
26+
27+
current_time = datetime.datetime.now()
28+
29+
# go through files in recieving dir
30+
filepaths_to_download = {}
31+
for fileattr in sftp.listdir_attr():
32+
file_time = datetime.datetime.fromtimestamp(fileattr.st_mtime)
33+
filename = fileattr.filename
34+
if current_time - file_time < datetime.timedelta(days=1) and \
35+
not path.exists(path.join(out_path, filename)):
36+
filepaths_to_download[filename] = path.join(out_path, filename)
37+
38+
# make sure we don't download more than 2 files per day
39+
assert len(filepaths_to_download) <= 2, "more files dropped than expected"
40+
41+
# download!
42+
for infile, outfile in filepaths_to_download.items():
43+
callback_for_filename = functools.partial(print_callback, infile)
44+
sftp.get(infile, outfile, callback=callback_for_filename)
45+
46+
47+
def download(out_path, ftp_conn):
48+
"""Downloads files necessary to create CHC signal from ftp server.
49+
Args:
50+
out_path: Path to local directory into which to download the files
51+
ftp_conn: Dict containing login credentials to ftp server
52+
"""
53+
54+
# open client
55+
try:
56+
client = paramiko.SSHClient()
57+
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
58+
59+
client.connect(ftp_conn["host"], username=ftp_conn["user"],
60+
password=ftp_conn["pass"][1:] + ftp_conn["pass"][0],
61+
port=ftp_conn["port"],
62+
allow_agent=False, look_for_keys=False)
63+
sftp = client.open_sftp()
64+
65+
sftp.chdir('/dailycounts/All_Outpatients_By_County')
66+
get_files_from_dir(sftp, out_path)
67+
68+
sftp.chdir('/dailycounts/Covid_Outpatients_By_County')
69+
get_files_from_dir(sftp, out_path)
70+
71+
finally:
72+
if client:
73+
client.close()

changehc/delphi_changehc/run.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from delphi_utils import read_params
1515

1616
# first party
17+
from .download_ftp_files import download
1718
from .update_sensor import CHCSensorUpdator
1819

1920

@@ -25,11 +26,15 @@ def run_module():
2526

2627
logging.basicConfig(level=logging.DEBUG)
2728

29+
## download recent files from FTP server
30+
logging.info("downloading recent files through SFTP")
31+
download(params["cache_dir"], params["ftp_conn"])
32+
2833
## get end date from input file
2934
# the filenames are expected to be in the format:
3035
# Denominator: "YYYYMMDD_All_Outpatients_By_County.dat.gz"
3136
# Numerator: "YYYYMMDD_Covid_Outpatients_By_County.dat.gz"
32-
37+
3338
if params["drop_date"] is None:
3439
dropdate_denom = datetime.strptime(
3540
Path(params["input_denom_file"]).name.split("_")[0], "%Y%m%d"

changehc/delphi_changehc/sensor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ def fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"):
129129

130130
# checks - due to the smoother, the first value will be NA
131131
assert (
132-
np.sum(np.isnan(smoothed_total_rates[1:]) == True) == 0
132+
np.sum(np.isnan(smoothed_total_rates[1:])) == 0
133133
), "NAs in rate calculation"
134134
assert (
135135
np.sum(smoothed_total_rates[1:] <= 0) == 0

changehc/delphi_changehc/update_sensor.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
"""
66
# standard packages
77
import logging
8-
from datetime import timedelta
98
from multiprocessing import Pool, cpu_count
109
import covidcast
1110
from delphi_utils import GeoMapper, S3ArchiveDiffer, read_params
@@ -18,7 +17,7 @@
1817
from .load_data import load_combined_data
1918
from .sensor import CHCSensor
2019
from .weekday import Weekday
21-
from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, HRR, NA, FIPS
20+
from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, NA
2221

2322

2423
def write_to_csv(output_dict, write_se, out_name, output_path="."):
@@ -123,6 +122,8 @@ def public_signal(signal_):
123122

124123

125124
class CHCSensorUpdator:
125+
"""Contains methods to update sensor and write results to csv
126+
"""
126127

127128
def __init__(self,
128129
startdate,
@@ -181,10 +182,10 @@ def geo_reindex(self, data):
181182
# get right geography
182183
geo = self.geo
183184
gmpr = GeoMapper()
184-
if geo not in {"county", "state", "msa", "hrr"}:
185+
if geo not in {"county", "state", "msa", "hrr"}:
185186
logging.error(f"{geo} is invalid, pick one of 'county', 'state', 'msa', 'hrr'")
186187
return False
187-
elif geo == "county":
188+
if geo == "county":
188189
data_frame = gmpr.fips_to_megacounty(data,Config.MIN_DEN,Config.MAX_BACKFILL_WINDOW,thr_col="den",mega_col=geo)
189190
elif geo == "state":
190191
data_frame = gmpr.replace_geocode(data, "fips", "state_id", new_col="state")
@@ -305,4 +306,3 @@ def update_sensor(self,
305306
for exported_file in fails:
306307
print(f"Failed to archive '{exported_file}'")
307308
'''
308-
return

changehc/params.json.template

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,11 @@
1818
"aws_access_key_id": "",
1919
"aws_secret_access_key": ""
2020
},
21-
"bucket_name": ""
22-
}
21+
"bucket_name": "",
22+
"ftp_conn": {
23+
"host": "",
24+
"user": "",
25+
"pass": "",
26+
"port": 0
27+
}
28+
}

changehc/setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
"delphi-utils",
1212
"covidcast",
1313
"boto3",
14-
"moto"
14+
"moto",
15+
"paramiko"
1516
]
1617

1718
setup(

changehc/tests/params.json.template

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
"static_file_dir": "../static",
33
"export_dir": "./receiving",
44
"cache_dir": "./cache",
5-
"input_emr_file": "test_data/SYNICUE_CMB_INPATIENT_11062020.csv.gz",
6-
"input_claims_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
5+
"input_denom_file": "test_data/20200601_All_Outpatients_By_County.dat",
6+
"input_covid_file": "test_data/20200601_Covid_Outpatients_By_County.dat",
77
"start_date": "2020-02-01",
88
"end_date": "2020-02-02",
99
"drop_date": "2020-02-02",
@@ -19,4 +19,4 @@
1919
"aws_secret_access_key": "FAKE_TEST_SECRET_ACCESS_KEY"
2020
},
2121
"bucket_name": "test_bucket"
22-
}
22+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# standard
2+
import pytest
3+
import mock
4+
from datetime import datetime as dt
5+
from datetime import timedelta
6+
7+
# first party
8+
from delphi_changehc.download_ftp_files import *
9+
10+
class TestDownloadFTPFiles:
11+
12+
class MockSFTP:
13+
14+
# Mocks an SFTP connection
15+
def __init__(self, attrs):
16+
self.attrs = attrs
17+
self.num_gets = 0
18+
19+
# Attrs are modified time and filename
20+
def listdir_attr(self):
21+
return self.attrs
22+
23+
# Don't download anything, just note that method was called
24+
def get(self, infile, outfile, callback=None):
25+
self.num_gets += 1
26+
return
27+
28+
29+
class FileAttr:
30+
31+
def __init__(self, time, name):
32+
self.st_mtime = time
33+
self.filename = name
34+
35+
36+
@mock.patch("os.path")
37+
def test_get_files(self, mock_path):
38+
39+
# When one new file is present, one file is downloaded
40+
one_new = self.MockSFTP([self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo")])
41+
get_files_from_dir(one_new, "")
42+
assert one_new.num_gets == 1
43+
44+
# When one new file and one old file are present, one file is downloaded
45+
one_new_one_old = self.MockSFTP([self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo"),
46+
self.FileAttr(dt.timestamp(dt.now()-timedelta(days=10)),"foo")])
47+
get_files_from_dir(one_new_one_old, "")
48+
assert one_new_one_old.num_gets == 1
49+
50+
# When three new files are present, AssertionError
51+
new_file1 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo1")
52+
new_file2 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo2")
53+
new_file3 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo3")
54+
three_new = self.MockSFTP([new_file1, new_file2, new_file3])
55+
with pytest.raises(AssertionError):
56+
get_files_from_dir(three_new,"")
57+
58+
# When the file already exists, no files are downloaded
59+
mock_path.exists.return_value = True
60+
one_exists = self.MockSFTP([new_file1])
61+
get_files_from_dir(one_new, "")
62+
assert one_exists.num_gets == 0
63+

changehc/tests/test_update_sensor.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,6 @@ def test_handle_wip_signal(self):
268268
assert signal_names[0].startswith("wip_")
269269
assert all(not s.startswith("wip_") for s in signal_names[1:])
270270
# Test wip_signal = False (only unpublished signals should receive prefix)
271-
# No CHC signal is published now, so both should get prefix
272271
signal_names = add_prefix(["xyzzy", SIGNALS[0]], False)
273272
assert signal_names[0].startswith("wip_")
274-
assert all(s.startswith("wip_") for s in signal_names[1:])
273+
assert all(not s.startswith("wip_") for s in signal_names[1:])

0 commit comments

Comments
 (0)