diff --git a/_delphi_utils_python/delphi_utils/runner.py b/_delphi_utils_python/delphi_utils/runner.py index 949cd4eaa..2ecc79f15 100644 --- a/_delphi_utils_python/delphi_utils/runner.py +++ b/_delphi_utils_python/delphi_utils/runner.py @@ -4,7 +4,7 @@ from typing import Any, Callable, Dict, Optional from .archive import ArchiveDiffer, archiver_from_params from .logger import get_structured_logger -from .utils import read_params, transfer_files +from .utils import read_params, transfer_files, delete_move_files from .validator.validate import Validator from .validator.run import validator_from_params @@ -46,6 +46,9 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None], if validator: validation_report = validator.validate() validation_report.log(logger) + # Validators on dry-run always return success + if not validation_report.success(): + delete_move_files() if (not validator or validation_report.success()): if archiver: archiver.run(logger) diff --git a/_delphi_utils_python/delphi_utils/utils.py b/_delphi_utils_python/delphi_utils/utils.py index 0504967bf..78118faae 100644 --- a/_delphi_utils_python/delphi_utils/utils.py +++ b/_delphi_utils_python/delphi_utils/utils.py @@ -95,5 +95,33 @@ def transfer_files(): delivery_dir = params["delivery"].get("delivery_dir", None) files_to_export = os.listdir(export_dir) for file_name in files_to_export: + if file_name.lower().endswith(".csv"): + move(os.path.join(export_dir, file_name), os.path.join(delivery_dir, file_name)) + +def delete_move_files(): + """Delete or move output files depending on dir settings provided in params. + + 1. Delete files in export-dir if delivery-dir is specified and is different + from export_dir (aka only delete files produced by the most recent run) + 2. If validation-failures-dir is specified, move failed files there instead + If dry-run tag is True, then this function should not (and currently does not) get called + """ + params = read_params() + export_dir = params["common"].get("export_dir", None) + delivery_dir = params["delivery"].get("delivery_dir", None) + validation_failure_dir = params["validation"]["common"].get("validation_failure_dir", None) + # Create validation_failure_dir if it doesn't exist + if (validation_failure_dir is not None) and (not os.path.exists(validation_failure_dir)): + os.mkdir(validation_failure_dir) + # Double-checking that export-dir is not delivery-dir + # Throw assertion error if delivery_dir or export_dir is unspecified + assert(delivery_dir is not None and export_dir is not None) + assert export_dir != delivery_dir + files_to_delete = os.listdir(export_dir) + for file_name in files_to_delete: if file_name.endswith(".csv") or file_name.endswith(".CSV"): - move(os.path.join(export_dir, file_name), delivery_dir) + if validation_failure_dir is not None: + move(os.path.join(export_dir, file_name), + os.path.join(validation_failure_dir, file_name)) + else: + os.remove(os.path.join(export_dir, file_name)) diff --git a/_delphi_utils_python/delphi_utils/validator/README.md b/_delphi_utils_python/delphi_utils/validator/README.md index 37c41ad11..3c42a223b 100644 --- a/_delphi_utils_python/delphi_utils/validator/README.md +++ b/_delphi_utils_python/delphi_utils/validator/README.md @@ -43,6 +43,27 @@ deactivate rm -r env ``` +### Using the runner + +The validator can also be called using the runner, which performs the entire pipeline: + +1. Calling the indicator's `run` module +2. Running the validator on newly obtained CSV files, if validator is specified +3. If validation is unsuccessful, remove or transfer current files in the run +4. Archive files if archiver is specified and there are no validation failures +5. Transfer files to delivery directory if specified in params + +The following truth table describes behavior of item (3): + +| case | dry_run | validation failure | export dir | delivery dir | v failure dir | action | +| - | - | - | - | - | - | - | +| 1 | true | never | * | * | * | no effect | +| 2 | false | no | * | * | * | no effect | +| 3 | false | yes | X | X | * | throw assertion exception | +| 4 | false | yes | X | None | * | throw assertion exception | +| 5 | false | yes | X | Y != X | None | delete CSV from X | +| 6 | false | yes | X | Y != X | Z | move CSV from X to Z | + ### Customization All of the user-changable parameters are stored in the `validation` field of the indicator's `params.json` file. If `params.json` does not already include a `validation` field, please copy that provided in this module's `params.json.template`. diff --git a/_delphi_utils_python/tests/test_runner.py b/_delphi_utils_python/tests/test_runner.py index 54ab7881b..89543fef5 100644 --- a/_delphi_utils_python/tests/test_runner.py +++ b/_delphi_utils_python/tests/test_runner.py @@ -55,8 +55,9 @@ def test_full_run(self, mock_read_params, mock_validator_fn.return_value.validate.assert_called_once() mock_archiver_fn.return_value.run.assert_called_once() + @mock.patch("delphi_utils.runner.delete_move_files") @mock.patch("delphi_utils.runner.read_params") - def test_failed_validation(self, mock_read_params, + def test_failed_validation(self, mock_read_params, mock_delete_move_files, mock_indicator_fn, mock_validator_fn, mock_archiver_fn): """Test that archiving is not called when validation fails.""" mock_read_params.return_value = self.PARAMS @@ -65,6 +66,7 @@ def test_failed_validation(self, mock_read_params, run_indicator_pipeline(mock_indicator_fn, mock_validator_fn, mock_archiver_fn) + mock_delete_move_files.assert_called_once() mock_indicator_fn.assert_called_once_with(self.PARAMS) mock_validator_fn.assert_called_once_with(self.PARAMS) mock_archiver_fn.assert_called_once_with(self.PARAMS)