Skip to content

Dsew vaccination #1495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 40 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
749a6c8
first attempt at adding booster signals
Ananya-Joshi Jan 25, 2022
7abc758
temporary changes
Ananya-Joshi Jan 25, 2022
5b5f19e
lint changes
Ananya-Joshi Jan 26, 2022
a2a39c5
Merge remote-tracking branch 'origin/main' into HEAD
Ananya-Joshi Jan 29, 2022
e51f756
Changes after Katie's Review
Ananya-Joshi Jan 31, 2022
4fd803c
Merge remote-tracking branch 'origin/main' into HEAD
Ananya-Joshi Jan 31, 2022
470a48f
Added 4 indicators for vaccination
Ananya-Joshi Feb 2, 2022
1a87a11
Merge branch 'main' into dsew_cp_vaccination
Ananya-Joshi Feb 2, 2022
f9b62b5
working with the new overheaders
Ananya-Joshi Feb 3, 2022
499a5f4
changes to tests and lint
Ananya-Joshi Feb 3, 2022
37df65f
Merge branch 'dsew_cp_vaccination' of https://github.com/cmu-delphi/c…
Ananya-Joshi Feb 3, 2022
44f2101
removed comma in the json file
Ananya-Joshi Feb 3, 2022
53482f3
removed print statements
Ananya-Joshi Feb 3, 2022
7c0564b
remove print
Ananya-Joshi Feb 3, 2022
6a935b4
removed cumulative from 7 day
Ananya-Joshi Feb 3, 2022
95bb252
removed unused line
Ananya-Joshi Feb 6, 2022
731ed9f
added cumulative flag to remove some signals from COUNTS_7D_SIGNALS
Ananya-Joshi Feb 6, 2022
94522c2
Change the header specification
Ananya-Joshi Feb 6, 2022
3eb0381
changed if statement to assert in COUNTS_7D_SIGNALS creation
Ananya-Joshi Feb 6, 2022
8a1e357
changing as_day to be part of as_date using Nat's backward compatibil…
Ananya-Joshi Feb 6, 2022
0f93ea7
lint
Ananya-Joshi Feb 6, 2022
d2f28b6
changes to skip overheader test to pass
Ananya-Joshi Feb 6, 2022
733bdd6
added more tests for the new vaccination overheaders
Ananya-Joshi Feb 7, 2022
42a2b80
added smoothened signals to the json template and to the ansible temp…
Ananya-Joshi Feb 7, 2022
06e9dc9
Merge remote-tracking branch 'origin/main' into dsew_cp_vaccination
Ananya-Joshi Feb 7, 2022
bcf6f0b
changed api name for cumulative signals
Ananya-Joshi Feb 7, 2022
2ed2c17
Update dsew_community_profile/delphi_dsew_community_profile/pull.py
Ananya-Joshi Feb 7, 2022
f39b315
Update dsew_community_profile/delphi_dsew_community_profile/pull.py
Ananya-Joshi Feb 7, 2022
fa853ab
added new end date for vaccine signals
Ananya-Joshi Feb 9, 2022
108968a
return empty df for doses before apr 29
nmdefries Feb 14, 2022
66de89e
add check for fully_vaccinated by date
nmdefries Feb 14, 2022
a736155
in early jan 2021, no vax info available so decrement times in assert
nmdefries Feb 14, 2022
a8eada6
condense checks on signal availability
nmdefries Feb 14, 2022
760d2c2
comment
nmdefries Feb 14, 2022
8d9527e
exclude different format of by-age fully vaccinated
nmdefries Feb 15, 2022
5980eaa
Merge branch 'dsew_cp_vaccination' into ndefries/dsew-generalizations
Ananya-Joshi Feb 15, 2022
8e3b7f8
Merge pull request #1524 from cmu-delphi/ndefries/dsew-generalizations
Ananya-Joshi Feb 15, 2022
83cca2d
added original vaccination signal (pre JJ) and lint
Ananya-Joshi Feb 16, 2022
f126de7
clean-up and ready for review
Ananya-Joshi Feb 17, 2022
f2f778b
small comments + name change
nmdefries Feb 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ansible/templates/dsew_community_profile-params-prod.json.j2
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
"naats_total_7dav",
"naats_positivity_7dav",
"confirmed_admissions_covid_1d_prop_7dav",
"confirmed_admissions_covid_1d_7dav"
"confirmed_admissions_covid_1d_7dav",
"doses_admin_7dav",
"booster_doses_admin_7dav"
]
}
}
Expand Down
36 changes: 32 additions & 4 deletions dsew_community_profile/delphi_dsew_community_profile/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,50 @@ class Transform:
"total": {
"is_rate" : False,
"api_name": "naats_total_7dav",
"make_prop": False
"make_prop": False,
"cumulative" : False
},
"positivity": {
"is_rate" : True,
"api_name": "naats_positivity_7dav",
"make_prop": False
"make_prop": False,
"cumulative" : False
},
"confirmed covid-19 admissions": {
"is_rate" : False,
"api_name": "confirmed_admissions_covid_1d_7dav",
"make_prop": True,
"api_prop_name": "confirmed_admissions_covid_1d_prop_7dav"
"api_prop_name": "confirmed_admissions_covid_1d_prop_7dav",
"cumulative" : False
},
"fully vaccinated": {
"is_rate" : False,
"api_name": "people_full_vaccinated",
"make_prop": False,
"cumulative" : True
},
"booster dose since": {
"is_rate" : False,
"api_name": "people_booster_doses",
"make_prop": False,
"cumulative" : True
},
"booster doses administered": {
"is_rate" : False,
"api_name": "booster_doses_admin_7dav",
"make_prop": False,
"cumulative" : False
},
"doses administered": {
"is_rate" : False,
"api_name": "doses_admin_7dav",
"make_prop": False,
"cumulative" : False
}
}

