Skip to content

Commit f472223

Browse files
committed
Merge branch 'fb-package-speed-up-3' of github.com:cmu-delphi/covidcast-indicators into fb-package-speed-up-3
2 parents 65ab02d + d45dcbe commit f472223

File tree

25 files changed

+453
-167
lines changed

25 files changed

+453
-167
lines changed

_delphi_utils_python/delphi_utils/archive.py

Lines changed: 67 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@
2626
Created: 2020-08-06
2727
"""
2828

29-
from argparse import ArgumentParser
3029
from contextlib import contextmanager
3130
import filecmp
3231
from glob import glob
3332
from os import remove, replace
3433
from os.path import join, basename, abspath
3534
import shutil
35+
import time
3636
from typing import Tuple, List, Dict, Optional
3737

3838
from boto3 import Session
@@ -42,6 +42,7 @@
4242
import pandas as pd
4343

4444
from .utils import read_params
45+
from .logger import get_structured_logger
4546

4647
Files = List[str]
4748
FileDiffMap = Dict[str, Optional[str]]
@@ -98,43 +99,57 @@ def diff_export_csv(
9899
after_df.loc[added_idx, :])
99100

100101

def archiver_from_params(params):
    """Build an ArchiveDiffer from `params`.

    The type of ArchiveDiffer constructed is inferred from the parameters.

    Parameters
    ----------
    params: Dict[str, Dict[str, Any]]
        Dictionary of user-defined parameters with the following structure:
        - "common":
            - "export_dir": str, directory to which indicator output files have been exported
        - "archive":
            - "cache_dir": str, directory containing cached data from previous indicator runs
            - "branch_name" (required for git archiver): str, name of git branch
            - "override_dirty" (optional for git archiver): bool, whether to allow overwriting of
              untracked & uncommitted changes in `cache_dir`
            - "commit_partial_success" (optional for git archiver): bool, whether to still commit
              even if some files were not archived and staged due to `override_dirty=False`
            - "commit_message" (optional for git archiver): str, commit message to use
            - "bucket_name" (required for S3 archiver): str, name of S3 bucket to which to upload
              files
            - "indicator_prefix" (required for S3 archiver): str, S3 prefix for files from this
              indicator
            - "aws_credentials" (required for S3 archiver): Dict[str, str], authentication
              parameters for S3 to create a boto3.Session

    Returns
    -------
    ArchiveDiffer of the inferred type, or None if no "archive" params are present.
    """
    # No "archive" sub-object at all means archiving was not requested.
    if "archive" not in params:
        return None

    # Copy to kwargs to take advantage of default arguments to archiver
    kwargs = params["archive"]
    kwargs["export_dir"] = params["common"]["export_dir"]

    # Presence of "branch_name" selects the git archiver.
    if "branch_name" in kwargs:
        return GitArchiveDiffer(**kwargs)

    # Presence of "bucket_name" selects the S3 archiver; its other
    # required parameters must also be present.
    if "bucket_name" in kwargs:
        assert "indicator_prefix" in kwargs, "Missing indicator_prefix in params"
        assert "aws_credentials" in kwargs, "Missing aws_credentials in params"
        return S3ArchiveDiffer(**kwargs)

    # Don't run the filesystem archiver if the user misspecified the archiving params
    assert set(kwargs.keys()) == set(["cache_dir", "export_dir"]),\
        'If you intended to run a filesystem archiver, please remove all options other than '\
        '"cache_dir" from the "archive" params. Otherwise, please include either "branch_name" '\
        'or "bucket_name" to run the git or S3 archivers, respectively.'
    return FilesystemArchiveDiffer(**kwargs)
138153

139154

140155
class ArchiveDiffer:
@@ -621,46 +636,26 @@ def update_cache(self):
621636
self._cache_updated = True
622637

623638
if __name__ == "__main__":
    _params = read_params()

    # Autodetect whether parameters have been factored hierarchically or not.
    # See https://github.com/cmu-delphi/covidcast-indicators/issues/847
    # Once all indicators have their parameters factored in to "common", "indicator",
    # "validation", and "archive", this code will be obsolete.
    #
    # We assume that by virtue of invoking this module from the command line the user
    # intends to run archiving.  Thus if the "archive" sub-object is not found, we
    # interpret that to mean the parameters have not been hierarchically refactored.
    if "archive" not in _params:
        _params = {"archive": _params, "common": _params}

    logger = get_structured_logger(
        __name__, filename=_params["common"].get("log_filename"),
        log_exceptions=_params["common"].get("log_exceptions", True))
    start_time = time.time()

    archiver_from_params(_params).run()

    elapsed_time_in_seconds = round(time.time() - start_time, 2)
    logger.info("Completed archive run.",
                elapsed_time_in_seconds=elapsed_time_in_seconds)

_delphi_utils_python/delphi_utils/runner.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
"""Indicator running utilities."""
2+
import argparse as ap
3+
import importlib
24
from typing import Any, Callable, Dict, Optional
3-
from .archive import ArchiveDiffer
5+
from .archive import ArchiveDiffer, archiver_from_params
46
from .utils import read_params
57
from .validator.validate import Validator
8+
from .validator.run import validator_from_params
69

710
Params = Dict[str, Any]
811

@@ -33,3 +36,14 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
3336
validation_report = validator.validate()
3437
if archiver and (not validator or validation_report.success()):
3538
archiver.archive()
39+
40+
41+
if __name__ == "__main__":
    # CLI entry point: run an indicator package end-to-end
    # (indicator -> validator -> archiver).
    parser = ap.ArgumentParser()
    parser.add_argument("indicator_name",
                        type=str,
                        help="Name of the Python package containing the indicator. This package "
                             "must export a `run_module(params)` function.")
    args = parser.parse_args()
    indicator_module = importlib.import_module(args.indicator_name)
    run_indicator_pipeline(indicator_module.run_module, validator_from_params, archiver_from_params)

_delphi_utils_python/delphi_utils/validator/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ Please update the follow settings:
5353

5454
* `common`: global validation settings
5555
* `data_source`: should match the [formatting](https://cmu-delphi.github.io/delphi-epidata/api/covidcast_signals.html) as used in COVIDcast API calls
56-
* `end_date`: specifies the last date to be checked; if set to "latest", `end_date` will always be the current date
57-
* `span_length`: specifies the number of days before the `end_date` to check. `span_length` should be long enough to contain all recent source data that is still in the process of being updated (i.e. in the backfill period), for example, if the data source of interest has a 2-week lag before all reports are in for a given date, `scan_length` should be 14 days
56+
* `end_date`: specifies the last date to be checked; this can be specified as `YYYY-MM-DD` or as `today-{num}`. The latter is interpreted as `num` days before the current date (with `today-0` being today).
57+
* `span_length`: specifies the number of days before the `end_date` to check. `span_length` should be long enough to contain all recent source data that is still in the process of being updated (i.e. in the backfill period), for example, if the data source of interest has a 2-week lag before all reports are in for a given date, `span_length` should be 14 days
5858
* `suppressed_errors`: list of objects specifying errors that have been manually verified as false positives or acceptable deviations from expected. These errors can be specified with the following variables, where omitted values are interpreted as a wildcard, i.e., not specifying a date applies to all dates:
5959
* `check_name` (required): name of the check, as specified in the validation output
6060
* `date`: date in `YYYY-MM-DD` format

_delphi_utils_python/delphi_utils/validator/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def run_module():
1414
validator.validate().print_and_exit()
1515

1616

17-
def from_params(params):
17+
def validator_from_params(params):
1818
"""Construct a validator from `params`.
1919
2020
Arguments

