EMR: diff uploads #269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
6 commits, merged Sep 18, 2020
Changes from 1 commit
emr_hosp/delphi_emr_hosp/update_sensor.py (29 changes: 26 additions & 3 deletions)
@@ -8,7 +8,7 @@
from datetime import timedelta
from multiprocessing import Pool, cpu_count
import covidcast
from delphi_utils import read_params
from delphi_utils import GeoMapper, S3ArchiveDiffer, read_params

# third party
import numpy as np
@@ -21,7 +21,6 @@
from .weekday import Weekday
from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, HRR, NA, FIPS

from delphi_utils import GeoMapper

def write_to_csv(output_dict, write_se, out_name, output_path="."):
"""Write sensor values to csv.
@@ -281,4 +280,28 @@ def update_sensor(self,
for signal in self.updated_signal_names:
write_to_csv(output_dict, self.se, signal, outpath)
logging.debug(f"wrote files to {outpath}")
return True
return True
params = read_params()

arch_diff = S3ArchiveDiffer(
params["cache_dir"],
params["export_dir"],
params["bucket_name"], "emr",
params["aws_credentials"])
arch_diff.update_cache()

_, common_diffs, new_files = arch_diff.diff_exports()

# Archive changed and new files only
to_archive = [f for f, diff in common_diffs.items() if diff is not None]
to_archive += new_files
_, fails = arch_diff.archive_exports(to_archive)
print(fails)

# Filter existing exports to exclude those that failed to archive
succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails}
arch_diff.filter_exports(succ_common_diffs)

# Report failures: someone should probably look at them
for exported_file in fails:
print(f"Failed to archive '{exported_file}'")
emr_hosp/params.json.template (8 changes: 7 additions & 1 deletion)
@@ -1,6 +1,7 @@
{
"static_file_dir": "./static",
"export_dir": "./receiving",
"cache_dir": "./cache",
"input_emr_file": "./tests/test_data/SYNICUE_CMB_INPATIENT_11062020.csv.gz",
"input_claims_file": "./tests/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
"start_date": "2020-02-01",
@@ -12,5 +13,10 @@
"parallel": false,
"geos": ["state", "msa", "hrr", "county"],
"weekday": [true, false],
"wip_signal": ""
"wip_signal": "",
"aws_credentials": {
"aws_access_key_id": "",
"aws_secret_access_key": ""
},
"bucket_name": ""
}
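
The new aws_credentials and bucket_name keys feed the S3ArchiveDiffer above and, in the tests below, a boto3 Session directly. A minimal sketch of how a filled-in template would be consumed (the head_bucket reachability check is an illustrative assumption, not in the PR):

# Minimal sketch, assuming the template's empty strings are filled in:
# the credentials dict is unpacked straight into a boto3 Session, exactly
# as the test below does. head_bucket is an illustrative sanity check.
from boto3 import Session
from delphi_utils import read_params

params = read_params()
s3_client = Session(**params["aws_credentials"]).client("s3")
s3_client.head_bucket(Bucket=params["bucket_name"])  # raises if unreachable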
emr_hosp/setup.py (4 changes: 3 additions & 1 deletion)
@@ -9,7 +9,9 @@
"pytest-cov",
"pylint",
"delphi-utils",
"covidcast"
"covidcast",
"boto3",
"moto"
]

setup(
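
The two new dependencies support the S3 work: boto3 is the AWS SDK used for the bucket access, and moto fakes S3 in-process so the tests below need no real credentials. A small illustration of what moto provides (not part of the PR; the bucket name is arbitrary):

# Illustrative only: moto's mock_s3 intercepts boto3 calls so S3 can be
# exercised without network access or credentials.
import boto3
from moto import mock_s3

@mock_s3
def list_mock_buckets():
    client = boto3.client("s3", region_name="us-east-1")
    client.create_bucket(Bucket="example-bucket")  # arbitrary name
    return [b["Name"] for b in client.list_buckets()["Buckets"]]

print(list_mock_buckets())  # ['example-bucket']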
emr_hosp/tests/params.json.template (21 changes: 17 additions & 4 deletions)
@@ -2,8 +2,21 @@
"static_file_dir": "../static",
"export_dir": "./receiving",
"cache_dir": "./cache",
"input_emr_file": "test_data/SYNICUE_CMB_INPATIENT_11062020.csv.gz",
"input_claims_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
"drop_date": "2020-06-11",
"wip_signal": ""
"input_emr_file": "./tests/test_data/SYNICUE_CMB_INPATIENT_11062020.csv.gz",
"input_claims_file": "./tests/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
"start_date": "2020-02-01",
"end_date": "2020-02-01",
"drop_date": null,
"n_backfill_days": 60,
"n_waiting_days": 3,
"se": false,
"parallel": false,
"geos": ["state", "msa", "hrr", "county"],
"weekday": [true, false],
"wip_signal": "",
"aws_credentials": {
"aws_access_key_id": "",
"aws_secret_access_key": ""
},
"bucket_name": ""
}
emr_hosp/tests/receiving/.gitignore (1 change: 1 addition & 0 deletions)
@@ -0,0 +1 @@
*.csv
emr_hosp/tests/test_update_sensor.py (13 changes: 13 additions & 0 deletions)
@@ -8,6 +8,9 @@
# third party
import pandas as pd
import numpy as np
from boto3 import Session
from moto import mock_s3
import pytest

# third party
from delphi_utils import read_params
@@ -17,6 +20,7 @@
from delphi_emr_hosp.constants import *
from delphi_emr_hosp.update_sensor import write_to_csv, add_prefix, EMRHospSensorUpdator
from delphi_emr_hosp.load_data import *
from delphi_emr_hosp.run import run_module

CONFIG = Config()
CONSTANTS = Constants()
@@ -259,3 +263,12 @@ def test_handle_wip_signal(self):
signal_names = add_prefix(["xyzzy", SIGNALS[0]], False)
assert signal_names[0].startswith("wip_")
assert all(not s.startswith("wip_") for s in signal_names[1:])

def test_bucket(self):
with mock_s3():
# Create the fake bucket we will be using
params = read_params()
aws_credentials = params["aws_credentials"]
s3_client = Session(**aws_credentials).client("s3")
s3_client.create_bucket(Bucket=params["bucket_name"])
run_module()
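
test_bucket exercises the full pipeline against the mock bucket but does not yet inspect the bucket's contents. A hypothetical follow-up test (not part of the PR) could assert that archived exports actually land in S3; the "emr" key prefix mirrors the S3ArchiveDiffer argument above and is an assumption here:

    def test_bucket_contents(self):
        # Hypothetical extension, not in the PR: verify run_module() pushed
        # archived exports into the mock bucket under the assumed "emr" prefix.
        with mock_s3():
            params = read_params()
            s3_client = Session(**params["aws_credentials"]).client("s3")
            s3_client.create_bucket(Bucket=params["bucket_name"])
            run_module()
            listing = s3_client.list_objects_v2(
                Bucket=params["bucket_name"], Prefix="emr")
            assert listing.get("KeyCount", 0) > 0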