Make validation class pass the linter #578

Merged: 16 commits, Dec 3, 2020
22 changes: 22 additions & 0 deletions validator/.pylintrc
@@ -0,0 +1,22 @@

[MESSAGES CONTROL]

disable=logging-format-interpolation,
        too-many-locals,
        too-many-arguments,
        # Allow pytest functions to be part of a class.
        no-self-use,
        # Allow pytest classes to have one test.
        too-few-public-methods

[BASIC]

# Allow arbitrarily short-named variables.
variable-rgx=[a-z_][a-z0-9_]*
argument-rgx=[a-z_][a-z0-9_]*
attr-rgx=[a-z_][a-z0-9_]*

[DESIGN]

# Don't complain about pytest "unused" arguments.
ignored-argument-names=(_.*|run_as_module)
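The no-self-use and too-few-public-methods suppressions above exist so that pytest-style test classes pass the linter. A minimal sketch of the kind of class these settings permit (the class and test names are illustrative, not from this PR):

class TestDateFilter:
    """Group a single related check in a class, as pytest allows."""

    def test_example(self):
        # pylint would normally emit no-self-use here (self is unused)
        # and too-few-public-methods on the class; the rc file above
        # silences both for exactly this pattern.
        assert 2 + 2 == 4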
127 changes: 123 additions & 4 deletions validator/delphi_validator/datafetcher.py
@@ -4,19 +4,80 @@
"""

import re
import threading
from os import listdir
from os.path import isfile, join
from itertools import product
import pandas as pd
import numpy as np

import covidcast
-from .errors import APIDataFetchError
+from .errors import APIDataFetchError, ValidationError

-filename_regex = re.compile(
+FILENAME_REGEX = re.compile(
    r'^(?P<date>\d{8})_(?P<geo_type>\w+?)_(?P<signal>\w+)\.csv$')


def make_date_filter(start_date, end_date):
    """
    Create a function that checks whether a filename of the appropriate
    format contains a date within the specified (inclusive) date range.

    Arguments:
        - start_date: datetime date object
        - end_date: datetime date object

    Returns:
        - Custom function object
    """
    # Convert dates from datetime format to int.
    start_code = int(start_date.strftime("%Y%m%d"))
    end_code = int(end_date.strftime("%Y%m%d"))

    def custom_date_filter(match):
        """
        Return True if a filename of the appropriate format contains a date
        within the specified date range, and False otherwise.

        Arguments:
            - match: regex match object based on FILENAME_REGEX applied to a filename str

        Returns:
            - boolean
        """
        # If there is no regex match, the current filename is not an
        # appropriately formatted source data file.
        if not match:
            return False

        # Convert date found in CSV name to int.
        code = int(match.groupdict()['date'])

        # Return True if the file's date "code" falls within the defined date range.
        return start_code <= code <= end_code

    return custom_date_filter
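
# A usage sketch (the dates and filename below are illustrative):
#
#   from datetime import date
#   date_filter = make_date_filter(date(2020, 6, 1), date(2020, 6, 30))
#   match = FILENAME_REGEX.match("20200615_county_smoothed_cli.csv")
#   date_filter(match)  # True, since 20200601 <= 20200615 <= 20200630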


def load_all_files(export_dir, start_date, end_date):
    """Load all files in a directory.

    Parameters
    ----------
    export_dir: str
        directory from which to load files
    start_date: datetime.date
        earliest file date to load (inclusive)
    end_date: datetime.date
        latest file date to load (inclusive)

    Returns
    -------
    loaded_data: List[Tuple(str, re.match, pd.DataFrame)]
        triples of filenames, filename matches with the geo regex, and the data from the file
    """
    export_files = read_filenames(export_dir)
    date_filter = make_date_filter(start_date, end_date)

    # Make list of tuples of CSV names and regex match objects.
    return [(f, m, load_csv(join(export_dir, f))) for (f, m) in export_files if date_filter(m)]


def read_filenames(path):
    """
    Return a list of tuples of every filename and regex match to the CSV filename
@@ -28,7 +89,7 @@ def read_filenames(path):
    Returns:
        - list of tuples
    """
-   daily_filenames = [(f, filename_regex.match(f))
+   daily_filenames = [(f, FILENAME_REGEX.match(f))
                       for f in listdir(path) if isfile(join(path, f))]
    return daily_filenames

@@ -75,7 +136,7 @@ def fetch_api_reference(data_source, start_date, end_date, geo_type, signal_type
    if not isinstance(api_df, pd.DataFrame):
        custom_msg = "Error fetching data from " + str(start_date) + \
                     " to " + str(end_date) + \
-                    "for data source: " + data_source + \
+                    " for data source: " + data_source + \
                     ", signal type: " + signal_type + \
                     ", geo type: " + geo_type

@@ -95,3 +156,61 @@ def fetch_api_reference(data_source, start_date, end_date, geo_type, signal_type
    ).reindex(columns=column_names)

    return api_df


def get_one_api_df(data_source, min_date, max_date,
                   geo_type, signal_type,
                   api_semaphore, dict_lock, output_dict):
    """
    Pull API data for a single geo type-signal combination. If the data
    could not be retrieved, store a ValidationError in the output dict
    instead.
    """
    api_semaphore.acquire()

    # Pull reference data from API for all dates.
    try:
        geo_sig_api_df_or_error = fetch_api_reference(
            data_source, min_date, max_date, geo_type, signal_type)

    except APIDataFetchError as e:
        geo_sig_api_df_or_error = ValidationError(
            ("api_data_fetch_error", geo_type, signal_type), None, e)

    api_semaphore.release()

    # Use a lock so only one thread can access the dictionary.
    dict_lock.acquire()
    output_dict[(geo_type, signal_type)] = geo_sig_api_df_or_error
    dict_lock.release()


def threaded_api_calls(data_source, min_date, max_date, geo_signal_combos, n_threads=32):
    """
    Fetch API data for all geo-signal combinations, using multiple threads
    to save time.
    """
    if n_threads > 32:
        n_threads = 32
        print("Warning: Don't run more than 32 threads at once due "
              + "to API resource limitations")

    output_dict = dict()
    dict_lock = threading.Lock()
    api_semaphore = threading.Semaphore(value=n_threads)

    thread_objs = [threading.Thread(
        target=get_one_api_df, args=(data_source, min_date, max_date,
                                     geo_type, signal_type,
                                     api_semaphore,
                                     dict_lock, output_dict)
    ) for geo_type, signal_type in geo_signal_combos]

    # Start all threads.
    for thread in thread_objs:
        thread.start()

    # Wait until all threads are finished.
    for thread in thread_objs:
        thread.join()

    return output_dict
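
For orientation, a sketch of how a caller might consume threaded_api_calls (the data source, dates, and geo-signal combinations are illustrative, not taken from this PR):

from datetime import date

combos = [("county", "smoothed_cli"), ("state", "smoothed_cli")]
api_dfs = threaded_api_calls("fb-survey", date(2020, 6, 1),
                             date(2020, 6, 30), combos)

for combo, result in api_dfs.items():
    # Each value is either a reference DataFrame or the ValidationError
    # recorded by get_one_api_df when the fetch failed.
    if isinstance(result, ValidationError):
        print("fetch failed for", combo)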
1 change: 1 addition & 0 deletions validator/delphi_validator/errors.py
@@ -35,3 +35,4 @@ def __init__(self, check_data_id, expression, message):
            check_data_id, tuple) and not isinstance(check_data_id, list) else tuple(check_data_id)
        self.expression = expression
        self.message = message
        super().__init__(self.check_data_id, self.expression, self.message)
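
One practical effect of the new super().__init__ call, besides satisfying pylint's super-init-not-called check: the exception's standard args tuple is now populated, so tracebacks and str() include the details. A small sketch (the check id and message are illustrative):

try:
    raise ValidationError(("check_example", "county"), None, "something failed")
except ValidationError as err:
    print(err.args)  # (('check_example', 'county'), None, 'something failed')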
1 change: 1 addition & 0 deletions validator/delphi_validator/run.py
@@ -9,6 +9,7 @@


def run_module():
    """Run the validator as a module."""
    parent_params = read_params()
    params = parent_params['validation']

47 changes: 47 additions & 0 deletions validator/delphi_validator/utils.py
@@ -0,0 +1,47 @@
"""Utility functions for validation."""
from datetime import datetime
import pandas as pd

# Recognized geo types.
GEO_REGEX_DICT = {
    'county': r'^\d{5}$',
    'hrr': r'^\d{1,3}$',
    'msa': r'^\d{5}$',
    'dma': r'^\d{3}$',
    'state': r'^[a-zA-Z]{2}$',
    'national': r'^[a-zA-Z]{2}$'
}
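
# Illustrative checks against these patterns (the ids are examples,
# not from this PR):
#
#   import re
#   re.match(GEO_REGEX_DICT['county'], '42003')  # matches: 5-digit FIPS code
#   re.match(GEO_REGEX_DICT['state'], 'pa')      # matches: 2-letter code
#   re.match(GEO_REGEX_DICT['county'], '423')    # None: too short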


def relative_difference_by_min(x, y):
    """
    Calculate the relative difference between two numbers.
    """
    return (x - y) / min(x, y)
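
# For example, relative_difference_by_min(110, 100) == 0.1 while
# relative_difference_by_min(100, 110) == -0.1: the difference is always
# scaled by the smaller of the two values.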


def aggregate_frames(frames_list):
    """Aggregates a list of data frames into a single frame.

    Parameters
    ----------
    frames_list: List[Tuple(str, re.match, pd.DataFrame)]
        triples of filenames, filename matches with the geo regex, and the data from the file

    Returns
    -------
    A pd.DataFrame concatenation of all data frames in `frames_list` with additional columns for
    geo_type, time_value, and signal derived from the corresponding re.match.
    """
    all_frames = []
    for _, match, data_df in frames_list:
        df = data_df.copy()
        # Get geo_type, date, and signal name as specified by CSV name.
        df['geo_type'] = match.groupdict()['geo_type']
        df['time_value'] = datetime.strptime(
            match.groupdict()['date'], "%Y%m%d").date()
        df['signal'] = match.groupdict()['signal']

        all_frames.append(df)

    return pd.concat(all_frames).reset_index(drop=True)
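
Taken together with load_all_files from datafetcher.py, the intended flow is roughly the following (a sketch; the export directory and date range are illustrative):

from datetime import date
from delphi_validator.datafetcher import load_all_files
from delphi_validator.utils import aggregate_frames

frames = load_all_files("receiving", date(2020, 6, 1), date(2020, 6, 30))
all_df = aggregate_frames(frames)
# all_df now has geo_type, time_value, and signal columns parsed from
# each CSV filename.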