
Commit 2ddcbee

Merge branch 'main' of https://github.com/cmu-delphi/covidcast-indicators into even_more_logging
2 parents: 5d3988a + 47bfd62

File tree: 14 files changed, +161 −93 lines

_delphi_utils_python/delphi_utils/geomap.py

Lines changed: 11 additions & 0 deletions
@@ -528,6 +528,17 @@ def fips_to_megacounty(
         data = data.reset_index().groupby([date_col, mega_col]).sum()
         return data.reset_index()
 
+    def as_mapper_name(self, geo_type, state="state_id"):
+        """
+        Return the mapper equivalent of a region type.
+
+        Human-readable names like 'county' will return their mapper equivalents ('fips').
+        """
+        if geo_type == "state":
+            return state
+        if geo_type == "county":
+            return "fips"
+        return geo_type
     def get_geo_values(self, geo_type):
         """
         Return a set of all values for a given geography type.
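
For context, downstream code can now pass human-readable geo types and let the mapper translate them; a minimal usage sketch, assuming `delphi_utils` is installed:

```python
from delphi_utils import GeoMapper

gmpr = GeoMapper()

# Human-readable geo types translate to the mapper's internal names;
# anything the helper does not recognize passes through unchanged.
print(gmpr.as_mapper_name("county"))  # "fips"
print(gmpr.as_mapper_name("state"))   # "state_id" (the default for states)
print(gmpr.as_mapper_name("msa"))     # "msa"
```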

changehc/delphi_changehc/config.py

Lines changed: 0 additions & 23 deletions
@@ -54,26 +54,3 @@ class Config:
         7 # maximum number of days used to average a backfill correction
     )
     MIN_CUM_VISITS = 500 # need to observe at least 500 counts before averaging
-
-
-class Constants:
-    """
-    Contains the maximum number of geo units for each geo type.
-
-    Used for sanity checks
-    """
-
-    # number of counties in usa, including megacounties
-    NUM_COUNTIES = 3141 + 52
-    NUM_HRRS = 308
-    NUM_MSAS = 392 + 52 # MSA + States
-    NUM_STATES = 52 # including DC and PR
-    NUM_NATIONS = 1
-    NUM_HHSS = 10
-
-    MAX_GEO = {"county": NUM_COUNTIES,
-               "hrr": NUM_HRRS,
-               "msa": NUM_MSAS,
-               "state": NUM_STATES,
-               "nation": NUM_NATIONS,
-               "hhs": NUM_HHSS}

changehc/delphi_changehc/update_sensor.py

Lines changed: 2 additions & 2 deletions
@@ -14,7 +14,7 @@
 from delphi_utils import GeoMapper, read_params, add_prefix
 
 # first party
-from .config import Config, Constants
+from .config import Config
 from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, NA
 from .sensor import CHCSensor
 from .weekday import Weekday
@@ -164,7 +164,7 @@ def geo_reindex(self, data):
         # for each location, fill in all missing dates with 0 values
         multiindex = pd.MultiIndex.from_product((unique_geo_ids, self.fit_dates),
                                                 names=[geo, Config.DATE_COL])
-        assert (len(multiindex) <= (Constants.MAX_GEO[geo] * len(self.fit_dates))
+        assert (len(multiindex) <= (len(gmpr.get_geo_values(gmpr.as_mapper_name(geo))) * len(self.fit_dates))
                 ), "more loc-date pairs than maximum number of geographies x number of dates"
         # fill dataframe with missing dates using 0
         data_frame = data_frame.reindex(multiindex, fill_value=0)
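
The sanity bound formerly read from the hardcoded `Constants.MAX_GEO` table is now derived from the geomapper itself; a standalone sketch of the new bound (the fit window below is hypothetical):

```python
from delphi_utils import GeoMapper

gmpr = GeoMapper()
geo = "county"
fit_dates = ["2020-09-01", "2020-09-02"]  # hypothetical fit window

# Maximum number of (location, date) pairs the reindexed frame may hold.
max_pairs = len(gmpr.get_geo_values(gmpr.as_mapper_name(geo))) * len(fit_dates)
print(max_pairs)
```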

changehc/tests/test_load_data.py

Lines changed: 4 additions & 4 deletions
@@ -2,15 +2,14 @@
 import pytest
 
 # third party
-from delphi_utils import read_params
+from delphi_utils import read_params, GeoMapper
 import pandas as pd
 
 # first party
-from delphi_changehc.config import Config, Constants
+from delphi_changehc.config import Config
 from delphi_changehc.load_data import *
 
 CONFIG = Config()
-CONSTANTS = Constants()
 PARAMS = read_params()
 COVID_FILEPATH = PARAMS["input_covid_file"]
 DENOM_FILEPATH = PARAMS["input_denom_file"]
@@ -24,6 +23,7 @@ class TestLoadData:
                                 Config.COVID_COLS, Config.COVID_DTYPES, Config.COVID_COL)
     combined_data = load_combined_data(DENOM_FILEPATH, COVID_FILEPATH, DROP_DATE,
                                        "fips")
+    gmpr = GeoMapper()
 
     def test_base_unit(self):
         with pytest.raises(AssertionError):
@@ -78,7 +78,7 @@ def test_fips_values(self):
                      self.combined_data]:
             assert (
                 len(data.index.get_level_values(
-                    'fips').unique()) <= CONSTANTS.NUM_COUNTIES
+                    'fips').unique()) <= len(self.gmpr.get_geo_values("fips"))
             )
 
     def test_combined_fips_values(self):

changehc/tests/test_update_sensor.py

Lines changed: 1 addition & 2 deletions
@@ -15,11 +15,10 @@
 from delphi_utils import read_params
 
 # first party
-from delphi_changehc.config import Config, Constants
+from delphi_changehc.config import Config
 from delphi_changehc.update_sensor import write_to_csv, CHCSensorUpdator
 
 CONFIG = Config()
-CONSTANTS = Constants()
 PARAMS = read_params()
 COVID_FILEPATH = PARAMS["input_covid_file"]
 DENOM_FILEPATH = PARAMS["input_denom_file"]

nchs_mortality/delphi_nchs_mortality/constants.py

Lines changed: 15 additions & 4 deletions

@@ -1,16 +1,22 @@
 """Registry for constants."""
 # global constants
 METRICS = [
-    "covid_deaths", "total_deaths", "percent_of_expected_deaths",
-    "pneumonia_deaths", "pneumonia_and_covid_deaths", "influenza_deaths",
+    "covid_19_deaths", "total_deaths", "percent_of_expected_deaths",
+    "pneumonia_deaths", "pneumonia_and_covid_19_deaths", "influenza_deaths",
     "pneumonia_influenza_or_covid_19_deaths"
 ]
+RENAME = [
+    ("start_week", "timestamp"),
+    ("start_date", "timestamp"),
+    ("covid_deaths", "covid_19_deaths"),
+    ("pneumonia_and_covid_deaths", "pneumonia_and_covid_19_deaths")
+]
 SENSOR_NAME_MAP = {
-    "covid_deaths": "deaths_covid_incidence",
+    "covid_19_deaths": "deaths_covid_incidence",
     "total_deaths": "deaths_allcause_incidence",
     "percent_of_expected_deaths": "deaths_percent_of_expected",
     "pneumonia_deaths": "deaths_pneumonia_notflu_incidence",
-    "pneumonia_and_covid_deaths": "deaths_covid_and_pneumonia_notflu_incidence",
+    "pneumonia_and_covid_19_deaths": "deaths_covid_and_pneumonia_notflu_incidence",
     "influenza_deaths": "deaths_flu_incidence",
     "pneumonia_influenza_or_covid_19_deaths": "deaths_pneumonia_or_flu_or_covid_incidence"
 }
@@ -20,3 +26,8 @@
 ]
 INCIDENCE_BASE = 100000
 GEO_RES = "state"
+
+# this is necessary as a delimiter in the f-string expressions we use to
+# construct detailed error reports
+# (https://www.python.org/dev/peps/pep-0498/#escape-sequences)
+NEWLINE = "\n"
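
Why a module-level constant for a newline: before Python 3.12, a backslash cannot appear inside the expression part of an f-string (see PEP 498), so the joins used in the error reports go through `NEWLINE` instead. A small illustration, with a hypothetical column list:

```python
NEWLINE = "\n"
columns = ["covid_19_deaths", "total_deaths"]  # hypothetical column list

# A backslash is fine in the literal text of an f-string, but not inside
# the {...} expression: f"{'\n'.join(columns)}" is a SyntaxError on
# Python versions before 3.12.
report = f"Columns available:\n{NEWLINE.join(columns)}"
print(report)
```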

nchs_mortality/delphi_nchs_mortality/pull.py

Lines changed: 43 additions & 13 deletions
@@ -3,7 +3,18 @@
 import numpy as np
 import pandas as pd
 from sodapy import Socrata
-from .constants import METRICS
+from .constants import METRICS, RENAME, NEWLINE
+
+def standardize_columns(df):
+    """Rename columns to comply with a standard set.
+
+    NCHS has changed column names a few times, so this will help us maintain
+    backwards-compatibility without the processing code getting all gnarly.
+    """
+    rename_pairs = [(from_col, to_col) for (from_col, to_col) in RENAME
+                    if from_col in df.columns]
+    return df.rename(columns=dict(rename_pairs))
+
 
 def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str):
     """Pull the latest NCHS Mortality data, and conforms it into a dataset.
@@ -42,25 +53,44 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str):
         # Pull data from Socrata API
         client = Socrata("data.cdc.gov", token)
         results = client.get("r8kw-7aab", limit=10**10)
-        df = pd.DataFrame.from_records(results).rename(
-            {"start_week": "timestamp"}, axis=1)
+        df = pd.DataFrame.from_records(results)
+        # drop "By Total" rows
+        df = df[df["group"].transform(str.lower) == "by week"]
     else:
         df = pd.read_csv("./test_data/%s"%test_mode)
 
-    # Check missing start_week == end_week
-    try:
-        assert sum(df["timestamp"] != df["end_week"]) == 0
-    except AssertionError as exc:
-        raise ValueError(
-            "end_week is not always the same as start_week, check the raw file"
-        ) from exc
+    df = standardize_columns(df)
+
+    if "end_date" in df.columns:
+        # Check missing week_ending_date == end_date
+        try:
+            assert all(df["week_ending_date"] == df["end_date"])
+        except AssertionError as exc:
+            raise ValueError(
+                "week_ending_date is not always the same as end_date, check the raw file"
+            ) from exc
+    else:
+        # Check missing start_week == end_week
+        try:
+            assert all(df["timestamp"] == df["end_week"])
+        except AssertionError as exc:
+            raise ValueError(
+                "end_week is not always the same as start_week, check the raw file"
+            ) from exc
 
     try:
         df = df.astype(type_dict)
     except KeyError as exc:
-        raise ValueError("Expected column(s) missed, The dataset "
-                         "schema may have changed. Please investigate and "
-                         "amend the code.") from exc
+        raise ValueError(f"""
+Expected column(s) missed, The dataset schema may
+have changed. Please investigate and amend the code.
+
+Columns needed:
+{NEWLINE.join(type_dict.keys())}
+
+Columns available:
+{NEWLINE.join(df.columns)}
+""") from exc
 
     # Drop rows for locations outside US
     df = df[df["state"] != "United States"]
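
A quick sketch of the new helper in action, using a made-up single-row frame in the old NCHS schema (assuming the package is importable):

```python
import pandas as pd

from delphi_nchs_mortality.pull import standardize_columns

# Older files ship start_week/covid_deaths; newer ones start_date/covid_19_deaths.
# Only rename pairs whose source column is actually present get applied.
old_style = pd.DataFrame({"start_week": ["2020-02-01"], "covid_deaths": [0]})
print(standardize_columns(old_style).columns.tolist())
# ['timestamp', 'covid_19_deaths']
```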

nchs_mortality/delphi_nchs_mortality/run.py

Lines changed: 11 additions & 8 deletions
@@ -6,10 +6,10 @@
 """
 from datetime import datetime, date, timedelta
 from os.path import join
+import time
 
 import numpy as np
 import pandas as pd
-import time
 from delphi_utils import read_params, S3ArchiveDiffer, get_structured_logger
 
 from .pull import pull_nchs_mortality_data
@@ -34,11 +34,13 @@ def run_module():
     token = params["token"]
     test_mode = params["mode"]
 
-    daily_arch_diff = S3ArchiveDiffer(
-        daily_cache_dir, daily_export_dir,
-        params["bucket_name"], "nchs_mortality",
-        params["aws_credentials"])
-    daily_arch_diff.update_cache()
+    if params["bucket_name"]:
+        daily_arch_diff = S3ArchiveDiffer(
+            daily_cache_dir, daily_export_dir,
+            params["bucket_name"], "nchs_mortality",
+            params["aws_credentials"])
+        daily_arch_diff.update_cache()
+
 
     map_df = pd.read_csv(
         join(static_file_dir, "state_pop.csv"), dtype={"fips": int}
@@ -87,8 +89,9 @@ def run_module():
     # Daily run of archiving utility
     # - Uploads changed files to S3
     # - Does not export any issues into receiving
-    arch_diffs(params, daily_arch_diff)
-
+    if params["bucket_name"]:
+        arch_diffs(params, daily_arch_diff)
+
     elapsed_time_in_seconds = round(time.time() - start_time, 2)
     logger.info("Completed indicator run",
                 elapsed_time_in_seconds = elapsed_time_in_seconds)
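
With this guard, local and test runs can leave the bucket unset and skip S3 entirely; a toy sketch of the effect (param values hypothetical):

```python
# An empty "bucket_name" now disables both the S3ArchiveDiffer setup and
# the post-run archiving step.
params = {"bucket_name": "", "mode": "test_data.csv"}

if params["bucket_name"]:
    print("archiving to S3 bucket:", params["bucket_name"])
else:
    print("bucket_name is empty; skipping S3 archiving")
```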

nchs_mortality/tests/test_data/test_data.csv

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-data_as_of,timestamp,end_week,group,state,indicator,covid_deaths,total_deaths,percent_of_expected_deaths,pneumonia_deaths,pneumonia_and_covid_deaths,influenza_deaths,pneumonia_influenza_or_covid_19_deaths,footnote
+data_as_of,start_week,end_week,group,state,indicator,covid_deaths,total_deaths,percent_of_expected_deaths,pneumonia_deaths,pneumonia_and_covid_19_deaths,influenza_deaths,pneumonia_influenza_or_covid_19_deaths,footnote
 2020-09-09T00:00:00.000,2020-02-01T00:00:00.000,2020-02-01T00:00:00.000,By week,United States,Week-ending,0,58570,0.99,3796,0,479,4275,
 2020-09-09T00:00:00.000,2020-02-08T00:00:00.000,2020-02-08T00:00:00.000,By week,United States,Week-ending,1,59286,0.99,3798,0,520,4319,
 2020-09-09T00:00:00.000,2020-02-15T00:00:00.000,2020-02-15T00:00:00.000,By week,United States,Week-ending,0,58691,1,3824,0,558,4382,

nchs_mortality/tests/test_pull.py

Lines changed: 22 additions & 5 deletions
@@ -5,7 +5,7 @@
 import pandas as pd
 from delphi_utils import read_params
 
-from delphi_nchs_mortality.pull import pull_nchs_mortality_data
+from delphi_nchs_mortality.pull import pull_nchs_mortality_data, standardize_columns
 from delphi_nchs_mortality.constants import METRICS
 
 params = read_params()
@@ -19,18 +19,35 @@
 )
 
 class TestPullNCHS:
+    def test_standardize_columns(self):
+        df = standardize_columns(
+            pd.DataFrame({
+                "start_week": [1],
+                "covid_deaths": [2],
+                "pneumonia_and_covid_deaths": [4],
+                "pneumonia_influenza_or_covid_19_deaths": [8]
+            }))
+        expected = pd.DataFrame({
+            "timestamp": [1],
+            "covid_19_deaths": [2],
+            "pneumonia_and_covid_19_deaths": [4],
+            "pneumonia_influenza_or_covid_19_deaths": [8]
+        })
+        pd.testing.assert_frame_equal(expected, df)
+
     def test_good_file(self):
         df = pull_nchs_mortality_data(token, map_df, "test_data.csv")
 
         # Test columns
         assert (df.columns.values == [
-                'covid_deaths', 'total_deaths', 'percent_of_expected_deaths',
-                'pneumonia_deaths', 'pneumonia_and_covid_deaths',
+                'covid_19_deaths', 'total_deaths', 'percent_of_expected_deaths',
+                'pneumonia_deaths', 'pneumonia_and_covid_19_deaths',
                 'influenza_deaths', 'pneumonia_influenza_or_covid_19_deaths',
                 "timestamp", "geo_id", "population"]).all()
 
         # Test aggregation for NYC and NY
-        raw_df = pd.read_csv("./test_data/test_data.csv", parse_dates=["timestamp"])
+        raw_df = pd.read_csv("./test_data/test_data.csv", parse_dates=["start_week"])
+        raw_df = standardize_columns(raw_df)
         for metric in METRICS:
             ny_list = raw_df.loc[(raw_df["state"] == "New York")
                                  & (raw_df[metric].isnull()), "timestamp"].values
@@ -62,4 +79,4 @@ def test_bad_file_with_inconsistent_time_col(self):
                                             "bad_data_with_missing_cols.csv")
 
 
-
+

validator/README.md

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ Please update the follow settings:
 * `data_source`: should match the [formatting](https://cmu-delphi.github.io/delphi-epidata/api/covidcast_signals.html) as used in COVIDcast API calls
 * `end_date`: specifies the last date to be checked; if set to "latest", `end_date` will always be the current date
 * `span_length`: specifies the number of days before the `end_date` to check. `span_length` should be long enough to contain all recent source data that is still in the process of being updated (i.e. in the backfill period); for example, if the data source of interest has a 2-week lag before all reports are in for a given date, `span_length` should be 14 days
-* `suppressed_errors`: list of lists uniquely specifying errors that have been manually verified as false positives or acceptable deviations from expected
+* `suppressed_errors`: list of (`check_name`, `file_name`) pairs uniquely specifying errors that have been manually verified as false positives or acceptable deviations from expected. Either element may be `*`, which matches every check name or every file name, respectively.
 * `test_mode`: boolean; `true` checks only a small number of data files
 * `static`: settings for validations that don't require comparison with external COVIDcast API data
   * `minimum_sample_size` (default: 100): threshold for flagging small sample sizes as invalid

validator/delphi_validator/errors.py

Lines changed: 7 additions & 1 deletion
@@ -36,7 +36,13 @@ def is_suppressed(self, suppressed_errors):
         errors_to_suppress: Set[Tuple[str]]
             set of (check_name, data_name) tuples to ignore.
         """
-        return (self.check_name, self.data_name) in suppressed_errors
+        if (self.check_name, self.data_name) in suppressed_errors:
+            return True
+        if (self.check_name, "*") in suppressed_errors:
+            return True
+        if ("*", self.data_name) in suppressed_errors:
+            return True
+        return False
 
     def __str__(self):
         return f"{self.check_name} failed for {self.data_name}: {self.message}"
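
Restated outside the class, the new matching rules look like this (check and file names here are hypothetical):

```python
def is_suppressed(check_name, data_name, suppressed_errors):
    # An exact (check_name, data_name) pair, or a "*" wildcard in either
    # position, suppresses the error.
    return ((check_name, data_name) in suppressed_errors
            or (check_name, "*") in suppressed_errors
            or ("*", data_name) in suppressed_errors)

suppressed = {("check_min_allowed_max_date", "*")}
print(is_suppressed("check_min_allowed_max_date", "20201001_state_death.csv", suppressed))  # True
print(is_suppressed("check_missing_date_files", "20201001_state_death.csv", suppressed))    # False
```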
