-
Notifications
You must be signed in to change notification settings - Fork 16
CHC SFTP Downloads #352
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CHC SFTP Downloads #352
Changes from all commits
17bc7ab
e4343fc
638c578
571d51d
f115859
6d789d8
b310fd9
b27f6da
c8fe35f
7248fb4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,6 @@ | |
""" | ||
|
||
from datetime import datetime, timedelta | ||
import numpy as np | ||
|
||
|
||
class Config: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
""" | ||
Downloads files modified in the last 24 hours from the specified ftp server.""" | ||
|
||
# standard | ||
import datetime | ||
import functools | ||
from os import path | ||
|
||
# third party | ||
import paramiko | ||
|
||
|
||
def print_callback(filename, bytes_so_far, bytes_total):
    """Print file transfer progress at multiples of 25 percent.

    Args:
        filename: Name of the file being transferred (used only in the message)
        bytes_so_far: Number of bytes transferred so far
        bytes_total: Total number of bytes to transfer
    """
    # A zero-byte transfer has nothing left to send; report it as complete
    # instead of dividing by zero.
    if bytes_total:
        rough_percent_transferred = int(100 * (bytes_so_far / bytes_total))
    else:
        rough_percent_transferred = 100
    if (rough_percent_transferred % 25) == 0:
        print(f'{filename} transfer: {rough_percent_transferred}%')
|
||
|
||
def get_files_from_dir(sftp, out_path):
    """Download files from sftp server that have been uploaded in last day.

    Looks at the entries returned by `sftp.listdir_attr()` and downloads
    those whose modification time is less than one day old and which do not
    already exist in `out_path`.

    Args:
        sftp: SFTP Session from Paramiko client
        out_path: Path to local directory into which to download the files

    Raises:
        AssertionError: If more than 2 files qualify for download
    """
    current_time = datetime.datetime.now()
    max_age = datetime.timedelta(days=1)

    # go through files in receiving dir
    filepaths_to_download = {}
    for fileattr in sftp.listdir_attr():
        file_time = datetime.datetime.fromtimestamp(fileattr.st_mtime)
        if current_time - file_time >= max_age:
            continue
        filename = fileattr.filename
        local_path = path.join(out_path, filename)
        if not path.exists(local_path):
            filepaths_to_download[filename] = local_path

    # make sure we don't download more than 2 files per day; raised explicitly
    # (not via `assert`) so the check still runs under `python -O`
    if len(filepaths_to_download) > 2:
        raise AssertionError("more files dropped than expected")

    # download!
    for infile, outfile in filepaths_to_download.items():
        callback_for_filename = functools.partial(print_callback, infile)
        sftp.get(infile, outfile, callback=callback_for_filename)
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. callback is cool, didn't know of this feature in paramiko |
||
|
||
|
||
def download(out_path, ftp_conn):
    """Downloads files necessary to create CHC signal from ftp server.

    Connects over SSH with password authentication, opens an SFTP session,
    and fetches recent files from each of the two daily-count directories.

    Args:
        out_path: Path to local directory into which to download the files
        ftp_conn: Dict containing login credentials to ftp server; the keys
            "host", "user", "pass" and "port" are read
    """

    # Create the client before entering the `try` so that the `finally`
    # clause never refers to an unbound name if construction itself fails.
    client = paramiko.SSHClient()
    try:
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; consider pinning the server's host key instead.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # The configured password is rearranged before use: its first
        # character is moved to the end.
        password = ftp_conn["pass"][1:] + ftp_conn["pass"][0]
        client.connect(ftp_conn["host"], username=ftp_conn["user"],
                       password=password,
                       port=ftp_conn["port"],
                       allow_agent=False, look_for_keys=False)
        sftp = client.open_sftp()

        # Denominator directory first, then numerator directory
        for remote_dir in ('/dailycounts/All_Outpatients_By_County',
                           '/dailycounts/Covid_Outpatients_By_County'):
            sftp.chdir(remote_dir)
            get_files_from_dir(sftp, out_path)

    finally:
        client.close()
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
from delphi_utils import read_params | ||
|
||
# first party | ||
from .download_ftp_files import download | ||
from .update_sensor import CHCSensorUpdator | ||
|
||
|
||
|
@@ -25,11 +26,15 @@ def run_module(): | |
|
||
logging.basicConfig(level=logging.DEBUG) | ||
|
||
## download recent files from FTP server | ||
logging.info("downloading recent files through SFTP") | ||
download(params["cache_dir"], params["ftp_conn"]) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can this be mocked out for testing? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. +1 |
||
|
||
## get end date from input file | ||
# the filenames are expected to be in the format: | ||
# Denominator: "YYYYMMDD_All_Outpatients_By_County.dat.gz" | ||
# Numerator: "YYYYMMDD_Covid_Outpatients_By_County.dat.gz" | ||
|
||
if params["drop_date"] is None: | ||
dropdate_denom = datetime.strptime( | ||
Path(params["input_denom_file"]).name.split("_")[0], "%Y%m%d" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,8 @@ | |
"delphi-utils", | ||
"covidcast", | ||
"boto3", | ||
"moto" | ||
"moto", | ||
"paramiko" | ||
] | ||
|
||
setup( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
# standard | ||
import pytest | ||
import mock | ||
from datetime import datetime as dt | ||
from datetime import timedelta | ||
|
||
# first party | ||
from delphi_changehc.download_ftp_files import * | ||
|
||
class TestDownloadFTPFiles:
    """Tests for `get_files_from_dir` using in-memory stand-ins for SFTP."""

    class MockSFTP:

        # Mocks an SFTP connection
        def __init__(self, attrs):
            self.attrs = attrs
            self.num_gets = 0

        # Attrs are modified time and filename
        def listdir_attr(self):
            return self.attrs

        # Don't download anything, just note that method was called
        def get(self, infile, outfile, callback=None):
            self.num_gets += 1
            return

    class FileAttr:

        # Minimal stand-in exposing the two attributes the code reads
        def __init__(self, time, name):
            self.st_mtime = time
            self.filename = name

    # Patch `path` where the module under test looks it up: that module does
    # `from os import path`, so patching "os.path" would not affect it.
    @mock.patch("delphi_changehc.download_ftp_files.path")
    def test_get_files(self, mock_path):

        # Until stated otherwise, no file exists locally
        mock_path.exists.return_value = False

        # When one new file is present, one file is downloaded
        one_new = self.MockSFTP([self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo")])
        get_files_from_dir(one_new, "")
        assert one_new.num_gets == 1

        # When one new file and one old file are present, one file is downloaded
        one_new_one_old = self.MockSFTP([self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo"),
                                         self.FileAttr(dt.timestamp(dt.now()-timedelta(days=10)),"foo")])
        get_files_from_dir(one_new_one_old, "")
        assert one_new_one_old.num_gets == 1

        # When three new files are present, AssertionError
        new_file1 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo1")
        new_file2 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo2")
        new_file3 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo3")
        three_new = self.MockSFTP([new_file1, new_file2, new_file3])
        with pytest.raises(AssertionError):
            get_files_from_dir(three_new,"")

        # When the file already exists, no files are downloaded
        mock_path.exists.return_value = True
        one_exists = self.MockSFTP([new_file1])
        # Exercise the mock that is asserted on, so the check is not vacuous
        get_files_from_dir(one_exists, "")
        assert one_exists.num_gets == 0
|
Uh oh!
There was an error while loading. Please reload this page.