1078 - Refactor csv_importer.py and csv_to_database.py #1103

Open · wants to merge 7 commits into dev
Changes from 2 commits
4 changes: 2 additions & 2 deletions integrations/acquisition/covidcast/test_csv_uploading.py
@@ -14,11 +14,11 @@
# first party
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.csv_to_database import main
from delphi.epidata.acquisition.covidcast.csv_importer import main
import delphi.operations.secrets as secrets

# py3tester coverage target (equivalent to `import *`)
__test_target__ = 'delphi.epidata.acquisition.covidcast.csv_to_database'
__test_target__ = 'delphi.epidata.acquisition.covidcast.csv_importer'


class CsvUploadingTests(unittest.TestCase):
202 changes: 190 additions & 12 deletions src/acquisition/covidcast/csv_importer.py
Contributor Author:
I implemented extract_and_check_row, validate_missing_code, validate_quantity, and the other validations with pandas. The only way I found to create CovidcastRows is with itertuples.
I'm not sure we need to keep the @staticmethods; they don't make much sense if we aren't using them anywhere else.
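For readers unfamiliar with the itertuples pattern mentioned above, here is a minimal sketch of the idea. It is not the PR's actual code: SimpleRow is a hypothetical stand-in for CovidcastRow (whose real constructor takes many more fields), dataframe_to_rows is an illustrative helper, and the column names are assumed. The point is that df.itertuples(index=False) yields one NamedTuple per CSV row, from which row objects can be built after pandas-level validation.

# Sketch only: SimpleRow is a hypothetical stand-in for CovidcastRow,
# and dataframe_to_rows is an illustrative helper, not part of this PR.
from typing import Iterator, NamedTuple, Optional

import pandas as pd


class SimpleRow(NamedTuple):
    geo_id: str
    value: Optional[float]
    stderr: Optional[float]
    sample_size: Optional[float]


def dataframe_to_rows(table: pd.DataFrame) -> Iterator[Optional[SimpleRow]]:
    """Yield one row object per DataFrame row, or None for an invalid row."""
    for row in table.itertuples(index=False):
        # itertuples yields a NamedTuple per row, so fields are attributes.
        if pd.isna(row.geo_id):
            # A missing geo_id invalidates the row; yield None as a marker.
            yield None
            continue
        yield SimpleRow(
            geo_id=str(row.geo_id),
            value=None if pd.isna(row.value) else float(row.value),
            stderr=None if pd.isna(row.stderr) else float(row.stderr),
            sample_size=None if pd.isna(row.sample_size) else float(row.sample_size),
        )


# Example usage with an in-memory frame shaped like a covidcast CSV:
table = pd.DataFrame({
    "geo_id": ["ca", "ny"],
    "value": [1.5, None],
    "stderr": [0.1, 0.2],
    "sample_size": [100, 200],
})
rows = list(dataframe_to_rows(table))  # two SimpleRow instances

Compared with iterrows, itertuples avoids building a pandas Series for every row, which matters when a CSV has hundreds of thousands of rows.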

@@ -1,22 +1,28 @@
"""Collects and reads covidcast data from a set of local CSV files."""
"""
Collects and reads covidcast data from a set of local CSV files.

# standard library
Imports covidcast CSVs and stores them in the epidata database.
"""

import argparse
import os
import re
import time
from dataclasses import dataclass
from datetime import date
from glob import glob
from typing import Iterator, NamedTuple, Optional, Tuple
from logging import Logger
from typing import Callable, Iterable, Iterator, NamedTuple, Optional, Tuple

# third party
import epiweeks as epi
import pandas as pd

# first party
from delphi_utils import Nans
from delphi.utils.epiweek import delta_epiweeks
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException
from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
from delphi.utils.epiweek import delta_epiweeks
from delphi_utils import Nans


DataFrameRow = NamedTuple('DFRow', [
('geo_id', str),
@@ -208,7 +214,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
def is_header_valid(columns):
"""Return whether the given pandas columns contains the required fields."""

return set(columns) >= CsvImporter.REQUIRED_COLUMNS
return CsvImporter.REQUIRED_COLUMNS.issubset(set(columns))


@staticmethod
@@ -282,7 +288,6 @@ def validate_missing_code(row, attr_quantity, attr_name, filepath=None, logger=N

return missing_entry


@staticmethod
def extract_and_check_row(row: DataFrameRow, geo_type: str, filepath: Optional[str] = None) -> Tuple[Optional[CsvRowValue], Optional[str]]:
"""Extract and return `CsvRowValue` from a CSV row, with sanity checks.
@@ -379,9 +384,16 @@ def load_csv(filepath: str, details: PathDetails) -> Iterator[Optional[Covidcast

try:
table = pd.read_csv(filepath, dtype=CsvImporter.DTYPES)
except ValueError as e:
except pd.errors.DtypeWarning as e:
logger.warning(event='Failed to open CSV with specified dtypes, switching to str', detail=str(e), file=filepath)
table = pd.read_csv(filepath, dtype='str')
try:
table = pd.read_csv(filepath, dtype='str')
except pd.errors.DtypeWarning as e:
logger.warning(event='Failed to open CSV with str dtype', detail=str(e), file=filepath)
return
except pd.errors.EmptyDataError as e:
logger.warning(event='Empty data or header is encountered', detail=str(e), file=filepath)
return

if not CsvImporter.is_header_valid(table.columns):
logger.warning(event='invalid header', detail=table.columns, file=filepath)
@@ -414,3 +426,169 @@ def load_csv(filepath: str, details: PathDetails) -> Iterator[Optional[Covidcast
details.issue,
details.lag,
)

Collaborator:
from this line down, all this text is a straight copy-paste from csv_to_database.py (line 15 down) with no modifications, right?


def get_argument_parser():
"""Define command line arguments."""

parser = argparse.ArgumentParser()
parser.add_argument(
'--data_dir',
help='top-level directory where CSVs are stored')
parser.add_argument(
'--specific_issue_date',
action='store_true',
help='indicates <data_dir> argument is where issuedate-specific subdirectories can be found.')
parser.add_argument(
'--log_file',
help="filename for log output (defaults to stdout)")
return parser


def collect_files(data_dir: str, specific_issue_date: bool):
"""Fetch path and data profile details for each file to upload."""
logger= get_structured_logger('collect_files')
if specific_issue_date:
results = list(CsvImporter.find_issue_specific_csv_files(data_dir))
else:
results = list(CsvImporter.find_csv_files(os.path.join(data_dir, 'receiving')))
logger.info(f'found {len(results)} files')
return results


def make_handlers(data_dir: str, specific_issue_date: bool):
if specific_issue_date:
# issue-specific uploads are always one-offs, so we can leave all
# files in place without worrying about cleaning up
def handle_failed(path_src, filename, source, logger):
logger.info(event='leaving failed file alone', dest=source, file=filename)

def handle_successful(path_src, filename, source, logger):
logger.info(event='archiving as successful',file=filename)
FileArchiver.archive_inplace(path_src, filename)
else:
# normal automation runs require some shuffling to remove files
# from receiving and place them in the archive
archive_successful_dir = os.path.join(data_dir, 'archive', 'successful')
archive_failed_dir = os.path.join(data_dir, 'archive', 'failed')

# helper to archive a failed file without compression
def handle_failed(path_src, filename, source, logger):
logger.info(event='archiving as failed - ', detail=source, file=filename)
path_dst = os.path.join(archive_failed_dir, source)
compress = False
FileArchiver.archive_file(path_src, path_dst, filename, compress)

# helper to archive a successful file with compression
def handle_successful(path_src, filename, source, logger):
logger.info(event='archiving as successful',file=filename)
path_dst = os.path.join(archive_successful_dir, source)
compress = True
FileArchiver.archive_file(path_src, path_dst, filename, compress)

return handle_successful, handle_failed


def upload_archive(
path_details: Iterable[Tuple[str, Optional[PathDetails]]],
database: Database,
handlers: Tuple[Callable],
logger: Logger
):
"""Upload CSVs to the database and archive them using the specified handlers.

:path_details: output from CsvImporter.find*_csv_files

:database: an open connection to the epidata database

:handlers: functions for archiving (successful, failed) files

:return: the number of modified rows
"""
archive_as_successful, archive_as_failed = handlers
total_modified_row_count = 0
# iterate over each file
for path, details in path_details:
logger.info(event='handling', dest=path)
path_src, filename = os.path.split(path)

# file path or name was invalid, source is unknown
if not details:
archive_as_failed(path_src, filename, 'unknown',logger)
continue

csv_rows = CsvImporter.load_csv(path, details)
rows_list = list(csv_rows)
all_rows_valid = rows_list and all(r is not None for r in rows_list)
if all_rows_valid:
try:
modified_row_count = database.insert_or_update_bulk(rows_list)
logger.info(f"insert_or_update_bulk {filename} returned {modified_row_count}")
logger.info(
"Inserted database rows",
row_count = modified_row_count,
source = details.source,
signal = details.signal,
geo_type = details.geo_type,
time_value = details.time_value,
issue = details.issue,
lag = details.lag
)
if modified_row_count is None or modified_row_count: # else would indicate zero rows inserted
total_modified_row_count += (modified_row_count if modified_row_count else 0)
database.commit()
except DBLoadStateException as e:
# if the db is in a state that is not fit for loading new data,
# then we should stop processing any more files
raise e
except Exception as e:
all_rows_valid = False
logger.exception('exception while inserting rows', exc_info=e)
database.rollback()

# archive the current file based on validation results
if all_rows_valid:
archive_as_successful(path_src, filename, details.source, logger)
else:
archive_as_failed(path_src, filename, details.source, logger)

return total_modified_row_count


def main(args):
"""Find, parse, and upload covidcast signals."""

logger = get_structured_logger("csv_ingestion", filename=args.log_file)
start_time = time.time()

# shortcut escape without hitting db if nothing to do
path_details = collect_files(args.data_dir, args.specific_issue_date)
if not path_details:
logger.info('nothing to do; exiting...')
return

logger.info("Ingesting CSVs", csv_count = len(path_details))

database = Database()
database.connect()

try:
modified_row_count = upload_archive(
path_details,
database,
make_handlers(args.data_dir, args.specific_issue_date),
logger
)
logger.info("Finished inserting/updating database rows", row_count = modified_row_count)
finally:
database.do_analyze()
# unconditionally commit database changes since CSVs have been archived
database.disconnect(True)

logger.info(
"Ingested CSVs into database",
total_runtime_in_seconds=round(time.time() - start_time, 2))


if __name__ == '__main__':
main(get_argument_parser().parse_args())