diff --git a/emr_hosp/delphi_emr_hosp/update_sensor.py b/emr_hosp/delphi_emr_hosp/update_sensor.py index e85694f40..f38f11e23 100644 --- a/emr_hosp/delphi_emr_hosp/update_sensor.py +++ b/emr_hosp/delphi_emr_hosp/update_sensor.py @@ -8,7 +8,7 @@ from datetime import timedelta from multiprocessing import Pool, cpu_count import covidcast -from delphi_utils import read_params +from delphi_utils import GeoMapper, S3ArchiveDiffer, read_params # third party import numpy as np @@ -21,7 +21,6 @@ from .weekday import Weekday from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, HRR, NA, FIPS -from delphi_utils import GeoMapper def write_to_csv(output_dict, write_se, out_name, output_path="."): """Write sensor values to csv. @@ -281,4 +280,27 @@ def update_sensor(self, for signal in self.updated_signal_names: write_to_csv(output_dict, self.se, signal, outpath) logging.debug(f"wrote files to {outpath}") - return True \ No newline at end of file + params = read_params() + + arch_diff = S3ArchiveDiffer( + params["cache_dir"], + params["export_dir"], + params["bucket_name"], "emr", + params["aws_credentials"]) + arch_diff.update_cache() + + _, common_diffs, new_files = arch_diff.diff_exports() + + # Archive changed and new files only + to_archive = [f for f, diff in common_diffs.items() if diff is not None] + to_archive += new_files + _, fails = arch_diff.archive_exports(to_archive) + print(fails) + + # Filter existing exports to exclude those that failed to archive + succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails} + arch_diff.filter_exports(succ_common_diffs) + + # Report failures: someone should probably look at them + for exported_file in fails: + print(f"Failed to archive '{exported_file}'") diff --git a/emr_hosp/params.json.template b/emr_hosp/params.json.template index 3928bafb1..33ebf114b 100644 --- a/emr_hosp/params.json.template +++ b/emr_hosp/params.json.template @@ -1,6 +1,7 @@ { "static_file_dir": "./static", "export_dir": "./receiving", + "cache_dir": "./cache", "input_emr_file": "./tests/test_data/SYNICUE_CMB_INPATIENT_11062020.csv.gz", "input_claims_file": "./tests/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz", "start_date": "2020-02-01", @@ -12,5 +13,10 @@ "parallel": false, "geos": ["state", "msa", "hrr", "county"], "weekday": [true, false], - "wip_signal": "" + "wip_signal": "", + "aws_credentials": { + "aws_access_key_id": "", + "aws_secret_access_key": "" + }, + "bucket_name": "" } \ No newline at end of file diff --git a/emr_hosp/setup.py b/emr_hosp/setup.py index cb7778007..96fa45965 100644 --- a/emr_hosp/setup.py +++ b/emr_hosp/setup.py @@ -9,7 +9,9 @@ "pytest-cov", "pylint", "delphi-utils", - "covidcast" + "covidcast", + "boto3", + "moto" ] setup( diff --git a/emr_hosp/tests/params.json.template b/emr_hosp/tests/params.json.template index 99704a77e..3702a8a5e 100644 --- a/emr_hosp/tests/params.json.template +++ b/emr_hosp/tests/params.json.template @@ -4,6 +4,19 @@ "cache_dir": "./cache", "input_emr_file": "test_data/SYNICUE_CMB_INPATIENT_11062020.csv.gz", "input_claims_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz", - "drop_date": "2020-06-11", - "wip_signal": "" + "start_date": "2020-02-01", + "end_date": "2020-02-02", + "drop_date": "2020-02-02", + "n_backfill_days": 60, + "n_waiting_days": 3, + "se": false, + "parallel": false, + "geos": ["state", "msa", "hrr", "county"], + "weekday": [true, false], + "wip_signal": "", + "aws_credentials": { + "aws_access_key_id": "FAKE_TEST_ACCESS_KEY_ID", + "aws_secret_access_key": "FAKE_TEST_SECRET_ACCESS_KEY" + }, + "bucket_name": "test_bucket" } \ No newline at end of file diff --git a/emr_hosp/tests/receiving/.gitignore b/emr_hosp/tests/receiving/.gitignore new file mode 100644 index 000000000..16f2dc5fa --- /dev/null +++ b/emr_hosp/tests/receiving/.gitignore @@ -0,0 +1 @@ +*.csv \ No newline at end of file diff --git a/emr_hosp/tests/test_update_sensor.py b/emr_hosp/tests/test_update_sensor.py index d417c778f..eb8d63d99 100644 --- a/emr_hosp/tests/test_update_sensor.py +++ b/emr_hosp/tests/test_update_sensor.py @@ -8,6 +8,9 @@ # third party import pandas as pd import numpy as np +from boto3 import Session +from moto import mock_s3 +import pytest # third party from delphi_utils import read_params @@ -17,6 +20,7 @@ from delphi_emr_hosp.constants import * from delphi_emr_hosp.update_sensor import write_to_csv, add_prefix, EMRHospSensorUpdator from delphi_emr_hosp.load_data import * +from delphi_emr_hosp.run import run_module CONFIG = Config() CONSTANTS = Constants() @@ -85,12 +89,20 @@ def test_update_sensor(self): self.weekday, self.se ) - su_inst.update_sensor( - EMR_FILEPATH, - CLAIMS_FILEPATH, - td.name, - PARAMS["static_file_dir"] - ) + + with mock_s3(): + # Create the fake bucket we will be using + params = read_params() + aws_credentials = params["aws_credentials"] + s3_client = Session(**aws_credentials).client("s3") + s3_client.create_bucket(Bucket=params["bucket_name"]) + su_inst.update_sensor( + EMR_FILEPATH, + CLAIMS_FILEPATH, + td.name, + PARAMS["static_file_dir"] + ) + assert len(os.listdir(td.name)) == len(su_inst.sensor_dates), f"failed {geo} update sensor test" td.cleanup()