_delphi_utils_python/delphi_utils/validator/utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,9 @@ def __post_init__(self):
3535
def from_params(cls, end_date_str: str, span_length_int: int):
    """Create a TimeWindow from param representations of its members.

    `end_date_str` is either an ISO date ("YYYY-MM-DD") or "today-{num}",
    the latter meaning `num` days before the current date ("today-0" is today).
    """
    span_length = timedelta(days=span_length_int)
    if end_date_str.startswith("today-"):
        # Relative form: subtract the requested number of days from today.
        days_back = timedelta(days=int(end_date_str[len("today-"):]))
        end_date = date.today() - days_back
    else:
        end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
    return cls(end_date, span_length)

_delphi_utils_python/tests/test_archive.py

Lines changed: 143 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55

66
from boto3 import Session
77
from git import Repo, exc
8+
import mock
89
from moto import mock_s3
910
import numpy as np
1011
import pandas as pd
1112
from pandas.testing import assert_frame_equal
1213
import pytest
1314

14-
from delphi_utils import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
15+
from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\
16+
archiver_from_params
1517

1618
CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float}
1719

@@ -467,3 +469,143 @@ def test_run(self, tmp_path):
467469
assert_frame_equal(
468470
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
469471
csv1_diff)
472+
473+
474+
class TestFromParams:
    """Tests for creating archive differs from params."""

    def test_null_creation(self):
        """Test that a None object is created with no "archive" params."""
        assert archiver_from_params({"common": {}}) is None

    @mock.patch("delphi_utils.archive.GitArchiveDiffer")
    def test_get_git_archiver(self, mock_archiver):
        """Test that GitArchiveDiffer is created successfully."""
        params = {
            "common": {"export_dir": "dir"},
            "archive": {
                "cache_dir": "cache",
                "branch_name": "branch",
                "override_dirty": True,
                "commit_partial_success": True,
                "commit_message": "msg"
            }
        }

        archiver_from_params(params)
        mock_archiver.assert_called_once_with(
            export_dir="dir",
            cache_dir="cache",
            branch_name="branch",
            override_dirty=True,
            commit_partial_success=True,
            commit_message="msg"
        )

    @mock.patch("delphi_utils.archive.GitArchiveDiffer")
    def test_get_git_archiver_with_defaults(self, mock_archiver):
        """Test that GitArchiveDiffer is created successfully without optional arguments."""
        params = {
            "common": {"export_dir": "dir"},
            "archive": {
                "cache_dir": "cache",
                "branch_name": "branch",
                "commit_message": "msg"
            }
        }

        archiver_from_params(params)
        mock_archiver.assert_called_once_with(
            export_dir="dir",
            cache_dir="cache",
            branch_name="branch",
            commit_message="msg"
        )

    @mock.patch("delphi_utils.archive.S3ArchiveDiffer")
    def test_get_s3_archiver(self, mock_archiver):
        """Test that S3ArchiveDiffer is created successfully."""
        params = {
            "common": {"export_dir": "dir"},
            "archive": {
                "cache_dir": "cache",
                "bucket_name": "bucket",
                "indicator_prefix": "ind",
                "aws_credentials": {"pass": "word"}
            }
        }

        archiver_from_params(params)
        mock_archiver.assert_called_once_with(
            export_dir="dir",
            cache_dir="cache",
            bucket_name="bucket",
            indicator_prefix="ind",
            aws_credentials={"pass": "word"}
        )

    def test_get_s3_archiver_without_required(self):
        """Test that S3ArchiveDiffer is not created without required arguments."""
        params = {
            "common": {"export_dir": "dir"},
            "archive": {
                "cache_dir": "cache",
                "bucket_name": "bucket"
            }
        }

        with pytest.raises(AssertionError,
                           match="Missing indicator_prefix in params"):
            archiver_from_params(params)

        params["archive"]["indicator_prefix"] = "prefix"
        with pytest.raises(AssertionError,
                           match="Missing aws_credentials in params"):
            archiver_from_params(params)

    @mock.patch("delphi_utils.archive.FilesystemArchiveDiffer")
    def test_get_filesystem_archiver(self, mock_archiver):
        """Test that FilesystemArchiveDiffer is created successfully."""
        params = {
            "common": {"export_dir": "dir"},
            "archive": {"cache_dir": "cache"}
        }

        archiver_from_params(params)
        mock_archiver.assert_called_once_with(
            export_dir="dir",
            cache_dir="cache"
        )

    def test_get_filesystem_archiver_with_extra_params(self):
        """Test that FilesystemArchiveDiffer is not created with extra parameters."""
        params = {
            "common": {"export_dir": "dir"},
            "archive": {
                "cache_dir": "cache",
                "indicator_prefix": "prefix"
            }
        }

        with pytest.raises(AssertionError,
                           match="If you intended to run"):
            archiver_from_params(params)

        del params["archive"]["cache_dir"]
        del params["archive"]["indicator_prefix"]
        with pytest.raises(AssertionError,
                           match="If you intended to run"):
            archiver_from_params(params)

0 commit comments

Comments
 (0)