Fix/jhu nyc support #272


Closed · wants to merge 69 commits
Changes from all commits (69 commits)
2fd5c72
Add new configuration to set up safegraph deployment branch (#182)
korlaxxalrok Aug 5, 2020
4183fd3
Add missing path parameter
korlaxxalrok Aug 5, 2020
4ded043
Fix variable name, reencrypt vault.yaml
korlaxxalrok Aug 5, 2020
b566d4c
try until pull the new data successfully
Aug 24, 2020
8db2799
used tenacity
Aug 26, 2020
c9b16a5
Add .gitignore
vishakha1812 Aug 26, 2020
f969d96
Update params.json.template
vishakha1812 Aug 26, 2020
56db650
Update run.py
vishakha1812 Aug 26, 2020
775efe1
Update setup.py
vishakha1812 Aug 26, 2020
df20f05
Update test_update_sensor.py
vishakha1812 Aug 26, 2020
9295eac
Update update_sensor.py
vishakha1812 Aug 26, 2020
786cef4
Add constants.py
vishakha1812 Aug 26, 2020
3391ff3
Add static files
vishakha1812 Aug 26, 2020
40091b7
Minor changes
vishakha1812 Aug 26, 2020
db54eb8
removed wildcard import error
vishakha1812 Aug 27, 2020
2a9a68f
update params.json.template
vishakha1812 Aug 27, 2020
319f038
Add wip_signal key in ansible
vishakha1812 Aug 27, 2020
17fb5ae
Add missing receiving directory
vishakha1812 Aug 27, 2020
8a40c1c
Merge pull request #239 from cmu-delphi/main
krivard Aug 27, 2020
dd4085b
Merge branch 'deploy-google_health' into fix_ght_pulling
vishakha1812 Aug 27, 2020
5a4d298
Merge pull request #252 from cmu-delphi/deploy-jhu
krivard Aug 28, 2020
65490cc
Add requirement tenacity
Aug 28, 2020
d4b6f69
fixed invalid character
Aug 28, 2020
a00c125
add prefix wip
Jul 27, 2020
15bf81e
update test_run for wip signals
Jul 27, 2020
6dcd1bc
update signal names and export end date
Jul 30, 2020
df302b3
fixed errors in case:no new data
Aug 1, 2020
f8f0bbe
update signal names in DETAILS
Aug 3, 2020
d883c0c
update DETAILS
Aug 4, 2020
e0a6124
remove prefix wip_
Aug 4, 2020
156476f
update the export start date to generate reports from -45 days to -5days
Aug 4, 2020
c730742
deleted white spaces'
Aug 4, 2020
aefcfb5
Update Exceptions
jingjtang Aug 5, 2020
3457802
add wip test_per_device)
Aug 6, 2020
de7b82d
update unit tests
Aug 13, 2020
779c5d6
Added new constants.py file
vishakha1812 Aug 14, 2020
7c50aa1
Updated run.py: added new usecase for signal names
vishakha1812 Aug 14, 2020
e6d1ff6
Updated params.json.template
vishakha1812 Aug 14, 2020
4ffff29
Updated setup.py
vishakha1812 Aug 14, 2020
b3f8977
Updated tests/params.json.template
vishakha1812 Aug 14, 2020
cf2f725
Added new test case to check signal names
vishakha1812 Aug 14, 2020
bfb422e
Updated test_run.py to include new sensor names
vishakha1812 Aug 14, 2020
baac1ca
Added new file to handle signal naming
vishakha1812 Aug 14, 2020
566e7d7
Added missing files in /static
vishakha1812 Aug 14, 2020
e62f772
Update quidel_covidtest/params.json.template
vishakha1812 Aug 18, 2020
148fa7b
Update quidel_covidtest/tests/params.json.template
vishakha1812 Aug 18, 2020
e52e9b0
added a dry-run mode
Aug 18, 2020
71cb57d
Add files via upload
jingjtang Aug 19, 2020
16d6de4
Delete test_data_tools.py
jingjtang Aug 19, 2020
b0814ba
Add files via upload
jingjtang Aug 19, 2020
fd66b79
resolved a conflict caused by accident
Aug 19, 2020
3637c8e
Solved the problems in pylint test
Aug 22, 2020
9d22aa0
Added explainations to TestDate and StorageDate
Aug 26, 2020
a18184e
commented out test_per_device signals
Aug 26, 2020
d01ade2
Fixed the error in the documentation of se
jingjtang Aug 28, 2020
2167231
uploaded test_data for unit tests
Aug 28, 2020
43df798
Merge pull request #193 from cmu-delphi/quidel_covidtest
krivard Aug 28, 2020
fbe27a8
Merge latest from main
korlaxxalrok Aug 31, 2020
130a9a7
Minor changes
vishakha1812 Sep 1, 2020
b450f42
Minor change in update_sensor.py
vishakha1812 Sep 2, 2020
a6e07b8
Update test_update_sensor.py
vishakha1812 Sep 2, 2020
008bd9b
Merge pull request #250 from cmu-delphi/wip_emr
krivard Sep 3, 2020
3553e00
Merge pull request #247 from cmu-delphi/fix_ght_pulling
krivard Sep 3, 2020
a7a8e6a
Merge pull request #263 from cmu-delphi/deploy-google_health
krivard Sep 3, 2020
3988a3b
Set up production necessities for deploying Safegraph (#259)
korlaxxalrok Sep 3, 2020
c6bafc0
Finalize production setup for deploying Safegraph (#264)
korlaxxalrok Sep 4, 2020
55a41fd
Merge pull request #266 from cmu-delphi/deploy-safegraph
krivard Sep 8, 2020
dd20817
remove old static file that is no longer used
huisaddison Sep 11, 2020
6fda1b2
hotfix for nyc boroughs
huisaddison Sep 11, 2020
12 changes: 12 additions & 0 deletions ansible/templates/safegraph-params-prod.json.j2
@@ -0,0 +1,12 @@
{
"static_file_dir": "./static",
"raw_data_dir": "/common/safegraph",
"export_dir": "./receiving",
"cache_dir": "./cache",
"n_core": "12",
"aws_access_key_id": "{{ safegraph_aws_access_key_id }}",
"aws_secret_access_key": "{{ safegraph_aws_secret_access_key }}",
"aws_default_region": "us-east-1",
"aws_endpoint": "https://s3.wasabisys.com",
"wip_signal": ""
}
3 changes: 2 additions & 1 deletion ansible/vars.yaml
@@ -11,4 +11,5 @@ pyenv_python_path: "/home/{{ runtime_user }}/.pyenv/versions/{{ python_version }
google_health_api_key: "{{ vault_google_health_api_key }}"
delphi_aws_access_key_id: "{{ vault_delphi_aws_access_key_id }}"
delphi_aws_secret_access_key: "{{ vault_delphi_aws_secret_access_key }}"

safegraph_aws_access_key_id: "{{ vault_safegraph_aws_access_key_id }}"
safegraph_aws_secret_access_key: "{{ vault_safegraph_aws_secret_access_key }}"
36 changes: 21 additions & 15 deletions ansible/vault.yaml
@@ -1,16 +1,22 @@
$ANSIBLE_VAULT;1.1;AES256
66386163643862646634343162646465663762643034303563333833633661333932646164656462
6166646131623132393238336263623562373065643633310a663232373237396361623462613333
62373663383565623263306539636431623230633065626363666531366662363065343066363031
3738616663336665340a326138333634306137363837396366303861663064326333613662656630
62306331646637326637363766366237663037306665343761643263646663316535343561623137
63313365653535393639626465343232396261643239303430383138633135346466323834336665
33633064353034613836313265613466623961373565363835343430373138376336363966316365
35663664396436313432376264316663326130306134326231303234393561643436623039613136
63366638396262383762383336643930343661636461646162653734336334306239383132643435
39333665643738643966356431333830646561353263353063326330643731616130396466343339
39346437653063303336626663623835613938633834396430353634383366386237353862643766
37393738353231666565303031393839306463373461393761653866653330646534393832303264
30323038646166366465396235623731343539313633326539663966333437623733626131653437
62326632656462383835656235373664366566343866383938343639613737623631616231616135
633863383761366461363532353137323936
39633436633363346633333638666438316131313337333132396634393538383432623239316463
3435333238376331383439366161656639353039326163370a376363633535623363383233646533
63363865646561323132663032383331346332373364333465643330616638623466333039623831
6530663236313234360a336264656239383166663934303335386238386139386132626165386138
32663164326237323534636263663263666634383339613362633939323565356437663666653436
31353362316334313561333430626361616337643133346664636434313664373333653839323630
65346331383135656135386263643564333063626563336365333865663333353337393866666139
64613735663363323938633161666662653161633835383832656164343836383339376661396332
66353131373265373931366130383632633466363036373562363232663162333966316563373535
65343336363732303132366335616335333334373063313562336330336661353239646533356461
62313365633336613037626261353639323937363066363062356234653631346233373965636461
63326237663537363338346566326232353632663463386135393535343436373335393430393865
33393631623762636230656263363462346561323064653561393666373735313836666238323238
66366564666266343636663666386566336637373036633966643961346636373066356632326464
63336565656666336436383938346461646431353265353133633736363761623634346262616436
61653633326333356330626638386665313865343233393637623662383634346534326537623662
34326633623431343835346339656335386330333664373166313766366339663736376261343965
63616461666230616131326537373130313239663931313330356538356161333537666237376362
64613232333834303737323438616437303666643166383439393030316533343530363863613034
39653761626439356133393164363561316535633230633438316137623333376633663665393634
63333161376263613766353030616336386531303565346263366239653232333764
7 changes: 7 additions & 0 deletions emr_hosp/delphi_emr_hosp/constants.py
@@ -0,0 +1,7 @@
"""Registry for signal names and geo types"""
SMOOTHED = "smoothed_covid19"
SMOOTHED_ADJ = "smoothed_adj_covid19"
SIGNALS = [SMOOTHED, SMOOTHED_ADJ]
NA = "NA"
HRR = "hrr"
FIPS = "fips"
4 changes: 1 addition & 3 deletions emr_hosp/delphi_emr_hosp/run.py
@@ -69,7 +69,6 @@ def run_module():
logging.info("parallel:\t\t%s", params["parallel"])
logging.info("weekday:\t\t%s", params["weekday"])
logging.info("se:\t\t\t%s", params["se"])
logging.info("prefix:\t\t%s", params["obfuscated_prefix"])

## start generating
for geo in params["geos"]:
@@ -85,8 +84,7 @@
geo,
params["parallel"],
weekday,
params["se"],
params["obfuscated_prefix"]
params["se"]
)
su_inst.update_sensor(
params["input_emr_file"],
117 changes: 71 additions & 46 deletions emr_hosp/delphi_emr_hosp/update_sensor.py
@@ -1,49 +1,44 @@
"""
Generate EMR-hosp sensors.

Author: Maria Jahja
Created: 2020-06-01
"""

# standard packages
import logging
from datetime import timedelta
from multiprocessing import Pool, cpu_count
import covidcast
from delphi_utils import read_params

# third party
import numpy as np
import pandas as pd

# first party
from .config import Config, Constants
from .geo_maps import GeoMaps
from .load_data import load_combined_data
from .sensor import EMRHospSensor
from .weekday import Weekday
from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, HRR, NA, FIPS

from delphi_utils import GeoMapper


def write_to_csv(output_dict, write_se, out_name, output_path="."):
"""Write sensor values to csv.

Args:
output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
write_se: boolean to write out standard errors, if true, use an obfuscated name
out_name: name of the output file
output_path: outfile path to write the csv (default is current directory)
"""

if write_se:
logging.info(f"========= WARNING: WRITING SEs TO {out_name} =========")

geo_level = output_dict["geo_level"]
dates = output_dict["dates"]
geo_ids = output_dict["geo_ids"]
all_rates = output_dict["rates"]
all_se = output_dict["se"]
all_include = output_dict["include"]

out_n = 0
for i, d in enumerate(dates):
filename = "%s/%s_%s_%s.csv" % (
@@ -52,33 +47,83 @@ def write_to_csv(output_dict, write_se, out_name, output_path="."):
geo_level,
out_name,
)

with open(filename, "w") as outfile:
outfile.write("geo_id,val,se,direction,sample_size\n")

for geo_id in geo_ids:
sensor = all_rates[geo_id][i]
se = all_se[geo_id][i]

if all_include[geo_id][i]:
assert not np.isnan(sensor), "value for included sensor is nan"
assert not np.isnan(se), "se for included sensor is nan"
if sensor > 90:
logging.warning(f"value suspiciously high, {geo_id}: {sensor}")
assert se < 5, f"se suspiciously high, {geo_id}: {se}"

if write_se:
assert sensor > 0 and se > 0, "p=0, std_err=0 invalid"
outfile.write(
"%s,%f,%s,%s,%s\n" % (geo_id, sensor, se, "NA", "NA"))
"%s,%f,%s,%s,%s\n" % (geo_id, sensor, se, NA, NA))
else:
# for privacy reasons we will not report the standard error
outfile.write(
"%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA")
"%s,%f,%s,%s,%s\n" % (geo_id, sensor, NA, NA, NA)
)
out_n += 1
logging.debug(f"wrote {out_n} rows for {len(geo_ids)} {geo_level}")
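For reference, the `%`-format strings above produce rows like the following. This is a standalone sketch of the row layout only, not tied to the indicator code:

```python
# Header written once per file, then one row per geo_id.
header = "geo_id,val,se,direction,sample_size"

# se suppressed (the default, privacy-preserving path):
row_no_se = "%s,%f,%s,%s,%s" % ("pa", 1.5, "NA", "NA", "NA")
print(row_no_se)  # → pa,1.500000,NA,NA,NA

# se written out (only when write_se is set):
row_with_se = "%s,%f,%s,%s,%s" % ("pa", 1.5, 0.25, "NA", "NA")
print(row_with_se)  # → pa,1.500000,0.25,NA,NA
```

Note that `%f` always renders six decimal places, so values and their suppressed/unsuppressed variants line up column-for-column.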


def add_prefix(signal_names, wip_signal, prefix="wip_"):
"""Add the "wip_" prefix to signal names that are works in progress.
Parameters
----------
signal_names: List[str]
Names of signals to be exported
wip_signal : List[str] or bool
Either a list of signals to treat as WIP, True to prefix every signal
in the registry, or False (or "") to prefix only signals that have
never been published
prefix : str
Prefix for new/non-public signals (default "wip_")
Returns
-------
List[str]
Signal names, prefixed where appropriate, for further computation
"""
if wip_signal is True:
return [prefix + signal for signal in signal_names]
if isinstance(wip_signal, list):
make_wip = set(wip_signal)
return [
prefix + signal if signal in make_wip else signal
for signal in signal_names
]
if wip_signal in {False, ""}:
return [
signal if public_signal(signal)
else prefix + signal
for signal in signal_names
]
raise ValueError("Supply True | False or '' or [] | list()")
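The three branches above can be exercised in isolation. A minimal, self-contained sketch follows; the published-signal lookup is stubbed out (the real `public_signal` queries COVIDcast metadata over the network), so the stub's behavior is an assumption for illustration only:

```python
def public_signal_stub(signal):
    # Hypothetical stub: pretend only "smoothed_covid19" has been published.
    return signal == "smoothed_covid19"

def add_prefix(signal_names, wip_signal, prefix="wip_"):
    """Mirror of the PR's add_prefix, with the metadata check stubbed."""
    if wip_signal is True:
        # Prefix everything.
        return [prefix + s for s in signal_names]
    if isinstance(wip_signal, list):
        # Prefix only the signals explicitly listed.
        make_wip = set(wip_signal)
        return [prefix + s if s in make_wip else s for s in signal_names]
    if wip_signal in {False, ""}:
        # Prefix only signals that have never been published.
        return [s if public_signal_stub(s) else prefix + s
                for s in signal_names]
    raise ValueError("Supply True | False or '' or [] | list()")

signals = ["smoothed_covid19", "smoothed_adj_covid19"]
print(add_prefix(signals, True))
# → ['wip_smoothed_covid19', 'wip_smoothed_adj_covid19']
print(add_prefix(signals, ["smoothed_adj_covid19"]))
# → ['smoothed_covid19', 'wip_smoothed_adj_covid19']
print(add_prefix(signals, ""))
# → ['smoothed_covid19', 'wip_smoothed_adj_covid19']
```

Note that `[]` is routed to the list branch (prefix nothing), while `False` and `""` fall through to the never-published check; the two are not equivalent.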


def public_signal(signal_):
"""Checks if the signal name is already public using COVIDcast
Parameters
----------
signal_ : str
Name of the signal
Returns
-------
bool
True if the signal is present
False if the signal is not present
"""
epidata_df = covidcast.metadata()
for index in range(len(epidata_df)):
if epidata_df['signal'][index] == signal_:
return True
return False
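The row-by-row scan above does a linear pass over the metadata DataFrame for every lookup. A possible alternative, sketched here under the assumption that `covidcast.metadata()` returns a DataFrame with a `signal` column (a stub frame stands in for the real call), builds a set of published names once and reuses it:

```python
import pandas as pd

# Stand-in for covidcast.metadata(); only the "signal" column matters here.
metadata = pd.DataFrame(
    {"signal": ["smoothed_covid19", "smoothed_adj_covid19"]}
)

# Build the membership set once, then each lookup is O(1).
published = set(metadata["signal"])

def public_signal(signal_):
    """True if the signal name appears in the metadata."""
    return signal_ in published

print(public_signal("smoothed_covid19"))   # → True
print(public_signal("wip_new_signal"))     # → False
```

This also avoids calling `covidcast.metadata()` repeatedly when checking many signals in a loop.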


class EMRHospSensorUpdator:

def __init__(self,
@@ -88,10 +133,8 @@ def __init__(self,
geo,
parallel,
weekday,
se,
prefix=None):
se):
"""Init Sensor Updator

Args:
startdate: first sensor date (YYYY-mm-dd)
enddate: last sensor date (YYYY-mm-dd)
@@ -100,11 +143,8 @@ def __init__(self,
parallel: boolean to run the sensor update in parallel
weekday: boolean to adjust for weekday effects
se: boolean to write out standard errors, if true, use an obfuscated name
prefix: string to prefix to output files (used for obfuscation in producing SEs)

"""
self.startdate, self.enddate, self.dropdate = [pd.to_datetime(t) for t in (startdate, enddate, dropdate)]

# handle dates
assert (self.startdate > (Config.FIRST_DATA_DATE + Config.BURN_IN_PERIOD)
), f"not enough data to produce estimates starting {self.startdate}"
@@ -114,32 +154,28 @@ def __init__(self,
self.geo, self.parallel, self.weekday, self.se = geo.lower(), parallel, weekday, se

# output file naming
out_name = "smoothed_adj_covid19" if self.weekday else "smoothed_covid19"
if se:
assert prefix is not None, "supply obfuscated prefix in params"
out_name = prefix + "_" + out_name
self.output_filename = out_name

signals = SIGNALS.copy()
signals.remove(SMOOTHED if self.weekday else SMOOTHED_ADJ)
signal_names = add_prefix(
signals,
wip_signal=read_params()["wip_signal"])
self.updated_signal_names = signal_names

def shift_dates(self):
"""shift estimates forward to account for time lag, compute burnindates, sensordates
"""

drange = lambda s, e: pd.date_range(start=s,periods=(e-s).days,freq='D')
self.startdate = self.startdate - Config.DAY_SHIFT
self.burnindate = self.startdate - Config.BURN_IN_PERIOD
self.fit_dates = drange(Config.FIRST_DATA_DATE, self.dropdate)
self.burn_in_dates = drange(self.burnindate, self.dropdate)
self.sensor_dates = drange(self.startdate, self.enddate)
return True
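One subtlety in `shift_dates`: the `drange` helper builds its range with `periods=(e - s).days`, so the end date itself is excluded. A small standalone check (pandas assumed available):

```python
import pandas as pd

# Same helper as in shift_dates: half-open range [s, e).
drange = lambda s, e: pd.date_range(start=s, periods=(e - s).days, freq="D")

start = pd.to_datetime("2020-06-01")
end = pd.to_datetime("2020-06-04")
dates = drange(start, end)
print(list(dates.strftime("%Y-%m-%d")))
# → ['2020-06-01', '2020-06-02', '2020-06-03']  (end date excluded)
```

Downstream masks such as `self.burn_in_dates <= self.enddate` rely on this half-open convention, so the sensor dates stop at `enddate` only because `enddate` is passed as an inclusive bound elsewhere.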

def geo_reindex(self,data):
"""Reindex based on geography, include all date, geo pairs

Args:
data: dataframe, the output of loadcombineddata
staticpath: path for the static geographic files

Returns:
dataframe
"""
@@ -157,92 +193,80 @@ def geo_reindex(self,data):
else:
logging.error(f"{geo} is invalid, pick one of 'county', 'state', 'msa', 'hrr'")
return False

self.unique_geo_ids = pd.unique(data_frame[geo])
data_frame.set_index([geo,'date'],inplace=True)

# for each location, fill in all missing dates with 0 values
multiindex = pd.MultiIndex.from_product((self.unique_geo_ids, self.fit_dates),
names=[geo, "date"])
assert (len(multiindex) <= (Constants.MAX_GEO[geo] * len(self.fit_dates))
), "more loc-date pairs than maximum number of geographies x number of dates"

# fill dataframe with missing dates using 0
data_frame = data_frame.reindex(multiindex, fill_value=0)
data_frame.fillna(0, inplace=True)
return data_frame
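The reindex-and-fill step can be sketched on a toy frame: `MultiIndex.from_product` materializes every (geo, date) pair, and `reindex(..., fill_value=0)` inserts zero counts for pairs absent from the input, which is what lets downstream sensor fitting treat "no rows" as "zero observations":

```python
import pandas as pd

# Toy input: "ny" is missing 2020-06-02 entirely.
df = pd.DataFrame({
    "geo": ["pa", "pa", "ny"],
    "date": pd.to_datetime(["2020-06-01", "2020-06-02", "2020-06-01"]),
    "count": [5, 3, 7],
}).set_index(["geo", "date"])

geos = ["ny", "pa"]
dates = pd.date_range("2020-06-01", "2020-06-02", freq="D")
full = pd.MultiIndex.from_product((geos, dates), names=["geo", "date"])

# Missing (ny, 2020-06-02) now appears with count 0.
df = df.reindex(full, fill_value=0)
print(df["count"].tolist())  # → [7, 0, 5, 3]
```

The follow-up `fillna(0)` in the real code additionally zeroes NaNs that were already present in the input rows, which `fill_value` alone does not touch.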



def update_sensor(self,
emr_filepath,
claims_filepath,
outpath,
staticpath):
"""Generate sensor values, and write to csv format.

Args:
emr_filepath: path to the aggregated EMR data
claims_filepath: path to the aggregated claims data
outpath: output path for the csv results
staticpath: path for the static geographic files
"""

self.shift_dates()
final_sensor_idxs = (self.burn_in_dates >= self.startdate) & (self.burn_in_dates <= self.enddate)

# load data
## JS: If the data is in fips then can we also put it into hrr?
base_geo = "hrr" if self.geo == "hrr" else "fips"
base_geo = HRR if self.geo == HRR else FIPS
data = load_combined_data(emr_filepath, claims_filepath, self.dropdate, base_geo)

data.reset_index(inplace=True)
data_frame = self.geo_reindex(data)

# handle if we need to adjust by weekday
wd_params = Weekday.get_params(data_frame) if self.weekday else None

# run sensor fitting code (maybe in parallel)
sensor_rates = {}
sensor_se = {}
sensor_include = {}
if not self.parallel:
for geo_id, sub_data in data_frame.groupby(level=0):
sub_data.reset_index(level=0,inplace=True)

if self.weekday:
sub_data = Weekday.calc_adjustment(wd_params, sub_data)

res = EMRHospSensor.fit(sub_data, self.burnindate, geo_id)
res = pd.DataFrame(res)
sensor_rates[geo_id] = np.array(res.loc[final_sensor_idxs,"rate"])
sensor_se[geo_id] = np.array(res.loc[final_sensor_idxs,"se"])
sensor_include[geo_id] = np.array(res.loc[final_sensor_idxs,"incl"])

else:
n_cpu = min(10, cpu_count())
logging.debug(f"starting pool with {n_cpu} workers")

with Pool(n_cpu) as pool:
pool_results = []
for geo_id, sub_data in data_frame.groupby(level=0,as_index=False):
sub_data.reset_index(level=0, inplace=True)
if self.weekday:
sub_data = Weekday.calc_adjustment(wd_params, sub_data)

pool_results.append(
pool.apply_async(
EMRHospSensor.fit, args=(sub_data, self.burnindate, geo_id,),
)
)
pool_results = [proc.get() for proc in pool_results]

for res in pool_results:
geo_id = res["geo_id"]
res = pd.DataFrame(res)
sensor_rates[geo_id] = np.array(res.loc[final_sensor_idxs, "rate"])
sensor_se[geo_id] = np.array(res.loc[final_sensor_idxs, "se"])
sensor_include[geo_id] = np.array(res.loc[final_sensor_idxs, "incl"])

unique_geo_ids = list(sensor_rates.keys())
output_dict = {
"rates": sensor_rates,
@@ -254,6 +278,7 @@ def update_sensor(self,
}

# write out results
write_to_csv(output_dict, self.se, self.output_filename, outpath)
for signal in self.updated_signal_names:
write_to_csv(output_dict, self.se, signal, outpath)
logging.debug(f"wrote files to {outpath}")
return True
return True