diff --git a/ansible/templates/dsew_community_profile-params-prod.json.j2 b/ansible/templates/dsew_community_profile-params-prod.json.j2 index 8c03db852..68779098e 100644 --- a/ansible/templates/dsew_community_profile-params-prod.json.j2 +++ b/ansible/templates/dsew_community_profile-params-prod.json.j2 @@ -28,7 +28,9 @@ "naats_total_7dav", "naats_positivity_7dav", "confirmed_admissions_covid_1d_prop_7dav", - "confirmed_admissions_covid_1d_7dav" + "confirmed_admissions_covid_1d_7dav", + "doses_admin_7dav", + "booster_doses_admin_7dav" ] } } diff --git a/dsew_community_profile/delphi_dsew_community_profile/constants.py b/dsew_community_profile/delphi_dsew_community_profile/constants.py index 1404e52f4..00deffee6 100644 --- a/dsew_community_profile/delphi_dsew_community_profile/constants.py +++ b/dsew_community_profile/delphi_dsew_community_profile/constants.py @@ -51,22 +51,50 @@ class Transform: "total": { "is_rate" : False, "api_name": "naats_total_7dav", - "make_prop": False + "make_prop": False, + "cumulative" : False }, "positivity": { "is_rate" : True, "api_name": "naats_positivity_7dav", - "make_prop": False + "make_prop": False, + "cumulative" : False }, "confirmed covid-19 admissions": { "is_rate" : False, "api_name": "confirmed_admissions_covid_1d_7dav", "make_prop": True, - "api_prop_name": "confirmed_admissions_covid_1d_prop_7dav" + "api_prop_name": "confirmed_admissions_covid_1d_prop_7dav", + "cumulative" : False + }, + "fully vaccinated": { + "is_rate" : False, + "api_name": "people_full_vaccinated", + "make_prop": False, + "cumulative" : True + }, + "booster dose since": { + "is_rate" : False, + "api_name": "people_booster_doses", + "make_prop": False, + "cumulative" : True + }, + "booster doses administered": { + "is_rate" : False, + "api_name": "booster_doses_admin_7dav", + "make_prop": False, + "cumulative" : False + }, + "doses administered": { + "is_rate" : False, + "api_name": "doses_admin_7dav", + "make_prop": False, + "cumulative" : False } } -COUNTS_7D_SIGNALS = {key for key, value in SIGNALS.items() if not value["is_rate"]} +COUNTS_7D_SIGNALS = {key for key, value in SIGNALS.items() \ + if not((value["is_rate"]) or (value["cumulative"]))} def make_signal_name(key, is_prop=False): """Convert a signal key to the corresponding signal name for the API. diff --git a/dsew_community_profile/delphi_dsew_community_profile/pull.py b/dsew_community_profile/delphi_dsew_community_profile/pull.py index f2e88217b..6807f0c3c 100644 --- a/dsew_community_profile/delphi_dsew_community_profile/pull.py +++ b/dsew_community_profile/delphi_dsew_community_profile/pull.py @@ -32,6 +32,16 @@ rf'HOSPITAL UTILIZATION: (.*) WEEK \({DATE_RANGE_EXP}\)' ) +# example: "COVID-19 VACCINATION DATA: LAST WEEK (January 5-11)" +RE_DATE_FROM_VAC_HEADER_WEEK= re.compile( + rf'COVID-19 VACCINATION DATA: (.*) WEEK \({DATE_RANGE_EXP}\)' +) + +# example: 'COVID-19 VACCINATION DATA: CUMULATIVE (January 11)' +RE_DATE_FROM_VAC_HEADER_CUMULATIVE= re.compile( + rf'COVID-19 VACCINATION DATA: CUMULATIVE (.*)\({DATE_EXP}\)' +) + # example: "NAAT positivity rate - last 7 days (may be an underestimate due to delayed reporting)" # example: "Total NAATs - last 7 days (may be an underestimate due to delayed reporting)" RE_COLUMN_FROM_HEADER = re.compile('- (.*) 7 days') @@ -44,15 +54,27 @@ class DatasetTimes: positivity_reference_date: datetime.date total_reference_date: datetime.date hosp_reference_date: datetime.date + vac_reference_date: datetime.date + cumulative_vac_reference_date: datetime.date @staticmethod def from_header(header, publish_date): """Convert reference dates in overheader to DatasetTimes.""" - def as_date(sub_result): - month = sub_result[2] if sub_result[2] else sub_result[0] - assert month, f"Bad month in header: {header}\nsub_result: {sub_result}" - month_numeric = datetime.datetime.strptime(month, "%B").month - day = sub_result[3] + positivity_reference_date = None + total_reference_date = None + hosp_reference_date = None + vac_reference_date = None + cumulative_vac_reference_date= None + def as_date(sub_result, is_single_date): + if is_single_date: + month = sub_result[0] + day = sub_result[1] + month_numeric = datetime.datetime.strptime(month, "%B").month + else: + month = sub_result[2] if sub_result[2] else sub_result[0] + assert month, f"Bad month in header: {header}\nsub_result: {sub_result}" + month_numeric = datetime.datetime.strptime(month, "%B").month + day = sub_result[3] year = publish_date.year # year boundary if month_numeric > publish_date.month: @@ -62,51 +84,64 @@ def as_date(sub_result): if RE_DATE_FROM_TEST_HEADER.match(header): findall_result = RE_DATE_FROM_TEST_HEADER.findall(header)[0] column = findall_result[0].lower() - positivity_reference_date = as_date(findall_result[1:5]) + positivity_reference_date = as_date(findall_result[1:5], False) if findall_result[6]: # Reports published starting 2021-03-17 specify different reference # dates for positivity and total test volume - total_reference_date = as_date(findall_result[6:10]) + total_reference_date = as_date(findall_result[6:10], False) else: total_reference_date = positivity_reference_date - - hosp_reference_date = None elif RE_DATE_FROM_HOSP_HEADER.match(header): findall_result = RE_DATE_FROM_HOSP_HEADER.findall(header)[0] column = findall_result[0].lower() - hosp_reference_date = as_date(findall_result[1:5]) - - total_reference_date = None - positivity_reference_date = None + hosp_reference_date = as_date(findall_result[1:5], False) + elif RE_DATE_FROM_VAC_HEADER_WEEK.match(header): + findall_result = RE_DATE_FROM_VAC_HEADER_WEEK.findall(header)[0] + column = findall_result[0].lower() + vac_reference_date = as_date(findall_result[1:5], False) + elif RE_DATE_FROM_VAC_HEADER_CUMULATIVE.match(header): + findall_result = RE_DATE_FROM_VAC_HEADER_CUMULATIVE.findall(header)[0] + column = findall_result[0].lower() + cumulative_vac_reference_date = as_date(findall_result[1:], True) else: raise ValueError(f"Couldn't find reference date in header '{header}'") - return DatasetTimes(column, positivity_reference_date, - total_reference_date, hosp_reference_date) + total_reference_date, hosp_reference_date, + cumulative_vac_reference_date, vac_reference_date) def __getitem__(self, key): """Use DatasetTimes like a dictionary.""" + ref_list = list(SIGNALS.keys()) if key.lower()=="positivity": return self.positivity_reference_date if key.lower()=="total": return self.total_reference_date if key.lower()=="confirmed covid-19 admissions": return self.hosp_reference_date + if key.lower() in ["doses administered","booster doses administered"]: + return self.cumulative_vac_reference_date + if key.lower() in ["fully vaccinated","booster dose since"]: + return self.vac_reference_date raise ValueError( f"Bad reference date type request '{key}'; " + \ - "need 'total', 'positivity', or 'confirmed covid-19 admissions'" + "need one of: " + " ,".join(ref_list) ) def __setitem__(self, key, newvalue): """Use DatasetTimes like a dictionary.""" + ref_list = list(SIGNALS.keys()) if key.lower()=="positivity": self.positivity_reference_date = newvalue if key.lower()=="total": self.total_reference_date = newvalue if key.lower()=="confirmed covid-19 admissions": self.hosp_reference_date = newvalue - else: + if key.lower() in ["doses administered","booster doses administered"]: + self.cumulative_vac_reference_date = newvalue + if key.lower() in ["fully vaccinated","booster dose since"]: + self.vac_reference_date = newvalue + if key.lower() not in ref_list: raise ValueError( f"Bad reference date type request '{key}'; " + \ - "need 'total', 'positivity', or 'confirmed covid-19 admissions'" + "need one of: " + " ,".join(ref_list) ) def __eq__(self, other): """Check equality by value.""" @@ -164,14 +199,21 @@ def skip_overheader(header): # include "VIRAL (RT-PCR) LAB TESTING: [LAST|PREVIOUS] WEEK (August 24-30, ..." # include "HOSPITAL UTILIZATION: LAST WEEK (January 2-8)" return not (isinstance(header, str) and \ - (header.startswith("TESTING:") or \ + (((header.startswith("TESTING:") or \ header.startswith("VIRAL (RT-PCR) LAB TESTING:") or \ - header.startswith("HOSPITAL UTILIZATION:")) and \ + header.startswith("HOSPITAL UTILIZATION: ")) and \ # exclude "TESTING: % CHANGE FROM PREVIOUS WEEK" \ # exclude "TESTING: DEMOGRAPHIC DATA" \ # exclude "HOSPITAL UTILIZATION: CHANGE FROM PREVIOUS WEEK" \ # exclude "HOSPITAL UTILIZATION: DEMOGRAPHIC DATA" \ - header.find("WEEK (") > 0) + header.find("WEEK (") > 0) or \ + # include "COVID-19 VACCINATION DATA: CUMULATIVE (January 25)" + # include "COVID-19 VACCINATION DATA: LAST WEEK (January 25-31)" + (header.startswith("COVID-19 VACCINATION DATA: CUMULATIVE") or + header.startswith("COVID-19 VACCINATION DATA: LAST WEEK") \ + ))) + + def _parse_times_for_sheet(self, sheet): """Record reference dates for this sheet.""" # grab reference dates from overheaders @@ -198,21 +240,32 @@ def _parse_times_for_sheet(self, sheet): self.times[dt.column][sig] = dt[sig] else: self.times[dt.column] = dt - assert len(self.times) == 2, \ - f"No times extracted from overheaders:\n{NEWLINE.join(str(s) for s in overheaders)}" + + if self.publish_date <= datetime.date(2021, 1, 11): + # No vaccination data available, so we only have hospitalization and testing overheaders + assert len(self.times) == 2, \ + f"No times extracted from overheaders:\n{NEWLINE.join(str(s) for s in overheaders)}" + else: + assert len(self.times) == 3, \ + f"No times extracted from overheaders:\n{NEWLINE.join(str(s) for s in overheaders)}" @staticmethod def retain_header(header): """Ignore irrelevant headers.""" - return all([ + return ((all([ # include "Total NAATs - [last|previous] 7 days ..." # include "Total RT-PCR diagnostic tests - [last|previous] 7 days ..." # include "NAAT positivity rate - [last|previous] 7 days ..." # include "Viral (RT-PCR) lab test positivity rate - [last|previous] 7 days ..." + # include "Booster doses administered - [last|previous] 7 days ..." + # include "Doses administered - [last|previous] 7 days ..." (header.startswith("Total NAATs") or header.startswith("NAAT positivity rate") or header.startswith("Total RT-PCR") or - header.startswith("Viral (RT-PCR)")), + header.startswith("Viral (RT-PCR)") or + header.startswith("Booster") or + header.startswith("Doses administered -") + ), # exclude "NAAT positivity rate - absolute change ..." header.find("7 days") > 0, # exclude "NAAT positivity rate - last 7 days - ages <5" @@ -227,7 +280,25 @@ def retain_header(header): header.find(" age") < 0, # exclude "Confirmed COVID-19 admissions per 100 inpatient beds - last 7 days" header.find(" beds") < 0, - ]) + ])) or (all([ + # include "People who are fully vaccinated" + # include "People who have received a booster dose since August 13, 2021" + header.startswith("People who"), + # exclude "People who are fully vaccinated as % of total population" + # exclude "People who have received a booster dose as % of fully vaccinated population" + header.find("%") < 0, + # exclude "People who are fully vaccinated - ages 5-11" ... + # exclude "People who have received a booster dose - ages 65+" ... + header.find(" age") < 0, + # exclude "People who are fully vaccinated - 12-17" ... + header.find("-") < 0, + + ]) or all([ + # include "People with full course administered" + header.startswith("People with full course"), + # exclude "People with full course administered as % of adult population" + header.find("%") < 0, + ]))) def _parse_sheet(self, sheet): """Extract data frame for this sheet.""" df = pd.read_excel( @@ -238,17 +309,52 @@ def _parse_sheet(self, sheet): ) if sheet.row_filter: df = df.loc[sheet.row_filter(df)] + + + def select_fn(h): + """Allow for default to the 7-day in the name of the dataframe column.""" + try: + return (RE_COLUMN_FROM_HEADER.findall(h)[0], h, h.lower()) + except IndexError: + return ("", h, h.lower()) + select = [ - (RE_COLUMN_FROM_HEADER.findall(h)[0], h, h.lower()) + select_fn(h) for h in list(df.columns) if self.retain_header(h) ] for sig in SIGNALS: + ## Check if field is known to be missing # Hospital admissions not available at the county or CBSA level prior to Jan 8, 2021. - if (sheet.level == "msa" or sheet.level == "county") \ + is_hosp_adm_before_jan8 = (sheet.level == "msa" or sheet.level == "county") \ and self.publish_date < datetime.date(2021, 1, 8) \ - and sig == "confirmed covid-19 admissions": + and sig == "confirmed covid-19 admissions" + # Booster data not available before November 1 2021. + is_booster_before_nov1 = self.publish_date < datetime.date(2021, 11, 1) \ + and (sig in ["booster dose since", "booster doses administered"]) + # Booster and weekly doses administered not available below the state level. + is_booster_below_state = ((sheet.level != "hhs" and sheet.level != "state") \ + and (sig in ["doses administered", \ + "booster doses administered", "booster dose since"])) + # Weekly doses administered not available on or before Apr 29, 2021. + is_dose_admin_apr29 = self.publish_date <= datetime.date(2021, 4, 29) \ + and sig == "doses administered" + # People fully vaccinated not available on or before Apr 11, 2021 at the CBSA level. + is_fully_vax_msa_before_apr11 = (sheet.level == "msa" or sheet.level == "county") \ + and self.publish_date <= datetime.date(2021, 4, 11) \ + and sig == "fully vaccinated" + # People fully vaccinated not available before Jan 15, 2021 at any geo level. + is_fully_vax_before_jan14 = self.publish_date <= datetime.date(2021, 1, 14) \ + and sig == "fully vaccinated" + + if any([is_hosp_adm_before_jan8, + is_booster_before_nov1, + is_booster_below_state, + is_dose_admin_apr29, + is_fully_vax_msa_before_apr11, + is_fully_vax_before_jan14 + ]): self.dfs[(sheet.level, sig, NOT_PROP)] = pd.DataFrame( columns = ["geo_id", "timestamp", "val", \ "se", "sample_size", "publish_date"] @@ -256,6 +362,15 @@ def _parse_sheet(self, sheet): continue sig_select = [s for s in select if s[-1].find(sig) >= 0] + # The name of the cumulative vaccination was changed after 03/09/2021 + # when J&J vaccines were added. + if (sig == "fully vaccinated") and (len(sig_select)==0): + sig_select = [s for s in select if s[-1].find("people with full course") >= 0] + # Since "doses administered" is a substring of another desired header, + # "booster doses administered", we need to more strictly check if "doses administered" + # occurs at the beginning of a header to find the correct match. + if sig == "doses administered": + sig_select = [s for s in select if s[-1].startswith(sig)] assert len(sig_select) > 0, \ f"No {sig} in any of {select}\n\nAll headers:\n{NEWLINE.join(list(df.columns))}" @@ -270,11 +385,10 @@ def _parse_sheet(self, sheet): }) for si in sig_select ]) - for sig in COUNTS_7D_SIGNALS: + assert (sheet.level, sig, NOT_PROP) in self.dfs.keys() self.dfs[(sheet.level, sig, NOT_PROP)]["val"] /= 7 # 7-day total -> 7-day average - def as_cached_filename(params, config): """Formulate a filename to uniquely identify this report in the input cache.""" # eg "Community Profile Report 20220128.xlsx" @@ -299,7 +413,6 @@ def fetch_listing(params): ) for el in listing if el['filename'].endswith("xlsx") ] - if params['indicator']['reports'] == 'new': # drop files we already have in the input cache listing = [el for el in listing if not os.path.exists(el['cached_filename'])] @@ -364,7 +477,6 @@ def fetch_new_reports(params, logger=None): # download and parse individual reports datasets = download_and_parse(listing, logger) - # collect like signals together, keeping most recent publish date ret = {} for sig, lst in datasets.items(): @@ -381,7 +493,6 @@ def fetch_new_reports(params, logger=None): if len(latest_sig_df.index) > 0: latest_sig_df = latest_sig_df.reset_index(drop=True) - assert all(latest_sig_df.groupby( ["timestamp", "geo_id"] ).size( diff --git a/dsew_community_profile/input_cache/.gitignore b/dsew_community_profile/input_cache/.gitignore deleted file mode 100644 index 7c1222033..000000000 --- a/dsew_community_profile/input_cache/.gitignore +++ /dev/null @@ -1 +0,0 @@ -*.xlsx diff --git a/dsew_community_profile/params.json.template b/dsew_community_profile/params.json.template index 0dab14103..d6da310ec 100644 --- a/dsew_community_profile/params.json.template +++ b/dsew_community_profile/params.json.template @@ -11,7 +11,11 @@ "export_signals": [ "confirmed covid-19 admissions", "total", - "positivity" + "positivity", + "doses administered", + "booster doses administered", + "fully vaccinated", + "booster dose since" ] }, "validation": { @@ -34,7 +38,9 @@ "naats_total_7dav", "naats_positivity_7dav", "confirmed_admissions_covid_1d_prop_7dav", - "confirmed_admissions_covid_1d_7dav" + "confirmed_admissions_covid_1d_7dav", + "doses_admin_7dav", + "booster_doses_admin_7dav" ] } } diff --git a/dsew_community_profile/tests/test_pull.py b/dsew_community_profile/tests/test_pull.py index b898e21b6..7b6c8acba 100644 --- a/dsew_community_profile/tests/test_pull.py +++ b/dsew_community_profile/tests/test_pull.py @@ -16,35 +16,41 @@ class TestPull: def test_DatasetTimes(self): examples = [ - example(DatasetTimes("xyzzy", date(2021, 10, 30), date(2021, 10, 20), date(2021, 10, 22)), - DatasetTimes("xyzzy", date(2021, 10, 30), date(2021, 10, 20), date(2021, 10, 22))), + example(DatasetTimes("xyzzy", date(2021, 10, 30), date(2021, 10, 20), date(2021, 10, 22), date(2021, 10, 23), date(2021, 10, 24)), + DatasetTimes("xyzzy", date(2021, 10, 30), date(2021, 10, 20), date(2021, 10, 22), date(2021, 10, 23), date(2021, 10, 24))), ] for ex in examples: assert ex.given == ex.expected, "Equality" - dt = DatasetTimes("xyzzy", date(2021, 10, 30), date(2021, 10, 20), date(2021, 10, 22)) + dt = DatasetTimes("xyzzy", date(2021, 10, 30), date(2021, 10, 20), date(2021, 10, 22), date(2021, 10, 23), date(2021, 10, 24)) assert dt["positivity"] == date(2021, 10, 30), "positivity" assert dt["total"] == date(2021, 10, 20), "total" assert dt["confirmed covid-19 admissions"] == date(2021, 10, 22), "confirmed covid-19 admissions" + assert dt["doses administered"] == date(2021, 10, 24), "doses administered" + assert dt["fully vaccinated"] == date(2021, 10, 23), "fully vaccinated" with pytest.raises(ValueError): dt["xyzzy"] def test_DatasetTimes_from_header(self): examples = [ example("TESTING: LAST WEEK (October 24-30, Test Volume October 20-26)", - DatasetTimes("last", date(2021, 10, 30), date(2021, 10, 26), None)), + DatasetTimes("last", date(2021, 10, 30), date(2021, 10, 26), None, None, None)), example("TESTING: PREVIOUS WEEK (October 24-30, Test Volume October 20-26)", - DatasetTimes("previous", date(2021, 10, 30), date(2021, 10, 26), None)), + DatasetTimes("previous", date(2021, 10, 30), date(2021, 10, 26), None, None, None)), example("TESTING: LAST WEEK (October 24-November 30, Test Volume October 20-26)", - DatasetTimes("last", date(2021, 11, 30), date(2021, 10, 26), None)), + DatasetTimes("last", date(2021, 11, 30), date(2021, 10, 26), None, None, None)), example("VIRAL (RT-PCR) LAB TESTING: LAST WEEK (June 7-13, Test Volume June 3-9 )", - DatasetTimes("last", date(2021, 6, 13), date(2021, 6, 9), None)), + DatasetTimes("last", date(2021, 6, 13), date(2021, 6, 9), None, None, None)), example("VIRAL (RT-PCR) LAB TESTING: LAST WEEK (March 7-13)", - DatasetTimes("last", date(2021, 3, 13), date(2021, 3, 13), None)), + DatasetTimes("last", date(2021, 3, 13), date(2021, 3, 13), None, None, None)), example("HOSPITAL UTILIZATION: LAST WEEK (June 2-8)", - DatasetTimes("last", None, None, date(2021, 6, 8))), + DatasetTimes("last", None, None, date(2021, 6, 8), None, None)), example("HOSPITAL UTILIZATION: LAST WEEK (June 28-July 8)", - DatasetTimes("last", None, None, date(2021, 7, 8))) + DatasetTimes("last", None, None, date(2021, 7, 8), None, None)), + example("COVID-19 VACCINATION DATA: CUMULATIVE (January 25)", + DatasetTimes("", None, None, None, date(2021, 1, 25), None)), + example("COVID-19 VACCINATION DATA: LAST WEEK (January 25-31)", + DatasetTimes("last", None, None, None, None, date(2021, 1, 25))) ] for ex in examples: assert DatasetTimes.from_header(ex.given, date(2021, 12, 31)) == ex.expected, ex.given @@ -52,7 +58,7 @@ def test_DatasetTimes_from_header(self): # test year boundary examples = [ example("TESTING: LAST WEEK (October 24-30, Test Volume October 20-26)", - DatasetTimes("last", date(2020, 10, 30), date(2020, 10, 26), None)), + DatasetTimes("last", date(2020, 10, 30), date(2020, 10, 26), None, None, None)), ] for ex in examples: assert DatasetTimes.from_header(ex.given, date(2021, 1, 1)) == ex.expected, ex.given @@ -78,6 +84,12 @@ def test_Dataset_skip_overheader(self): example("HOSPITAL UTILIZATION: CHANGE FROM PREVIOUS WEEK", True), example("HOSPITAL UTILIZATION: DEMOGRAPHIC DATA", + True), + example("COVID-19 VACCINATION DATA: CUMULATIVE (January 25)", + False), + example("COVID-19 VACCINATION DATA: LAST WEEK (January 25-31)", + False), + example("COVID-19 VACCINATION DATA: DEMOGRAPHIC DATA", True) ] for ex in examples: