Release covidcast-indicators 0.3.8 #1560

Merged
merged 20 commits on Mar 21, 2022

2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.3.7
current_version = 0.3.8
commit = True
message = chore: bump covidcast-indicators to {new_version}
tag = False
27 changes: 16 additions & 11 deletions dsew_community_profile/delphi_dsew_community_profile/constants.py
@@ -46,55 +46,60 @@ class Transform:
)
]}

# signal id : is_rate, name to report in API
# key: signal id, string pattern used to find column to report as signal
# is_rate: originating signal is a percentage (e.g. test positivity)
# is_cumulative: originating signal is cumulative (e.g. vaccine doses ever administered)
# api_name: name to use in API
# make_prop: report originating signal as-is and per 100k population
# api_prop_name: name to use in API for proportion signal
SIGNALS = {
"total": {
"is_rate" : False,
"api_name": "naats_total_7dav",
"api_name": "covid_naat_num_7dav",
"make_prop": False,
"cumulative" : False
"is_cumulative" : False
},
"positivity": {
"is_rate" : True,
"api_name": "naats_positivity_7dav",
"api_name": "covid_naat_pct_positive_7dav",
"make_prop": False,
"cumulative" : False
"is_cumulative" : False
},
"confirmed covid-19 admissions": {
"is_rate" : False,
"api_name": "confirmed_admissions_covid_1d_7dav",
"make_prop": True,
"api_prop_name": "confirmed_admissions_covid_1d_prop_7dav",
"cumulative" : False
"is_cumulative" : False
},
"fully vaccinated": {
"is_rate" : False,
"api_name": "people_full_vaccinated",
"make_prop": False,
"cumulative" : True
"is_cumulative" : True
},
"booster dose since": {
"is_rate" : False,
"api_name": "people_booster_doses",
"make_prop": False,
"cumulative" : True
"is_cumulative" : True
},
"booster doses administered": {
"is_rate" : False,
"api_name": "booster_doses_admin_7dav",
"make_prop": False,
"cumulative" : False
"is_cumulative" : False
},
"doses administered": {
"is_rate" : False,
"api_name": "doses_admin_7dav",
"make_prop": False,
"cumulative" : False
"is_cumulative" : False
}
}

COUNTS_7D_SIGNALS = {key for key, value in SIGNALS.items() \
if not((value["is_rate"]) or (value["cumulative"]))}
if not((value["is_rate"]) or (value["is_cumulative"]))}
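For reference, the comprehension above keeps only the non-rate, non-cumulative signals. A standalone sketch of the same logic over a trimmed copy of the dict (flags only):

```python
# Standalone sketch: the same comprehension over a trimmed copy of SIGNALS
# (flags only). Only non-rate, non-cumulative signals are kept.
signals = {
    "total":                         {"is_rate": False, "is_cumulative": False},
    "positivity":                    {"is_rate": True,  "is_cumulative": False},
    "confirmed covid-19 admissions": {"is_rate": False, "is_cumulative": False},
    "fully vaccinated":              {"is_rate": False, "is_cumulative": True},
    "booster dose since":            {"is_rate": False, "is_cumulative": True},
    "booster doses administered":    {"is_rate": False, "is_cumulative": False},
    "doses administered":            {"is_rate": False, "is_cumulative": False},
}

counts_7d = {key for key, value in signals.items()
             if not (value["is_rate"] or value["is_cumulative"])}
# {'total', 'confirmed covid-19 admissions',
#  'booster doses administered', 'doses administered'}
```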

def make_signal_name(key, is_prop=False):
"""Convert a signal key to the corresponding signal name for the API.
146 changes: 141 additions & 5 deletions dsew_community_profile/delphi_dsew_community_profile/pull.py
@@ -7,6 +7,7 @@
from urllib.parse import quote_plus as quote_as_url

import pandas as pd
import numpy as np
import requests

from delphi_utils.geomap import GeoMapper
@@ -457,14 +458,23 @@ def nation_from_state(df, sig, geomapper):
).drop(
"norm_denom", axis=1
)
# The filter in `fetch_new_reports` that keeps the most recent publish date guarantees
# that we'll only see one unique publish date per timestamp here.
publish_date_by_ts = df.groupby(
["timestamp"]
)["publish_date"].apply(
lambda x: np.unique(x)[0]
).reset_index(
)
df = geomapper.replace_geocode(
df,
df.drop("publish_date", axis=1),
'state_id',
'nation',
new_col="geo_id"
)
df["se"] = None
df["sample_size"] = None
df = pd.merge(df, publish_date_by_ts, on="timestamp", how="left")

return df
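To see the shape of the publish-date handling added here without the real GeoMapper, a hypothetical sketch of the same pattern: record the single publish_date per timestamp, aggregate to nation without it, then merge it back on timestamp (a plain groupby-sum stands in for replace_geocode):

```python
# Hypothetical sketch of the publish_date carry-through; a plain groupby-sum
# stands in for geomapper.replace_geocode.
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "state_id":     ["pa", "ny"],
    "timestamp":    ["2022-03-01", "2022-03-01"],
    "publish_date": ["2022-03-06", "2022-03-06"],
    "val":          [10.0, 20.0],
})

# One publish date per timestamp (guaranteed by the upstream filter).
publish_date_by_ts = df.groupby(["timestamp"])["publish_date"].apply(
    lambda x: np.unique(x)[0]
).reset_index()

nation = df.drop(["publish_date", "state_id"], axis=1).groupby(
    "timestamp", as_index=False
).sum()  # stand-in for replace_geocode('state_id', 'nation')
nation["geo_id"] = "us"
nation["se"] = None
nation["sample_size"] = None
nation = pd.merge(nation, publish_date_by_ts, on="timestamp", how="left")
# nation now has the aggregated val plus the publish_date it came from.
```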

@@ -483,8 +493,6 @@ def fetch_new_reports(params, logger=None):
"timestamp"
).apply(
lambda x: x[x["publish_date"] == x["publish_date"].max()]
).drop(
"publish_date", axis=1
).drop_duplicates(
)
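For context, a hypothetical sketch of the keep-the-latest-publish-date filter above, which is what lets nation_from_state assume a single publish date per timestamp:

```python
# Hypothetical data: two publishes of the same timestamp; only the most
# recently published copy is kept.
import pandas as pd

df = pd.DataFrame({
    "timestamp":    ["2022-03-01", "2022-03-01", "2022-03-02"],
    "publish_date": ["2022-03-05", "2022-03-06", "2022-03-06"],
    "val":          [1.0, 1.1, 2.0],
})
latest = df.groupby("timestamp").apply(
    lambda x: x[x["publish_date"] == x["publish_date"].max()]
).drop_duplicates()
# For timestamp 2022-03-01, only the 2022-03-06 publish survives.
```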

@@ -496,7 +504,7 @@
).reset_index(
drop=True
) == 1), f"Duplicate rows in {sig} indicate that one or" \
+ " more reports was published multiple times and the copies differ"
+ " more reports were published multiple times and the copies differ"

ret[sig] = latest_sig_df

@@ -513,7 +521,25 @@
)

for key, df in ret.copy().items():
(geo, sig, _) = key
(geo, sig, prop) = key

if sig == "positivity":
# Combine with test volume using publish date.
total_key = (geo, "total", prop)
ret[key] = unify_testing_sigs(
df, ret[total_key]
).drop(
"publish_date", axis=1
)

# No longer need "total" signal.
del ret[total_key]
elif sig != "total":
# If signal is not test volume or test positivity, we don't need
# publish date.
df = df.drop("publish_date", axis=1)
ret[key] = df

if SIGNALS[sig]["make_prop"]:
ret[(geo, sig, IS_PROP)] = generate_prop_signal(df, geo, geomapper)

@@ -546,3 +572,113 @@ def generate_prop_signal(df, geo, geo_mapper):
df.drop(["population", geo], axis=1, inplace=True)

return df

def unify_testing_sigs(positivity_df, volume_df):
"""
Drop any observations with a sample size of 5 or less. Generate standard errors.

This combines test positivity and testing volume into a single signal,
where testing volume *from the same spreadsheet/publish date* (NOT the
same reference date) is used as the sample size for test positivity.

Total testing volume is typically provided for a 7-day period about 4 days
before the test positivity period. Since the CPR is only published on
weekdays, test positivity and test volume are only available for the same
reported dates 3 times a week. We have chosen to censor 7dav test
positivity based on the 7dav test volume provided in the same originating
spreadsheet, corresponding to a period ~4 days earlier.

This approach makes the signals maximally available (5 days per week) with
low latency. It avoids the complications of processing multiple spreadsheets
each day and of test positivity and test volume not being available for all
the same reference dates.

Discussion of decision and alternatives (Delphi-internal share drive):
https://docs.google.com/document/d/1MoIimdM_8OwG4SygoeQ9QEVZzIuDl339_a0xoYa6vuA/edit#

"""
# Combine test positivity and test volume, maintaining "this week" and "previous week" status.
assert len(positivity_df.index) == len(volume_df.index), \
"Test positivity and volume data have different numbers of observations."
volume_df = add_max_ts_col(volume_df)[
["geo_id", "publish_date", "val", "is_max_group_ts"]
].rename(
columns={"val":"sample_size"}
)
col_order = list(positivity_df.columns)
positivity_df = add_max_ts_col(positivity_df).drop(["sample_size"], axis=1)

df = pd.merge(
positivity_df, volume_df,
on=["publish_date", "geo_id", "is_max_group_ts"],
how="left"
).drop(
["is_max_group_ts"], axis=1
)

# Drop everything with 5 or fewer total tests.
df = df.loc[df.sample_size > 5]

# Generate stderr.
df = df.assign(se=std_err(df))

return df[col_order]
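A hypothetical usage example (made-up numbers) of unify_testing_sigs: the test volume published in the same CPR spreadsheet becomes the sample size, and the basis of the standard error, of the positivity signal, even though the two signals cover slightly different reference periods:

```python
# Hypothetical inputs: one publish date, two reference weeks per signal.
import pandas as pd

positivity_df = pd.DataFrame({
    "geo_id":       ["pa", "pa"],
    "timestamp":    [pd.Timestamp("2022-03-01"), pd.Timestamp("2022-03-08")],
    "publish_date": [pd.Timestamp("2022-03-10")] * 2,
    "val":          [0.05, 0.06],   # positivity as a proportion
    "se":           [None, None],
    "sample_size":  [None, None],
})
volume_df = pd.DataFrame({
    "geo_id":       ["pa", "pa"],
    "timestamp":    [pd.Timestamp("2022-02-25"), pd.Timestamp("2022-03-04")],
    "publish_date": [pd.Timestamp("2022-03-10")] * 2,
    "val":          [1000, 1200],   # total tests
    "se":           [None, None],
    "sample_size":  [None, None],
})

combined = unify_testing_sigs(positivity_df, volume_df)
# combined.sample_size is [1000, 1200];
# se = sqrt(val * (1 - val) / sample_size).
```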

def add_max_ts_col(df):
"""
Add column to differentiate timestamps for a given publish date-geo combo.

Each publish date is associated with two timestamps for test volume and
test positivity. The older timestamp corresponds to data from the
"previous week"; the newer timestamp corresponds to the "last week".

Since test volume and test positivity timestamps don't match exactly, we
can't use them to merge the two signals together, but we still need a way
to uniquely identify observations to avoid duplicating observations during
the join. This new column, which is analogous to the "last/previous week"
classification, is used to merge on.
"""
assert all(
df.groupby(["publish_date", "geo_id"])["timestamp"].count() == 2
) and all(
df.groupby(["publish_date", "geo_id"])["timestamp"].nunique() == 2
), "Testing signals should have two unique timestamps per publish date-region combination."

max_ts_by_group = df.groupby(
["publish_date", "geo_id"], as_index=False
)["timestamp"].max(
).rename(
columns={"timestamp":"max_timestamp"}
)
df = pd.merge(
df, max_ts_by_group,
on=["publish_date", "geo_id"],
how="outer"
).assign(
is_max_group_ts=lambda df: df["timestamp"] == df["max_timestamp"]
).drop(
["max_timestamp"], axis=1
)

return df
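A small hypothetical illustration of the column this adds: the newer of the two timestamps per (publish_date, geo_id) group is flagged, giving a merge key that works even though the positivity and volume timestamps themselves don't line up:

```python
import pandas as pd

df = pd.DataFrame({
    "geo_id":       ["pa", "pa"],
    "publish_date": [pd.Timestamp("2022-03-10")] * 2,
    "timestamp":    [pd.Timestamp("2022-03-01"), pd.Timestamp("2022-03-08")],
    "val":          [0.05, 0.06],
})
print(add_max_ts_col(df)[["timestamp", "is_max_group_ts"]])
# roughly:
#    timestamp  is_max_group_ts
# 0 2022-03-01            False
# 1 2022-03-08             True
```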

def std_err(df):
"""
Find the standard error of a binomial proportion.

Assumes all input sample_size values are > 0.

Parameters
----------
df: pd.DataFrame
Columns: val, sample_size, ...

Returns
-------
pd.Series
Standard error of the positivity rate of PCR-specimen tests.
"""
assert all(df.sample_size > 0), "Sample sizes must be greater than 0"
p = df.val
n = df.sample_size
return np.sqrt(p * (1 - p) / n)
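For example, with made-up numbers, a 5% positivity rate over 1,000 tests:

```python
import pandas as pd

df = pd.DataFrame({"val": [0.05], "sample_size": [1000]})
std_err(df)  # sqrt(0.05 * 0.95 / 1000) ≈ 0.0069
```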
1 change: 1 addition & 0 deletions dsew_community_profile/input_cache/.gitignore
@@ -0,0 +1 @@
*.xlsx