COUNTS_7D_SIGNALS = {key for key, value in SIGNALS.items() if not value["is_rate"]}
COUNTS_7D_SIGNALS = {key for key, value in SIGNALS.items() \
if not((value["is_rate"]) or (value["cumulative"]))}

def make_signal_name(key, is_prop=False):
"""Convert a signal key to the corresponding signal name for the API.
Expand Down
179 changes: 145 additions & 34 deletions dsew_community_profile/delphi_dsew_community_profile/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@
rf'HOSPITAL UTILIZATION: (.*) WEEK \({DATE_RANGE_EXP}\)'
)

# example: "COVID-19 VACCINATION DATA: LAST WEEK (January 5-11)"
RE_DATE_FROM_VAC_HEADER_WEEK= re.compile(
rf'COVID-19 VACCINATION DATA: (.*) WEEK \({DATE_RANGE_EXP}\)'
)

# example: 'COVID-19 VACCINATION DATA: CUMULATIVE (January 11)'
RE_DATE_FROM_VAC_HEADER_CUMULATIVE= re.compile(
rf'COVID-19 VACCINATION DATA: CUMULATIVE (.*)\({DATE_EXP}\)'
)

# example: "NAAT positivity rate - last 7 days (may be an underestimate due to delayed reporting)"
# example: "Total NAATs - last 7 days (may be an underestimate due to delayed reporting)"
RE_COLUMN_FROM_HEADER = re.compile('- (.*) 7 days')
Expand All @@ -44,15 +54,27 @@ class DatasetTimes:
positivity_reference_date: datetime.date
total_reference_date: datetime.date
hosp_reference_date: datetime.date
vac_reference_date: datetime.date
cumulative_vac_reference_date: datetime.date

@staticmethod
def from_header(header, publish_date):
"""Convert reference dates in overheader to DatasetTimes."""
def as_date(sub_result):
month = sub_result[2] if sub_result[2] else sub_result[0]
assert month, f"Bad month in header: {header}\nsub_result: {sub_result}"
month_numeric = datetime.datetime.strptime(month, "%B").month
day = sub_result[3]
positivity_reference_date = None
total_reference_date = None
hosp_reference_date = None
vac_reference_date = None
cumulative_vac_reference_date= None
def as_date(sub_result, is_single_date):
if is_single_date:
month = sub_result[0]
day = sub_result[1]
month_numeric = datetime.datetime.strptime(month, "%B").month
else:
month = sub_result[2] if sub_result[2] else sub_result[0]
assert month, f"Bad month in header: {header}\nsub_result: {sub_result}"
month_numeric = datetime.datetime.strptime(month, "%B").month
day = sub_result[3]
year = publish_date.year
# year boundary
if month_numeric > publish_date.month:
Expand All @@ -62,51 +84,64 @@ def as_date(sub_result):
if RE_DATE_FROM_TEST_HEADER.match(header):
findall_result = RE_DATE_FROM_TEST_HEADER.findall(header)[0]
column = findall_result[0].lower()
positivity_reference_date = as_date(findall_result[1:5])
positivity_reference_date = as_date(findall_result[1:5], False)
if findall_result[6]:
# Reports published starting 2021-03-17 specify different reference
# dates for positivity and total test volume
total_reference_date = as_date(findall_result[6:10])
total_reference_date = as_date(findall_result[6:10], False)
else:
total_reference_date = positivity_reference_date

hosp_reference_date = None
elif RE_DATE_FROM_HOSP_HEADER.match(header):
findall_result = RE_DATE_FROM_HOSP_HEADER.findall(header)[0]
column = findall_result[0].lower()
hosp_reference_date = as_date(findall_result[1:5])

total_reference_date = None
positivity_reference_date = None
hosp_reference_date = as_date(findall_result[1:5], False)
elif RE_DATE_FROM_VAC_HEADER_WEEK.match(header):
findall_result = RE_DATE_FROM_VAC_HEADER_WEEK.findall(header)[0]
column = findall_result[0].lower()
vac_reference_date = as_date(findall_result[1:5], False)
elif RE_DATE_FROM_VAC_HEADER_CUMULATIVE.match(header):
findall_result = RE_DATE_FROM_VAC_HEADER_CUMULATIVE.findall(header)[0]
column = findall_result[0].lower()
cumulative_vac_reference_date = as_date(findall_result[1:], True)
else:
raise ValueError(f"Couldn't find reference date in header '{header}'")

return DatasetTimes(column, positivity_reference_date,
total_reference_date, hosp_reference_date)
total_reference_date, hosp_reference_date,
cumulative_vac_reference_date, vac_reference_date)
def __getitem__(self, key):
"""Use DatasetTimes like a dictionary."""
ref_list = list(SIGNALS.keys())
if key.lower()=="positivity":
return self.positivity_reference_date
if key.lower()=="total":
return self.total_reference_date
if key.lower()=="confirmed covid-19 admissions":
return self.hosp_reference_date
if key.lower() in ["doses administered","booster doses administered"]:
return self.cumulative_vac_reference_date
if key.lower() in ["fully vaccinated","booster dose since"]:
return self.vac_reference_date
raise ValueError(
f"Bad reference date type request '{key}'; " + \
"need 'total', 'positivity', or 'confirmed covid-19 admissions'"
"need one of: " + " ,".join(ref_list)
)
def __setitem__(self, key, newvalue):
"""Use DatasetTimes like a dictionary."""
ref_list = list(SIGNALS.keys())
if key.lower()=="positivity":
self.positivity_reference_date = newvalue
if key.lower()=="total":
self.total_reference_date = newvalue
if key.lower()=="confirmed covid-19 admissions":
self.hosp_reference_date = newvalue
else:
if key.lower() in ["doses administered","booster doses administered"]:
self.cumulative_vac_reference_date = newvalue
if key.lower() in ["fully vaccinated","booster dose since"]:
self.vac_reference_date = newvalue
if key.lower() not in ref_list:
raise ValueError(
f"Bad reference date type request '{key}'; " + \
"need 'total', 'positivity', or 'confirmed covid-19 admissions'"
"need one of: " + " ,".join(ref_list)
)
def __eq__(self, other):
"""Check equality by value."""
Expand Down Expand Up @@ -164,14 +199,21 @@ def skip_overheader(header):
# include "VIRAL (RT-PCR) LAB TESTING: [LAST|PREVIOUS] WEEK (August 24-30, ..."
# include "HOSPITAL UTILIZATION: LAST WEEK (January 2-8)"
return not (isinstance(header, str) and \
(header.startswith("TESTING:") or \
(((header.startswith("TESTING:") or \
header.startswith("VIRAL (RT-PCR) LAB TESTING:") or \
header.startswith("HOSPITAL UTILIZATION:")) and \
header.startswith("HOSPITAL UTILIZATION: ")) and \
# exclude "TESTING: % CHANGE FROM PREVIOUS WEEK" \
# exclude "TESTING: DEMOGRAPHIC DATA" \
# exclude "HOSPITAL UTILIZATION: CHANGE FROM PREVIOUS WEEK" \
# exclude "HOSPITAL UTILIZATION: DEMOGRAPHIC DATA" \
header.find("WEEK (") > 0)
header.find("WEEK (") > 0) or \
# include "COVID-19 VACCINATION DATA: CUMULATIVE (January 25)"
# include "COVID-19 VACCINATION DATA: LAST WEEK (January 25-31)"
(header.startswith("COVID-19 VACCINATION DATA: CUMULATIVE") or
header.startswith("COVID-19 VACCINATION DATA: LAST WEEK") \
)))


def _parse_times_for_sheet(self, sheet):
"""Record reference dates for this sheet."""
# grab reference dates from overheaders
Expand All @@ -198,21 +240,32 @@ def _parse_times_for_sheet(self, sheet):
self.times[dt.column][sig] = dt[sig]
else:
self.times[dt.column] = dt
assert len(self.times) == 2, \
f"No times extracted from overheaders:\n{NEWLINE.join(str(s) for s in overheaders)}"

if self.publish_date <= datetime.date(2021, 1, 11):
# No vaccination data available, so we only have hospitalization and testing overheaders
assert len(self.times) == 2, \
f"No times extracted from overheaders:\n{NEWLINE.join(str(s) for s in overheaders)}"
else:
assert len(self.times) == 3, \
f"No times extracted from overheaders:\n{NEWLINE.join(str(s) for s in overheaders)}"

@staticmethod
def retain_header(header):
"""Ignore irrelevant headers."""
return all([
return ((all([
# include "Total NAATs - [last|previous] 7 days ..."
# include "Total RT-PCR diagnostic tests - [last|previous] 7 days ..."
# include "NAAT positivity rate - [last|previous] 7 days ..."
# include "Viral (RT-PCR) lab test positivity rate - [last|previous] 7 days ..."
# include "Booster doses administered - [last|previous] 7 days ..."
# include "Doses administered - [last|previous] 7 days ..."
(header.startswith("Total NAATs") or
header.startswith("NAAT positivity rate") or
header.startswith("Total RT-PCR") or
header.startswith("Viral (RT-PCR)")),
header.startswith("Viral (RT-PCR)") or
header.startswith("Booster") or
header.startswith("Doses administered -")
),
# exclude "NAAT positivity rate - absolute change ..."
header.find("7 days") > 0,
# exclude "NAAT positivity rate - last 7 days - ages <5"
Expand All @@ -227,7 +280,25 @@ def retain_header(header):
header.find(" age") < 0,
# exclude "Confirmed COVID-19 admissions per 100 inpatient beds - last 7 days"
header.find(" beds") < 0,
])
])) or (all([
# include "People who are fully vaccinated"
# include "People who have received a booster dose since August 13, 2021"
header.startswith("People who"),
# exclude "People who are fully vaccinated as % of total population"
# exclude "People who have received a booster dose as % of fully vaccinated population"
header.find("%") < 0,
# exclude "People who are fully vaccinated - ages 5-11" ...
# exclude "People who have received a booster dose - ages 65+" ...
header.find(" age") < 0,
# exclude "People who are fully vaccinated - 12-17" ...
header.find("-") < 0,

]) or all([
# include "People with full course administered"
header.startswith("People with full course"),
# exclude "People with full course administered as % of adult population"
header.find("%") < 0,
])))
def _parse_sheet(self, sheet):
"""Extract data frame for this sheet."""
df = pd.read_excel(
Expand All @@ -238,24 +309,68 @@ def _parse_sheet(self, sheet):
)
if sheet.row_filter:
df = df.loc[sheet.row_filter(df)]


def select_fn(h):
"""Allow for default to the 7-day in the name of the dataframe column."""
try:
return (RE_COLUMN_FROM_HEADER.findall(h)[0], h, h.lower())
except IndexError:
return ("", h, h.lower())

select = [
(RE_COLUMN_FROM_HEADER.findall(h)[0], h, h.lower())
select_fn(h)
for h in list(df.columns)
if self.retain_header(h)
]

for sig in SIGNALS:
## Check if field is known to be missing
# Hospital admissions not available at the county or CBSA level prior to Jan 8, 2021.
if (sheet.level == "msa" or sheet.level == "county") \
is_hosp_adm_before_jan8 = (sheet.level == "msa" or sheet.level == "county") \
and self.publish_date < datetime.date(2021, 1, 8) \
and sig == "confirmed covid-19 admissions":
and sig == "confirmed covid-19 admissions"
# Booster data not available before November 1 2021.
is_booster_before_nov1 = self.publish_date < datetime.date(2021, 11, 1) \
and (sig in ["booster dose since", "booster doses administered"])
# Booster and weekly doses administered not available below the state level.
is_booster_below_state = ((sheet.level != "hhs" and sheet.level != "state") \
and (sig in ["doses administered", \
"booster doses administered", "booster dose since"]))
# Weekly doses administered not available on or before Apr 29, 2021.
is_dose_admin_apr29 = self.publish_date <= datetime.date(2021, 4, 29) \
and sig == "doses administered"
# People fully vaccinated not available on or before Apr 11, 2021 at the CBSA level.
is_fully_vax_msa_before_apr11 = (sheet.level == "msa" or sheet.level == "county") \
and self.publish_date <= datetime.date(2021, 4, 11) \
and sig == "fully vaccinated"
# People fully vaccinated not available before Jan 15, 2021 at any geo level.
is_fully_vax_before_jan14 = self.publish_date <= datetime.date(2021, 1, 14) \
and sig == "fully vaccinated"

if any([is_hosp_adm_before_jan8,
is_booster_before_nov1,
is_booster_below_state,
is_dose_admin_apr29,
is_fully_vax_msa_before_apr11,
is_fully_vax_before_jan14
]):
self.dfs[(sheet.level, sig, NOT_PROP)] = pd.DataFrame(
columns = ["geo_id", "timestamp", "val", \
"se", "sample_size", "publish_date"]
)
continue

sig_select = [s for s in select if s[-1].find(sig) >= 0]
# The name of the cumulative vaccination was changed after 03/09/2021
# when J&J vaccines were added.
if (sig == "fully vaccinated") and (len(sig_select)==0):
sig_select = [s for s in select if s[-1].find("people with full course") >= 0]
# Since "doses administered" is a substring of another desired header,
# "booster doses administered", we need to more strictly check if "doses administered"
# occurs at the beginning of a header to find the correct match.
if sig == "doses administered":
sig_select = [s for s in select if s[-1].startswith(sig)]
assert len(sig_select) > 0, \
f"No {sig} in any of {select}\n\nAll headers:\n{NEWLINE.join(list(df.columns))}"

Expand All @@ -270,11 +385,10 @@ def _parse_sheet(self, sheet):
})
for si in sig_select
])

for sig in COUNTS_7D_SIGNALS:
assert (sheet.level, sig, NOT_PROP) in self.dfs.keys()
self.dfs[(sheet.level, sig, NOT_PROP)]["val"] /= 7 # 7-day total -> 7-day average


def as_cached_filename(params, config):
"""Formulate a filename to uniquely identify this report in the input cache."""
# eg "Community Profile Report 20220128.xlsx"
Expand All @@ -299,7 +413,6 @@ def fetch_listing(params):
)
for el in listing if el['filename'].endswith("xlsx")
]

if params['indicator']['reports'] == 'new':
# drop files we already have in the input cache
listing = [el for el in listing if not os.path.exists(el['cached_filename'])]
Expand Down Expand Up @@ -364,7 +477,6 @@ def fetch_new_reports(params, logger=None):

# download and parse individual reports
datasets = download_and_parse(listing, logger)

# collect like signals together, keeping most recent publish date
ret = {}
for sig, lst in datasets.items():
Expand All @@ -381,7 +493,6 @@ def fetch_new_reports(params, logger=None):

if len(latest_sig_df.index) > 0:
latest_sig_df = latest_sig_df.reset_index(drop=True)

assert all(latest_sig_df.groupby(
["timestamp", "geo_id"]
).size(
Expand Down
1 change: 0 additions & 1 deletion dsew_community_profile/input_cache/.gitignore

This file was deleted.

Loading