Allow archive to be run as its own module #295

Merged: 8 commits, Oct 9, 2020
148 changes: 131 additions & 17 deletions _delphi_utils_python/delphi_utils/archive.py
@@ -25,6 +25,7 @@
Created: 2020-08-06
"""

from argparse import ArgumentParser
from contextlib import contextmanager
import filecmp
from glob import glob
@@ -39,9 +40,12 @@
from git.refs.head import Head
import pandas as pd

from .utils import read_params

Files = List[str]
FileDiffMap = Dict[str, Optional[str]]


def diff_export_csv(
before_csv: str,
after_csv: str
@@ -65,7 +69,8 @@ def diff_export_csv(
added_df is the pd.DataFrame of added rows from after_csv.
"""

export_csv_dtypes = {"geo_id": str, "val": float,
                     "se": float, "sample_size": float}

before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
before_df.set_index("geo_id", inplace=True)
@@ -89,6 +94,42 @@ def diff_export_csv(
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
after_df.loc[added_idx, :])
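
# An illustrative sketch, not part of this diff (file names are hypothetical):
# deleted_df, changed_df, added_df = diff_export_csv(
#     "cache/20200810_state_signal.csv", "receiving/20200810_state_signal.csv")
# changed_df and added_df together make up the new issues that get archived.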


def run_module(archive_type: str,
cache_dir: str,
export_dir: str,
**kwargs):
"""Builds and runs an ArchiveDiffer.

Parameters
----------
archive_type: str
Type of ArchiveDiffer to run. Must be one of ["git", "s3"] which correspond to `GitArchiveDiffer` and `S3ArchiveDiffer`, respectively.
cache_dir: str
The directory for storing most recent archived/uploaded CSVs to start diffing from.
export_dir: str
The directory with most recent exported CSVs to diff to.
**kwargs:
Keyword arguments corresponding to constructor arguments for the respective ArchiveDiffers.
"""
if archive_type == "git":
arch_diff = GitArchiveDiffer(cache_dir,
export_dir,
kwargs["branch_name"],
kwargs["override_dirty"],
kwargs["commit_partial_success"],
kwargs["commit_message"])
elif archive_type == "s3":
arch_diff = S3ArchiveDiffer(cache_dir,
export_dir,
kwargs["bucket_name"],
kwargs["indicator_prefix"],
kwargs["aws_credentials"])
else:
raise ValueError(f"No archive type named '{archive_type}'")
arch_diff.run()
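
# An illustrative sketch, not part of this diff (paths, bucket, and branch are
# hypothetical; the credentials dict is a placeholder):
# run_module("s3", "./cache", "./receiving",
#            bucket_name="my-archive-bucket", indicator_prefix="myindicator",
#            aws_credentials={...})
# run_module("git", "./cache", "./receiving",
#            branch_name="main", override_dirty=False,
#            commit_partial_success=False, commit_message="Automated archive")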


class ArchiveDiffer:
"""
Base class for performing diffing and archiving of exported covidcast CSVs
@@ -140,12 +181,16 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
assert self._cache_updated

# Glob to only pick out CSV files, ignore hidden files
previous_files = set(basename(f)
                     for f in glob(join(self.cache_dir, "*.csv")))
exported_files = set(basename(f)
                     for f in glob(join(self.export_dir, "*.csv")))

deleted_files = sorted(join(self.cache_dir, f)
                       for f in previous_files - exported_files)
common_filenames = sorted(exported_files & previous_files)
new_files = sorted(join(self.export_dir, f)
                   for f in exported_files - previous_files)

common_diffs: Dict[str, Optional[str]] = {}
for filename in common_filenames:
@@ -158,11 +203,13 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
if filecmp.cmp(before_file, after_file, shallow=False):
continue

deleted_df, changed_df, added_df = diff_export_csv(
    before_file, after_file)
new_issues_df = pd.concat([changed_df, added_df], axis=0)

if len(deleted_df) > 0:
print(f"Warning, diff has deleted indices in {after_file} that will be ignored")
print(
f"Warning, diff has deleted indices in {after_file} that will be ignored")

# Write the diffs to diff_file, if applicable
if len(new_issues_df) > 0:
@@ -220,6 +267,29 @@ def filter_exports(self, common_diffs: FileDiffMap):
else:
replace(diff_file, exported_file)

def run(self):
"""Runs the differ and archives the changed and new files."""
self.update_cache()

# Diff exports, and make incremental versions
_, common_diffs, new_files = self.diff_exports()

# Archive changed and new files only
to_archive = [f for f, diff in common_diffs.items()
if diff is not None]
to_archive += new_files
_, fails = self.archive_exports(to_archive)

# Filter existing exports to exclude those that failed to archive
succ_common_diffs = {f: diff for f,
diff in common_diffs.items() if f not in fails}
self.filter_exports(succ_common_diffs)

# Report failures: someone should probably look at them
for exported_file in fails:
print(f"Failed to archive '{exported_file}'")


class S3ArchiveDiffer(ArchiveDiffer):
"""
AWS S3 backend for archiving
@@ -263,11 +333,14 @@ def update_cache(self):
For making sure cache_dir is updated with all latest files from the S3 bucket.
"""
# List all indicator-related objects from S3
archive_objects = self.bucket.objects.filter(
    Prefix=self.indicator_prefix).all()
archive_objects = [
    obj for obj in archive_objects if obj.key.endswith(".csv")]

# Check against what we have locally and download missing ones
cached_files = set(basename(f)
                   for f in glob(join(self.cache_dir, "*.csv")))
for obj in archive_objects:
archive_file = basename(obj.key)
cached_file = join(self.cache_dir, archive_file)
@@ -297,7 +370,8 @@ def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
archive_fail = []

for exported_file in exported_files:
cached_file = abspath(
    join(self.cache_dir, basename(exported_file)))
archive_key = join(self.indicator_prefix, basename(exported_file))

try:
@@ -314,6 +388,7 @@

return archive_success, archive_fail
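
# An illustrative sketch, not part of this diff, of driving the S3 differ
# directly instead of through run_module (bucket, prefix, and paths are
# hypothetical; aws_credentials is a placeholder dict):
# arch_diff = S3ArchiveDiffer("./cache", "./receiving",
#                             "my-archive-bucket", "myindicator", aws_credentials)
# arch_diff.run()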


class GitArchiveDiffer(ArchiveDiffer):
"""
Local git repo backend for archiving
@@ -352,7 +427,7 @@ def __init__(
super().__init__(cache_dir, export_dir)

assert override_dirty or not commit_partial_success, \
"Only can commit_partial_success=True when override_dirty=True"
"Only can commit_partial_success=True when override_dirty=True"

# Assumes a repository is set up already, will raise exception if not found
self.repo = Repo(cache_dir, search_parent_directories=True)
@@ -405,7 +480,8 @@ def update_cache(self):
"""
# Make sure cache directory is clean: has everything nicely committed
if not self.override_dirty:
cache_clean = not self.repo.is_dirty(
    untracked_files=True, path=abspath(self.cache_dir))
assert cache_clean, f"There are uncommitted changes in the cache dir '{self.cache_dir}'"

self._cache_updated = True
@@ -439,11 +515,14 @@ def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
with self.archiving_branch():
# Abs paths of all modified files to check if we will override uncommitted changes
working_tree_dir = self.repo.working_tree_dir
dirty_files = [join(working_tree_dir, f)
               for f in self.repo.untracked_files]
dirty_files += [join(working_tree_dir, d.a_path)
                for d in self.repo.index.diff(None)]

for exported_file in exported_files:
archive_file = abspath(
    join(self.cache_dir, basename(exported_file)))

# Archive and explicitly stage new export, depending if override
if self.override_dirty or archive_file not in dirty_files:
@@ -469,11 +548,46 @@ def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
if len(exported_files) > 0:

# Support partial success and at least one archive succeeded
partial_success = self.commit_partial_success and len(
    archive_success) > 0

if len(archive_success) == len(exported_files) or partial_success:
self.repo.index.commit(message=self.commit_message)

self._exports_archived = True

return archive_success, archive_fail
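
# An illustrative sketch, not part of this diff, of driving the git differ
# directly (paths and branch are hypothetical). Arguments follow the order
# used in run_module: cache_dir, export_dir, branch_name, override_dirty,
# commit_partial_success, commit_message.
# arch_diff = GitArchiveDiffer("./cache", "./receiving", "main",
#                              False, False, "Automated archive")
# arch_diff.run()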


if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("--archive_type", required=True, type=str,
choices=["git", "s3"],
help="Type of archive differ to use.")
parser.add_argument("--indicator_prefix", type=str, default="",
help="The prefix for S3 keys related to this indicator."
" Required for `archive_type = 's3'")
parser.add_argument("--branch_name", type=str, default="",
help=" Branch to use for `archive_type` = 'git'.")
parser.add_argument("--override_dirty", action="store_true",
help="Whether to allow overwriting of untracked &"
" uncommitted changes for `archive_type` = 'git'")
parser.add_argument("--commit_partial_success", action="store_true",
help="Whether to still commit for `archive_type` = "
"'git' even if some files were not archived and "
"staged due to `override_dirty` = False.")
parser.add_argument("--commit_message", type=str, default="",
help="Commit message for `archive_type` = 'git'")
args = parser.parse_args()
params = read_params()
run_module(args.archive_type,
params.cache_dir,
params.export_dir,
aws_credentials=params.aws_credentials,
branch_name=args.branch_name,
bucket_name=params.bucket_name,
commit_message=args.commit_message,
commit_partial_success=args.commit_partial_success,
indicator_prefix=args.indicator_prefix,
override_dirty=args.override_dirty
)
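
A minimal sketch of invoking the new entry point (the indicator prefix, branch,
and commit message below are hypothetical; cache_dir, export_dir, bucket_name,
and aws_credentials come from the params file read by read_params()):

    python -m delphi_utils.archive --archive_type s3 --indicator_prefix myindicator
    python -m delphi_utils.archive --archive_type git --branch_name main --commit_message "Automated archive"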