1078 - Refactor csv_importer.py and csv_to_database.py #1103
Open

BrainIsDead wants to merge 7 commits into dev from issue_1078
Changes from 2 commits
Commits
b9d8cfa  csv importer with issues
65a8da3  made rollback for sanity check, `csv_to_database` functionality mover…
73c38d6  test+csv_to_database moved to test_csv_imported, minor changes
04eaada  load_csv handled with pandas
5de1dae  refactor: csv_importer
175c900  refactor: improve a few error messages (dshemetov)
5bd8408  Merge pull request #1116 from cmu-delphi/ds/csv_importer_pandas (dshemetov)
@@ -1,22 +1,28 @@

"""Collects and reads covidcast data from a set of local CSV files."""
"""
Collects and reads covidcast data from a set of local CSV files.

# standard library
Imports covidcast CSVs and stores them in the epidata database.
"""

import argparse
import os
import re
import time
from dataclasses import dataclass
from datetime import date
from glob import glob
from typing import Iterator, NamedTuple, Optional, Tuple
from logging import Logger
from typing import Callable, Iterable, Iterator, NamedTuple, Optional, Tuple

# third party
import epiweeks as epi
import pandas as pd

# first party
from delphi_utils import Nans
from delphi.utils.epiweek import delta_epiweeks
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException
from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
from delphi.utils.epiweek import delta_epiweeks
from delphi_utils import Nans


DataFrameRow = NamedTuple('DFRow', [
  ('geo_id', str),
@@ -208,7 +214,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()

  def is_header_valid(columns):
    """Return whether the given pandas columns contains the required fields."""

    return set(columns) >= CsvImporter.REQUIRED_COLUMNS
    return CsvImporter.REQUIRED_COLUMNS.issubset(set(columns))

  @staticmethod
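As a side note, a quick sketch showing that the old and new header checks answer the same question; the column list and required set below are made-up stand-ins, not the real CsvImporter.REQUIRED_COLUMNS:

```python
# Hypothetical columns and required set, for illustration only.
required = {"geo_id", "val", "se", "sample_size"}
columns = ["geo_id", "val", "se", "sample_size", "missing_val"]

# The superset operator and issubset express the same condition:
# "does the CSV header contain every required column?"
assert (set(columns) >= required) == required.issubset(set(columns))
```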
@@ -282,7 +288,6 @@ def validate_missing_code(row, attr_quantity, attr_name, filepath=None, logger=N

    return missing_entry

  @staticmethod
  def extract_and_check_row(row: DataFrameRow, geo_type: str, filepath: Optional[str] = None) -> Tuple[Optional[CsvRowValue], Optional[str]]:
    """Extract and return `CsvRowValue` from a CSV row, with sanity checks.
@@ -379,9 +384,16 @@ def load_csv(filepath: str, details: PathDetails) -> Iterator[Optional[Covidcast

    try:
      table = pd.read_csv(filepath, dtype=CsvImporter.DTYPES)
    except ValueError as e:
    except pd.errors.DtypeWarning as e:
      logger.warning(event='Failed to open CSV with specified dtypes, switching to str', detail=str(e), file=filepath)
      table = pd.read_csv(filepath, dtype='str')
      try:
        table = pd.read_csv(filepath, dtype='str')
      except pd.errors.DtypeWarning as e:
        logger.warning(event='Failed to open CSV with str dtype', detail=str(e), file=filepath)
        return
    except pd.errors.EmptyDataError as e:
      logger.warning(event='Empty data or header is encountered', detail=str(e), file=filepath)
      return

    if not CsvImporter.is_header_valid(table.columns):
      logger.warning(event='invalid header', detail=table.columns, file=filepath)
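For context, a generic sketch of the strict-dtypes-then-string fallback this hunk is built around; the file path, dtype map, and the exception type caught below are illustrative assumptions, not the PR's exact code:

```python
import pandas as pd

# Illustrative dtype map; the real CsvImporter.DTYPES is defined elsewhere in the module.
DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float}

def read_with_fallback(filepath: str) -> pd.DataFrame:
  try:
    # First attempt: parse with strict column dtypes.
    return pd.read_csv(filepath, dtype=DTYPES)
  except ValueError:
    # A value that cannot be coerced (e.g. text in a float column) raises here;
    # retry with everything as strings and let later validation decide.
    return pd.read_csv(filepath, dtype="str")
```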
@@ -414,3 +426,169 @@ def load_csv(filepath: str, details: PathDetails) -> Iterator[Optional[Covidcast

        details.issue,
        details.lag,
      )

Review comment: from this line down, all this text is a straight copy-paste from …
def get_argument_parser():
  """Define command line arguments."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
    '--data_dir',
    help='top-level directory where CSVs are stored')
  parser.add_argument(
    '--specific_issue_date',
    action='store_true',
    help='indicates <data_dir> argument is where issuedate-specific subdirectories can be found.')
  parser.add_argument(
    '--log_file',
    help="filename for log output (defaults to stdout)")
  return parser


def collect_files(data_dir: str, specific_issue_date: bool):
  """Fetch path and data profile details for each file to upload."""
  logger= get_structured_logger('collect_files')
  if specific_issue_date:
    results = list(CsvImporter.find_issue_specific_csv_files(data_dir))
  else:
    results = list(CsvImporter.find_csv_files(os.path.join(data_dir, 'receiving')))
  logger.info(f'found {len(results)} files')
  return results


def make_handlers(data_dir: str, specific_issue_date: bool):
  if specific_issue_date:
    # issue-specific uploads are always one-offs, so we can leave all
    # files in place without worrying about cleaning up
    def handle_failed(path_src, filename, source, logger):
      logger.info(event='leaving failed file alone', dest=source, file=filename)

    def handle_successful(path_src, filename, source, logger):
      logger.info(event='archiving as successful',file=filename)
      FileArchiver.archive_inplace(path_src, filename)
  else:
    # normal automation runs require some shuffling to remove files
    # from receiving and place them in the archive
    archive_successful_dir = os.path.join(data_dir, 'archive', 'successful')
    archive_failed_dir = os.path.join(data_dir, 'archive', 'failed')

    # helper to archive a failed file without compression
    def handle_failed(path_src, filename, source, logger):
      logger.info(event='archiving as failed - ', detail=source, file=filename)
      path_dst = os.path.join(archive_failed_dir, source)
      compress = False
      FileArchiver.archive_file(path_src, path_dst, filename, compress)

    # helper to archive a successful file with compression
    def handle_successful(path_src, filename, source, logger):
      logger.info(event='archiving as successful',file=filename)
      path_dst = os.path.join(archive_successful_dir, source)
      compress = True
      FileArchiver.archive_file(path_src, path_dst, filename, compress)

  return handle_successful, handle_failed


def upload_archive(
  path_details: Iterable[Tuple[str, Optional[PathDetails]]],
  database: Database,
  handlers: Tuple[Callable],
  logger: Logger
):
  """Upload CSVs to the database and archive them using the specified handlers.

  :path_details: output from CsvImporter.find*_csv_files

  :database: an open connection to the epidata database

  :handlers: functions for archiving (successful, failed) files

  :return: the number of modified rows
  """
  archive_as_successful, archive_as_failed = handlers
  total_modified_row_count = 0
  # iterate over each file
  for path, details in path_details:
    logger.info(event='handling', dest=path)
    path_src, filename = os.path.split(path)

    # file path or name was invalid, source is unknown
    if not details:
      archive_as_failed(path_src, filename, 'unknown',logger)
      continue

    csv_rows = CsvImporter.load_csv(path, details)
    rows_list = list(csv_rows)
    all_rows_valid = rows_list and all(r is not None for r in rows_list)
    if all_rows_valid:
      try:
        modified_row_count = database.insert_or_update_bulk(rows_list)
        logger.info(f"insert_or_update_bulk {filename} returned {modified_row_count}")
        logger.info(
          "Inserted database rows",
          row_count = modified_row_count,
          source = details.source,
          signal = details.signal,
          geo_type = details.geo_type,
          time_value = details.time_value,
          issue = details.issue,
          lag = details.lag
        )
        if modified_row_count is None or modified_row_count: # else would indicate zero rows inserted
          total_modified_row_count += (modified_row_count if modified_row_count else 0)
        database.commit()
      except DBLoadStateException as e:
        # if the db is in a state that is not fit for loading new data,
        # then we should stop processing any more files
        raise e
      except Exception as e:
        all_rows_valid = False
        logger.exception('exception while inserting rows', exc_info=e)
        database.rollback()

    # archive the current file based on validation results
    if all_rows_valid:
      archive_as_successful(path_src, filename, details.source, logger)
    else:
      archive_as_failed(path_src, filename, details.source, logger)

  return total_modified_row_count


def main(args):
  """Find, parse, and upload covidcast signals."""

  logger = get_structured_logger("csv_ingestion", filename=args.log_file)
  start_time = time.time()

  # shortcut escape without hitting db if nothing to do
  path_details = collect_files(args.data_dir, args.specific_issue_date)
  if not path_details:
    logger.info('nothing to do; exiting...')
    return

  logger.info("Ingesting CSVs", csv_count = len(path_details))

  database = Database()
  database.connect()

  try:
    modified_row_count = upload_archive(
      path_details,
      database,
      make_handlers(args.data_dir, args.specific_issue_date),
      logger
    )
    logger.info("Finished inserting/updating database rows", row_count = modified_row_count)
  finally:
    database.do_analyze()
    # unconditionally commit database changes since CSVs have been archived
    database.disconnect(True)

  logger.info(
    "Ingested CSVs into database",
    total_runtime_in_seconds=round(time.time() - start_time, 2))


if __name__ == '__main__':
  main(get_argument_parser().parse_args())
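For reference, a minimal sketch of how the new entry point could be driven programmatically; the import path and the data directory below are assumptions based on the module's other imports, not something stated in this PR:

```python
from delphi.epidata.acquisition.covidcast.csv_importer import get_argument_parser, main

# Parse the same flags the CLI exposes; '/common/covidcast' is a placeholder path.
args = get_argument_parser().parse_args(["--data_dir", "/common/covidcast", "--specific_issue_date"])
main(args)
```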
I implemented extract_and_check_row, validate_missing_code, validate_quantity, and other validations with pandas. I found that the only way to create CovidcastRows is with itertuples. Not sure if we need to keep the @staticmethods; it makes no sense if we are not using them somewhere else.
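A minimal sketch of the itertuples approach mentioned above; the column names and the row type here are stand-ins rather than the real CovidcastRow constructor:

```python
from typing import NamedTuple
import pandas as pd

class Row(NamedTuple):
  geo_id: str
  val: float
  se: float
  sample_size: float

table = pd.DataFrame({
  "geo_id": ["ca", "ny"],
  "val": [1.0, 2.0],
  "se": [0.1, 0.2],
  "sample_size": [100.0, 200.0],
})

# itertuples yields one namedtuple per row; index=False drops the index field,
# so the fields line up positionally with the Row constructor.
rows = [Row(*r) for r in table.itertuples(index=False)]
```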