
Commit 137f795

Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators into signals
2 parents: d73100f + d947887

30 files changed (+306, -73 lines)

changehc/delphi_changehc/config.py

Lines changed: 0 additions & 1 deletion

@@ -6,7 +6,6 @@
 """

 from datetime import datetime, timedelta
-import numpy as np


 class Config:

changehc/delphi_changehc/constants.py

Lines changed: 2 additions & 2 deletions

@@ -1,6 +1,6 @@
 """Registry for signal names and geo types"""
-SMOOTHED = "smoothed_chc"
-SMOOTHED_ADJ = "smoothed_adj_chc"
+SMOOTHED = "smoothed_cli"
+SMOOTHED_ADJ = "smoothed_adj_cli"
 SIGNALS = [SMOOTHED, SMOOTHED_ADJ]
 NA = "NA"
 HRR = "hrr"
changehc/delphi_changehc/download_ftp_files.py

Lines changed: 73 additions & 0 deletions

@@ -0,0 +1,73 @@
+"""
+Downloads files modified in the last 24 hours from the specified ftp server."""
+
+# standard
+import datetime
+import functools
+from os import path
+
+# third party
+import paramiko
+
+
+def print_callback(filename, bytes_so_far, bytes_total):
+    """Log file transfer progress."""
+    rough_percent_transferred = int(100 * (bytes_so_far / bytes_total))
+    if (rough_percent_transferred % 25) == 0:
+        print(f'{filename} transfer: {rough_percent_transferred}%')
+
+
+def get_files_from_dir(sftp, out_path):
+    """Download files from the sftp server that have been uploaded in the last day.
+    Args:
+        sftp: SFTP session from the Paramiko client
+        out_path: Path to the local directory into which to download the files
+    """
+
+    current_time = datetime.datetime.now()
+
+    # go through files in the receiving dir
+    filepaths_to_download = {}
+    for fileattr in sftp.listdir_attr():
+        file_time = datetime.datetime.fromtimestamp(fileattr.st_mtime)
+        filename = fileattr.filename
+        if current_time - file_time < datetime.timedelta(days=1) and \
+                not path.exists(path.join(out_path, filename)):
+            filepaths_to_download[filename] = path.join(out_path, filename)
+
+    # make sure we don't download more than 2 files per day
+    assert len(filepaths_to_download) <= 2, "more files dropped than expected"
+
+    # download!
+    for infile, outfile in filepaths_to_download.items():
+        callback_for_filename = functools.partial(print_callback, infile)
+        sftp.get(infile, outfile, callback=callback_for_filename)
+
+
+def download(out_path, ftp_conn):
+    """Download the files necessary to create the CHC signal from the ftp server.
+    Args:
+        out_path: Path to the local directory into which to download the files
+        ftp_conn: Dict containing login credentials to the ftp server
+    """
+
+    # open client
+    try:
+        client = paramiko.SSHClient()
+        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+        client.connect(ftp_conn["host"], username=ftp_conn["user"],
+                       password=ftp_conn["pass"][1:] + ftp_conn["pass"][0],
+                       port=ftp_conn["port"],
+                       allow_agent=False, look_for_keys=False)
+        sftp = client.open_sftp()
+
+        sftp.chdir('/dailycounts/All_Outpatients_By_County')
+        get_files_from_dir(sftp, out_path)
+
+        sftp.chdir('/dailycounts/Covid_Outpatients_By_County')
+        get_files_from_dir(sftp, out_path)
+
+    finally:
+        if client:
+            client.close()
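
For reference, a minimal sketch of driving the new module directly (not part of the commit): the host, user, and port are placeholders, and the stored "pass" value is the real password rotated by one character, since download() reassembles it as pass[1:] + pass[0].

    from delphi_changehc.download_ftp_files import download

    ftp_conn = {
        "host": "sftp.example.com",  # placeholder host
        "user": "delphi",            # placeholder user
        # stored rotated: "tsecre"[1:] + "tsecre"[0] == "secret"
        "pass": "tsecre",
        "port": 22,                  # placeholder port
    }

    download("./cache", ftp_conn)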

changehc/delphi_changehc/run.py

Lines changed: 6 additions & 1 deletion

@@ -14,6 +14,7 @@
 from delphi_utils import read_params

 # first party
+from .download_ftp_files import download
 from .update_sensor import CHCSensorUpdator


@@ -25,11 +26,15 @@ def run_module():

     logging.basicConfig(level=logging.DEBUG)

+    ## download recent files from FTP server
+    logging.info("downloading recent files through SFTP")
+    download(params["cache_dir"], params["ftp_conn"])
+
     ## get end date from input file
     # the filenames are expected to be in the format:
     # Denominator: "YYYYMMDD_All_Outpatients_By_County.dat.gz"
     # Numerator: "YYYYMMDD_Covid_Outpatients_By_County.dat.gz"
-
+
     if params["drop_date"] is None:
         dropdate_denom = datetime.strptime(
             Path(params["input_denom_file"]).name.split("_")[0], "%Y%m%d"
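
The comment block above documents the expected filename convention; the drop date comes from the leading YYYYMMDD prefix of the denominator file. An illustrative parse (the filename below is an assumed example, not from the commit):

    from datetime import datetime
    from pathlib import Path

    # hypothetical cached denominator file following the documented convention
    input_denom_file = "./cache/20200601_All_Outpatients_By_County.dat.gz"
    dropdate_denom = datetime.strptime(
        Path(input_denom_file).name.split("_")[0], "%Y%m%d"
    )
    print(dropdate_denom)  # 2020-06-01 00:00:00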

changehc/delphi_changehc/sensor.py

Lines changed: 1 addition & 1 deletion

@@ -129,7 +129,7 @@ def fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"):

         # checks - due to the smoother, the first value will be NA
         assert (
-            np.sum(np.isnan(smoothed_total_rates[1:]) == True) == 0
+            np.sum(np.isnan(smoothed_total_rates[1:])) == 0
         ), "NAs in rate calculation"
         assert (
             np.sum(smoothed_total_rates[1:] <= 0) == 0
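
Both forms count the NaN entries; the change only drops the redundant == True comparison. A quick illustration with stand-in values (not the indicator's data):

    import numpy as np

    smoothed_total_rates = np.array([np.nan, 0.1, 0.2, 0.3])  # hypothetical rates
    assert np.sum(np.isnan(smoothed_total_rates[1:]) == True) == 0  # old form
    assert np.sum(np.isnan(smoothed_total_rates[1:])) == 0          # new form, same count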

changehc/delphi_changehc/update_sensor.py

Lines changed: 2 additions & 0 deletions

@@ -68,6 +68,8 @@ def write_to_csv(output_dict, write_se, out_name, output_path="."):


 class CHCSensorUpdator:
+    """Contains methods to update sensor and write results to csv
+    """

     def __init__(self,
                  startdate,

changehc/delphi_changehc/weekday.py

Lines changed: 2 additions & 1 deletion

@@ -7,6 +7,7 @@
 # third party
 import cvxpy as cp
 import numpy as np
+from cvxpy.error import SolverError

 # first party
 from .config import Config
@@ -83,7 +84,7 @@ def get_params(data):
         try:
             prob = cp.Problem(cp.Minimize(-ll + lmbda * penalty))
             _ = prob.solve()
-        except:
+        except SolverError:
             # If the magnitude of the objective function is too large, an error is
             # thrown; Rescale the objective function
             prob = cp.Problem(cp.Minimize((-ll + lmbda * penalty) / 1e5))
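
The same narrowing of the bare except appears in claims_hosp and emr_hosp below: only a cvxpy SolverError triggers the rescaled retry, while any other exception now propagates. A self-contained sketch of the pattern with stand-in variables (ll, penalty, and lmbda here are not the indicator's actual weekday objective):

    import cvxpy as cp
    import numpy as np
    from cvxpy.error import SolverError

    beta = cp.Variable(3)
    ll = -cp.sum_squares(beta - np.array([1.0, 2.0, 3.0]))  # stand-in log likelihood
    penalty = cp.norm(beta, 1)                              # stand-in penalty term
    lmbda = 0.1

    try:
        prob = cp.Problem(cp.Minimize(-ll + lmbda * penalty))
        _ = prob.solve()
    except SolverError:
        # If the solver fails (e.g. the objective is too large), rescale and retry;
        # unrelated exceptions are no longer swallowed.
        prob = cp.Problem(cp.Minimize((-ll + lmbda * penalty) / 1e5))
        _ = prob.solve()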

changehc/params.json.template

Lines changed: 8 additions & 2 deletions

@@ -18,5 +18,11 @@
     "aws_access_key_id": "",
     "aws_secret_access_key": ""
   },
-  "bucket_name": ""
-}
+  "bucket_name": "",
+  "ftp_conn": {
+    "host": "",
+    "user": "",
+    "pass": "",
+    "port": 0
+  }
+}
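
A filled-in params.json now needs the ftp_conn block that run_module passes to download(). An illustrative sanity check of the expected keys (not part of the commit):

    import json

    with open("params.json") as f:
        params = json.load(f)

    ftp_conn = params["ftp_conn"]
    assert {"host", "user", "pass", "port"} <= set(ftp_conn), "ftp_conn is missing keys"
    assert isinstance(ftp_conn["port"], int), "port should be an integer"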

changehc/setup.py

Lines changed: 2 additions & 1 deletion

@@ -11,7 +11,8 @@
     "delphi-utils",
     "covidcast",
     "boto3",
-    "moto"
+    "moto",
+    "paramiko"
 ]

 setup(

changehc/tests/params.json.template

Lines changed: 3 additions & 3 deletions

@@ -2,8 +2,8 @@
   "static_file_dir": "../static",
   "export_dir": "./receiving",
   "cache_dir": "./cache",
-  "input_emr_file": "test_data/SYNICUE_CMB_INPATIENT_11062020.csv.gz",
-  "input_claims_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
+  "input_denom_file": "test_data/20200601_All_Outpatients_By_County.dat",
+  "input_covid_file": "test_data/20200601_Covid_Outpatients_By_County.dat",
   "start_date": "2020-02-01",
   "end_date": "2020-02-02",
   "drop_date": "2020-02-02",
@@ -19,4 +19,4 @@
     "aws_secret_access_key": "FAKE_TEST_SECRET_ACCESS_KEY"
   },
   "bucket_name": "test_bucket"
-}
+}
changehc/tests/test_download_ftp_files.py

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+# standard
+import pytest
+import mock
+from datetime import datetime as dt
+from datetime import timedelta
+
+# first party
+from delphi_changehc.download_ftp_files import *
+
+class TestDownloadFTPFiles:
+
+    class MockSFTP:
+
+        # Mocks an SFTP connection
+        def __init__(self, attrs):
+            self.attrs = attrs
+            self.num_gets = 0
+
+        # Attrs are modified time and filename
+        def listdir_attr(self):
+            return self.attrs
+
+        # Don't download anything, just note that the method was called
+        def get(self, infile, outfile, callback=None):
+            self.num_gets += 1
+            return
+
+
+    class FileAttr:
+
+        def __init__(self, time, name):
+            self.st_mtime = time
+            self.filename = name
+
+
+    @mock.patch("os.path")
+    def test_get_files(self, mock_path):
+
+        # When one new file is present, one file is downloaded
+        one_new = self.MockSFTP([self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo")])
+        get_files_from_dir(one_new, "")
+        assert one_new.num_gets == 1
+
+        # When one new file and one old file are present, one file is downloaded
+        one_new_one_old = self.MockSFTP([self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo"),
+                                         self.FileAttr(dt.timestamp(dt.now()-timedelta(days=10)),"foo")])
+        get_files_from_dir(one_new_one_old, "")
+        assert one_new_one_old.num_gets == 1
+
+        # When three new files are present, AssertionError
+        new_file1 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo1")
+        new_file2 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo2")
+        new_file3 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo3")
+        three_new = self.MockSFTP([new_file1, new_file2, new_file3])
+        with pytest.raises(AssertionError):
+            get_files_from_dir(three_new,"")
+
+        # When the file already exists, no files are downloaded
+        mock_path.exists.return_value = True
+        one_exists = self.MockSFTP([new_file1])
+        get_files_from_dir(one_new, "")
+        assert one_exists.num_gets == 0
+

claims_hosp/delphi_claims_hosp/weekday.py

Lines changed: 2 additions & 1 deletion

@@ -7,6 +7,7 @@
 # third party
 import cvxpy as cp
 import numpy as np
+from cvxpy.error import SolverError

 # first party
 from .config import Config
@@ -83,7 +84,7 @@ def get_params(data):
         try:
             prob = cp.Problem(cp.Minimize(-ll + lmbda * penalty))
             _ = prob.solve()
-        except:
+        except SolverError:
             # If the magnitude of the objective function is too large, an error is
             # thrown; Rescale the objective function
             prob = cp.Problem(cp.Minimize((-ll + lmbda * penalty) / 1e5))

emr_hosp/delphi_emr_hosp/weekday.py

Lines changed: 2 additions & 1 deletion

@@ -7,6 +7,7 @@
 # third party
 import cvxpy as cp
 import numpy as np
+from cvxpy.error import SolverError

 # first party
 from .config import Config
@@ -83,7 +84,7 @@ def get_params(data):
         try:
             prob = cp.Problem(cp.Minimize(-ll + lmbda * penalty))
             _ = prob.solve()
-        except:
+        except SolverError:
             # If the magnitude of the objective function is too large, an error is
             # thrown; Rescale the objective function
             prob = cp.Problem(cp.Minimize((-ll + lmbda * penalty) / 1e5))

safegraph/delphi_safegraph/process.py

Lines changed: 16 additions & 15 deletions

@@ -15,6 +15,7 @@
 # Base file name for raw data CSVs.
 CSV_NAME = 'social-distancing.csv.gz'

+
 def validate(df):
     """Confirms that a data frame has only one date."""
     timestamps = df['date_range_start'].apply(date_from_timestamp)
@@ -181,13 +182,13 @@ def process_window(df_list: List[pd.DataFrame],
         f'{signal}_se': 'se',
         f'{signal}_n': 'sample_size',
     }, axis=1)
-    df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
+    date_str = date.strftime('%Y%m%d')
+    df_export.to_csv(f'{export_dir}/{date_str}_{geo_res}_{signal}.csv',
                      na_rep='NA',
                      index=False, )


-def process(current_filename: str,
-            previous_filenames: List[str],
+def process(filenames: List[str],
             signal_names: List[str],
             wip_signal,
             geo_resolutions: List[str],
@@ -196,11 +197,11 @@ def process(current_filename: str,
     as averaged over the previous week.
     Parameters
     ----------
-    current_filename: str
-        path to file holding the target date's data.
-    previous_filenames: List[str]
-        paths to files holding data from each day in the week preceding the
-        target date.
+    current_filename: List[str]
+        paths to files holding data.
+        The first entry of the list should correspond to the target date while
+        the remaining entries should correspond to the dates from each day in
+        the week preceding the target date.
     signal_names: List[str]
         signal names to be processed for a single date.
         A second version of each such signal named {SIGNAL}_7d_avg will be
@@ -220,8 +221,8 @@ def process(current_filename: str,
        one for the data averaged over the previous week to
        {export_dir}/{date}_{resolution}_{signal}_7d_avg.csv.
    """
-    past_week = [pd.read_csv(current_filename)]
-    for fname in previous_filenames:
+    past_week = []
+    for fname in filenames:
        if os.path.exists(fname):
            past_week.append(pd.read_csv(fname))

@@ -232,8 +233,8 @@
                    export_dir)
     # ...then as part of the whole window.
     process_window(past_week,
-                   add_prefix(add_suffix(signal_names, '_7d_avg'),
-                                         wip_signal,
-                                         'wip_'),
-                   geo_resolutions,
-                   export_dir)
+                   add_prefix(add_suffix(signal_names, '_7d_avg'),
+                              wip_signal,
+                              'wip_'),
+                   geo_resolutions,
+                   export_dir)
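
Callers of process() now pass a single list with the target date's file first, followed by the preceding week's files, instead of two separate arguments. A hedged sketch of the call-site adjustment (file paths, the signal list, and the export_dir keyword are placeholders, not the module's actual run code):

    from delphi_safegraph.process import process

    # hypothetical raw files; the first entry is the target date
    current_filename = "raw_data/2020-06-14-social-distancing.csv.gz"
    previous_filenames = [
        f"raw_data/2020-06-{day:02d}-social-distancing.csv.gz" for day in range(7, 14)
    ]

    # before: process(current_filename, previous_filenames, ...)
    process([current_filename] + previous_filenames,
            signal_names=["example_signal"],   # placeholder signal name
            wip_signal=True,
            geo_resolutions=["county"],
            export_dir="./receiving")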
