Commit 732d696

Merge pull request #427 from cmu-delphi/large_spikes_validator: Large spikes validator
2 parents 1304e6f + ec44327, commit 732d696

File tree

4 files changed: +369 −13 lines


validator/PLANS.md

Lines changed: 8 additions & 5 deletions

@@ -22,6 +22,7 @@
 * Most recent date seen in source data is not older than most recent date seen in reference data
 * Similar number of obs per day as recent API data (static threshold)
 * Similar average value as API data (static threshold)
+* Outliers in cases and deaths signals using [this method](https://github.com/cmu-delphi/covidcast-forecast/tree/dev/corrections/data_corrections)
 * Source data for specified date range is empty
 * API data for specified date range is empty

@@ -44,6 +45,9 @@

 ### Larger issues

+* Set up validator to use Sir-complains-a-lot alerting functionality on a signal-by-signal basis (should send alert output as a Slack message and "@" a set person), as a stop-gap before the logging server is ready
+  * This is [how Sir-CAL works](https://github.com/benjaminysmith/covidcast-indicators/blob/main/sir_complainsalot/delphi_sir_complainsalot/run.py)
+  * [Example output](https://delphi-org.slack.com/archives/C01E81A3YKF/p1605793508000100)
 * Expand framework to support nchs_mortality, which is provided on a weekly basis and has some differences from the daily data, e.g. filenames use a different format ("weekly_YYYYWW_geotype_signalname.csv")
 * Make backtesting framework so new checks can be run individually on historical indicator data to tune false positives, output verbosity, understand frequency of error raising, etc. Should pull data from the API the first time and save it locally in the `cache` dir.
 * Add DETAILS.md doc with detailed descriptions of what each check does and how. Will be especially important for statistical/anomaly detection checks.
@@ -52,20 +56,18 @@
 * Easier suppression of many errors at once
   * Maybe store errors as a dict of dicts. Keys could be check strings (e.g. "check_bad_se"), then the next layer geo type, etc.
 * Nicer formatting for error "report"
+  * Potentially define a `__print__()` method in the ValidationError class
   * E.g. if a single type of error is raised for many different datasets, summarize all error messages into a single message? But it still has to be clear how to suppress each individually
 * Check for erratic data sources that wrongly report all zeroes
   * E.g. the error with the Wisconsin data for the 10/26 forecasts
   * Wary of a purely static check for this
   * Are there any geo regions where this might cause false positives? E.g. small counties or MSAs, certain signals (deaths, since it's << cases)
   * This test is partially captured by checking avgs in source vs reference data, unless erroneous zeroes continue for more than a week
-  * Also partially captured by outlier checking. If zeroes aren't outliers, then it's hard to say that they're erroneous at all.
-* Outlier detection (in progress)
-  * Current approach is tuned to daily cases and daily deaths; use just on those signals?
-  * prophet (package) detection is flexible, but needs 2-3 months of historical data to fit on. May make sense to use if other statistical checks also need that much data.
+  * Also partially captured by outlier checking, depending on the `size_cut` setting. If zeroes aren't outliers, then it's hard to say that they're erroneous at all.
 * Use known erroneous/anomalous days of source data to tune static thresholds and test behavior
 * If we can't get data from the API, do we want to use substitute data for the comparative checks instead?
-  * E.g. most recent successful API pull -- might end up being a couple of weeks old
   * Currently, any API fetch problem just skips the comparative checks entirely.
+  * E.g. most recent successful API pull -- might end up being a couple of weeks old
 * Improve performance and reduce runtime (no particular goal, just avoid being painfully slow!)
   * Profiling (iterate)
   * Save intermediate files?
@@ -87,3 +89,4 @@
 * Raise errors when one p-value (per geo region, e.g.) is significant OR when a bunch of p-values for that same type of test (different geo regions, e.g.) are "close" to significant
 * Correct p-values for multiple testing
   * Bonferroni would be easy but is sensitive to the choice of "family" of tests; Benjamini-Hochberg is a bit more involved but is less sensitive to the choice of "family"; [comparison of the two](https://delphi-org.slack.com/archives/D01A9KNTPKL/p1603294915000500)
+* Use the prophet package? Would require 2-3 months of API data.
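The Bonferroni vs. Benjamini-Hochberg trade-off mentioned above can be illustrated with a minimal sketch of the BH step-up procedure. The function name and the p-values are hypothetical, not part of the validator:

```python
import numpy as np

def benjamini_hochberg(pvals, alpha=0.05):
    """Return a boolean mask of hypotheses rejected under BH FDR control.

    Step-up rule: find the largest k with p_(k) <= alpha * k / m and
    reject the k smallest p-values.
    """
    p = np.asarray(pvals, dtype=float)
    m = len(p)
    order = np.argsort(p)
    thresholds = alpha * np.arange(1, m + 1) / m
    below = p[order] <= thresholds
    reject = np.zeros(m, dtype=bool)
    if below.any():
        k = np.max(np.nonzero(below)[0])  # largest sorted index passing its threshold
        reject[order[: k + 1]] = True
    return reject
```

Unlike Bonferroni (which just compares every p-value to `alpha / m`), the per-rank thresholds let later p-values ride on earlier significant ones, which is what makes BH less sensitive to how the "family" is chosen.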

validator/delphi_validator/datafetcher.py

Lines changed: 1 addition & 1 deletion

@@ -91,7 +91,7 @@ def fetch_api_reference(data_source, start_date, end_date, geo_type, signal_type
     ).rename(
         columns={'geo_value': "geo_id", 'stderr': 'se', 'value': 'val'}
     ).drop(
-        ['direction', 'issue', 'lag'], axis=1
+        ['issue', 'lag'], axis=1
     ).reindex(columns=column_names)

     return api_df
validator/delphi_validator/validate.py

Lines changed: 159 additions & 7 deletions

@@ -9,7 +9,6 @@
 from os.path import join
 from datetime import date, datetime, timedelta
 import pandas as pd
-
 from .errors import ValidationError, APIDataFetchError
 from .datafetcher import filename_regex, \
     read_filenames, load_csv, get_geo_signal_combos, \
@@ -608,18 +607,154 @@ def check_rapid_change_num_rows(self, df_to_test, df_to_reference, checking_date

         self.increment_total_checks()

+    def check_positive_negative_spikes(self, source_df, api_frames, geo, sig):
+        """
+        Adapt Dan's corrections package to Python (only consider spikes):
+        https://github.com/cmu-delphi/covidcast-forecast/tree/dev/corrections/data_corrections
+
+        Statistics for a right-shifted rolling window and a centered rolling window are used
+        to determine outliers for both positive and negative spikes.
+
+        As it is now, ststat will always be NaN for source frames.
+
+        Arguments:
+        - source_df: pandas dataframe of CSV source data
+        - api_frames: pandas dataframe of reference data, either from the
+        COVIDcast API or semirecent data
+        - geo: str; geo type name (county, msa, hrr, state) as in the CSV name
+        - sig: str; signal name as in the CSV name
+        """
+        self.increment_total_checks()
+        # Combine all possible frames so that the rolling window calculations make sense.
+        source_frame_start = source_df["time_value"].min()
+        source_frame_end = source_df["time_value"].max()
+        api_frames_end = min(api_frames["time_value"].max(),
+                             source_frame_start - timedelta(days=1))
+        all_frames = pd.concat([api_frames, source_df]). \
+            drop_duplicates(subset=["geo_id", "time_value"], keep='last'). \
+            sort_values(by=['time_value']).reset_index(drop=True)
+        if "index" in all_frames.columns:
+            all_frames = all_frames.drop(columns=["index"])
+        # Tuned variables from Dan's code for flagging outliers. size_cut is a
+        # check on the minimum value reported, sig_cut is a check on the ftstat
+        # or ststat reported (t-statistics), and sig_consec is a lower threshold
+        # for flagging outliers that are next to each other.
+        size_cut = 20
+        sig_cut = 3
+        sig_consec = 2.25
+
+        # Functions mapped to rows to determine outliers based on ftstat and ststat values
+        def outlier_flag(frame):
+            if (abs(frame["val"]) > size_cut) and not (pd.isna(frame["ststat"])) \
+                    and (frame["ststat"] > sig_cut):
+                return 1
+            if (abs(frame["val"]) > size_cut) and (pd.isna(frame["ststat"])) and \
+                    not (pd.isna(frame["ftstat"])) and (frame["ftstat"] > sig_cut):
+                return 1
+            if (frame["val"] < -size_cut) and not (pd.isna(frame["ststat"])) and \
+                    not pd.isna(frame["ftstat"]):
+                return 1
+            return 0
+
+        def outlier_nearby(frame):
+            if (not pd.isna(frame['ststat'])) and (frame['ststat'] > sig_consec):
+                return 1
+            if pd.isna(frame['ststat']) and (frame['ftstat'] > sig_consec):
+                return 1
+            return 0
+
+        # Calculate ftstat and ststat values for the rolling windows; group frames by geo region
+        region_group = all_frames.groupby("geo_id")
+        window_size = 14
+        shift_val = 0
+
+        # Shift the window to match how R calculates rolling windows of even length
+        if window_size % 2 == 0:
+            shift_val = -1
+
+        # Calculate the t-statistics for the two rolling windows (centered and right-aligned)
+        all_full_frames = []
+        for _, group in region_group:
+            rolling_windows = group["val"].rolling(
+                window_size, min_periods=window_size)
+            center_windows = group["val"].rolling(
+                window_size, min_periods=window_size, center=True)
+            fmedian = rolling_windows.median()
+            smedian = center_windows.median().shift(shift_val)
+            fsd = rolling_windows.std() + 0.00001  # if std is 0
+            ssd = center_windows.std().shift(shift_val) + 0.00001  # if std is 0
+            vals_modified_f = group["val"] - fmedian.fillna(0)
+            vals_modified_s = group["val"] - smedian.fillna(0)
+            ftstat = abs(vals_modified_f) / fsd
+            ststat = abs(vals_modified_s) / ssd
+            group['ftstat'] = ftstat
+            group['ststat'] = ststat
+            all_full_frames.append(group)
+
+        all_frames = pd.concat(all_full_frames)
+        # Determine outliers in source frames only. Only the reference data from
+        # just before the start of the source data is needed, because the lead
+        # and lag outlier calculations look only one day out.
+        outlier_df = all_frames.query(
+            'time_value >= @api_frames_end & time_value <= @source_frame_end')
+        outlier_df = outlier_df.sort_values(by=['geo_id', 'time_value']) \
+            .reset_index(drop=True).copy()
+        outlier_df["flag"] = 0
+        outlier_df["flag"] = outlier_df.apply(outlier_flag, axis=1)
+        outliers = outlier_df[outlier_df["flag"] == 1]
+        outliers_reset = outliers.copy().reset_index(drop=True)
+
+        # Find the lead outliers and the lag outliers. Check that the selected row
+        # is actually a leading or lagging row for the given geo_id.
+        upper_index = list(filter(lambda x: x < outlier_df.shape[0],
+                                  list(outliers.index + 1)))
+        upper_df = outlier_df.iloc[upper_index, :].reset_index(drop=True)
+        upper_compare = outliers_reset[:len(upper_index)]
+        sel_upper_df = upper_df[upper_compare["geo_id"]
+                                == upper_df["geo_id"]].copy()
+        lower_index = list(filter(lambda x: x >= 0, list(outliers.index - 1)))
+        lower_df = outlier_df.iloc[lower_index, :].reset_index(drop=True)
+        lower_compare = outliers_reset[-len(lower_index):].reset_index(drop=True)
+        sel_lower_df = lower_df[lower_compare["geo_id"]
+                                == lower_df["geo_id"]].copy()
+
+        sel_upper_df["flag"] = 0
+        sel_lower_df["flag"] = 0
+
+        sel_upper_df["flag"] = sel_upper_df.apply(outlier_nearby, axis=1)
+        sel_lower_df["flag"] = sel_lower_df.apply(outlier_nearby, axis=1)
+
+        upper_outliers = sel_upper_df[sel_upper_df["flag"] == 1]
+        lower_outliers = sel_lower_df[sel_lower_df["flag"] == 1]
+
+        all_outliers = pd.concat([outliers, upper_outliers, lower_outliers]). \
+            sort_values(by=['time_value', 'geo_id']). \
+            drop_duplicates().reset_index(drop=True)
+
+        # Identify outliers just in the source data
+        source_outliers = all_outliers.query(
+            "time_value >= @source_frame_start & time_value <= @source_frame_end")
+
+        if source_outliers.shape[0] > 0:
+            self.raised_errors.append(ValidationError(
+                ("check_positive_negative_spikes",
+                 source_frame_start, source_frame_end, geo, sig),
+                source_outliers,
+                'Source dates with flagged outliers based on the '
+                'previous 14 days of available data'))

     def check_avg_val_vs_reference(self, df_to_test, df_to_reference, checking_date, geo_type,
                                    signal_type):
         """
         Compare average values for each variable in test dataframe vs reference dataframe.
-
         Arguments:
         - df_to_test: pandas dataframe of CSV source data
         - df_to_reference: pandas dataframe of reference data, either from the
         COVIDcast API or semirecent data
         - geo_type: str; geo type name (county, msa, hrr, state) as in the CSV name
         - signal_type: str; signal name as in the CSV name
-
         Returns:
         - None
         """
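The core of the new check is the pair of rolling t-like statistics. Here is a minimal sketch of that computation on a single toy series, outside the validator; the thresholds match the ones in `check_positive_negative_spikes`, but the series is made up:

```python
import pandas as pd

size_cut, sig_cut = 20, 3  # thresholds from check_positive_negative_spikes
window_size = 14
# Shift results to match how R centers rolling windows of even length
shift_val = -1 if window_size % 2 == 0 else 0

# Flat series with one obvious spike at index 20
vals = pd.Series([5.0] * 20 + [500.0] + [5.0] * 9)

rolling = vals.rolling(window_size, min_periods=window_size)  # right-aligned
centered = vals.rolling(window_size, min_periods=window_size, center=True)

fmedian = rolling.median()
smedian = centered.median().shift(shift_val)
fsd = rolling.std() + 0.00001   # guard against zero std
ssd = centered.std().shift(shift_val) + 0.00001

ftstat = (vals - fmedian.fillna(0)).abs() / fsd
ststat = (vals - smedian.fillna(0)).abs() / ssd

# Flag large values whose centered statistic (right-aligned fallback where
# the centered one is NaN) exceeds the significance cut
flagged = (vals.abs() > size_cut) & (ststat.fillna(ftstat) > sig_cut)
```

Because the median is robust, the spike barely moves the window's center, so its deviation divided by the window's standard deviation stands out; the `size_cut` guard keeps tiny counts from being flagged on relative deviation alone.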
@@ -731,13 +866,14 @@ def validate(self, export_dir):
         Returns:
         - None
         """
+
         # Get relevant data file names and info.
+
         export_files = read_filenames(export_dir)
         date_filter = make_date_filter(self.start_date, self.end_date)

         # Make list of tuples of CSV names and regex match objects.
         validate_files = [(f, m) for (f, m) in export_files if date_filter(m)]
-
         self.check_missing_date_files(validate_files)
         self.check_settings()

@@ -747,7 +883,6 @@ def validate(self, export_dir):
         # For every daily file, read in and do some basic format and value checks.
         for filename, match in validate_files:
             data_df = load_csv(join(export_dir, filename))
-
             self.check_df_format(data_df, filename)
             self.check_bad_geo_id_format(
                 data_df, filename, match.groupdict()['geo_type'])
@@ -781,12 +916,14 @@ def validate(self, export_dir):
         date_list = [self.start_date + timedelta(days=days)
                      for days in range(self.span_length.days + 1)]

+        # Get 14 days prior to the earliest list date
+        outlier_lookbehind = timedelta(days=14)
+
         # Get all expected combinations of geo_type and signal.
         geo_signal_combos = get_geo_signal_combos(self.data_source)

         all_api_df = self.threaded_api_calls(
-            self.start_date - min(semirecent_lookbehind,
-                                  self.max_check_lookbehind),
+            self.start_date - outlier_lookbehind,
             self.end_date, geo_signal_combos)

         # Keeps script from checking all files in a test run.
@@ -821,6 +958,20 @@ def validate(self, export_dir):
                 if geo_sig_api_df is None:
                     continue

+                # Outlier dataframe
+                if (signal_type in ["confirmed_7dav_cumulative_num", "confirmed_7dav_incidence_num",
+                                    "confirmed_cumulative_num", "confirmed_incidence_num",
+                                    "deaths_7dav_cumulative_num", "deaths_cumulative_num"]):
+                    earliest_available_date = geo_sig_df["time_value"].min()
+                    source_df = geo_sig_df.query(
+                        'time_value <= @date_list[-1] & time_value >= @date_list[0]')
+                    outlier_start_date = earliest_available_date - outlier_lookbehind
+                    outlier_end_date = earliest_available_date - timedelta(days=1)
+                    outlier_api_df = geo_sig_api_df.query(
+                        'time_value <= @outlier_end_date & time_value >= @outlier_start_date')
+                    self.check_positive_negative_spikes(
+                        source_df, outlier_api_df, geo_type, signal_type)
+
                 # Check data from a group of dates against recent (previous 7 days,
                 # by default) data from the API.
                 for checking_date in date_list:
@@ -872,6 +1023,7 @@ def validate(self, export_dir):
                     recent_df, reference_api_df, checking_date, geo_type, signal_type)

                 # Keeps script from checking all files in a test run.
+
                 if self.test_mode:
                     kroc += 1
                     if kroc == 2:
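The 14-day lookbehind slicing used above can be sketched in isolation. The frame below is a made-up stand-in for `geo_sig_api_df`, with one row per day:

```python
from datetime import datetime, timedelta
import pandas as pd

outlier_lookbehind = timedelta(days=14)

# Hypothetical reference data spanning October 2020
times = pd.date_range("2020-10-01", "2020-10-31", freq="D")
geo_sig_api_df = pd.DataFrame({"time_value": times, "val": range(len(times))})

earliest_available_date = datetime(2020, 10, 20)
outlier_start_date = earliest_available_date - outlier_lookbehind  # 2020-10-06
outlier_end_date = earliest_available_date - timedelta(days=1)     # 2020-10-19

# DataFrame.query resolves local variables via the @ prefix
outlier_api_df = geo_sig_api_df.query(
    'time_value <= @outlier_end_date & time_value >= @outlier_start_date')
```

The result holds exactly the 14 reference days immediately preceding the source data, which is what the spike check needs so that its first rolling window (`window_size = 14`) is fully populated at the first source date.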

0 commit comments