diff --git a/.bumpversion.cfg b/.bumpversion.cfg index a0ea8cd83..5d6dee4a0 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.3.7 +current_version = 0.3.8 commit = True message = chore: bump covidcast-indicators to {new_version} tag = False diff --git a/dsew_community_profile/delphi_dsew_community_profile/constants.py b/dsew_community_profile/delphi_dsew_community_profile/constants.py index 00deffee6..9c2eda699 100644 --- a/dsew_community_profile/delphi_dsew_community_profile/constants.py +++ b/dsew_community_profile/delphi_dsew_community_profile/constants.py @@ -46,55 +46,60 @@ class Transform: ) ]} -# signal id : is_rate, name to report in API +# key: signal id, string pattern used to find column to report as signal +# is_rate: originating signal is a percentage (e.g. test positivity) +# is_cumulative: originating signal is cumulative (e.g. vaccine doses ever administered) +# api_name: name to use in API +# make_prop: report originating signal as-is and per 100k population +# api_prop_name: name to use in API for proportion signal SIGNALS = { "total": { "is_rate" : False, - "api_name": "naats_total_7dav", + "api_name": "covid_naat_num_7dav", "make_prop": False, - "cumulative" : False + "is_cumulative" : False }, "positivity": { "is_rate" : True, - "api_name": "naats_positivity_7dav", + "api_name": "covid_naat_pct_positive_7dav", "make_prop": False, - "cumulative" : False + "is_cumulative" : False }, "confirmed covid-19 admissions": { "is_rate" : False, "api_name": "confirmed_admissions_covid_1d_7dav", "make_prop": True, "api_prop_name": "confirmed_admissions_covid_1d_prop_7dav", - "cumulative" : False + "is_cumulative" : False }, "fully vaccinated": { "is_rate" : False, "api_name": "people_full_vaccinated", "make_prop": False, - "cumulative" : True + "is_cumulative" : True }, "booster dose since": { "is_rate" : False, "api_name": "people_booster_doses", "make_prop": False, - "cumulative" : True + "is_cumulative" : True }, "booster doses administered": { "is_rate" : False, "api_name": "booster_doses_admin_7dav", "make_prop": False, - "cumulative" : False + "is_cumulative" : False }, "doses administered": { "is_rate" : False, "api_name": "doses_admin_7dav", "make_prop": False, - "cumulative" : False + "is_cumulative" : False } } COUNTS_7D_SIGNALS = {key for key, value in SIGNALS.items() \ - if not((value["is_rate"]) or (value["cumulative"]))} + if not((value["is_rate"]) or (value["is_cumulative"]))} def make_signal_name(key, is_prop=False): """Convert a signal key to the corresponding signal name for the API. diff --git a/dsew_community_profile/delphi_dsew_community_profile/pull.py b/dsew_community_profile/delphi_dsew_community_profile/pull.py index b7e2259b9..92448a969 100644 --- a/dsew_community_profile/delphi_dsew_community_profile/pull.py +++ b/dsew_community_profile/delphi_dsew_community_profile/pull.py @@ -7,6 +7,7 @@ from urllib.parse import quote_plus as quote_as_url import pandas as pd +import numpy as np import requests from delphi_utils.geomap import GeoMapper @@ -457,14 +458,23 @@ def nation_from_state(df, sig, geomapper): ).drop( "norm_denom", axis=1 ) + # The filter in `fetch_new_reports` that keeps only the most recent publish date guarantees + # that we'll only see one unique publish date per timestamp here.
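+ # Extract the publish date for each timestamp up front; it is dropped before the state-to-nation aggregation below and merged back in afterward.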
+ publish_date_by_ts = df.groupby( + ["timestamp"] + )["publish_date"].apply( + lambda x: np.unique(x)[0] + ).reset_index( + ) df = geomapper.replace_geocode( - df, + df.drop("publish_date", axis=1), 'state_id', 'nation', new_col="geo_id" ) df["se"] = None df["sample_size"] = None + df = pd.merge(df, publish_date_by_ts, on="timestamp", how="left") return df @@ -483,8 +493,6 @@ def fetch_new_reports(params, logger=None): "timestamp" ).apply( lambda x: x[x["publish_date"] == x["publish_date"].max()] - ).drop( - "publish_date", axis=1 ).drop_duplicates( ) @@ -496,7 +504,7 @@ def fetch_new_reports(params, logger=None): ).reset_index( drop=True ) == 1), f"Duplicate rows in {sig} indicate that one or" \ - + " more reports was published multiple times and the copies differ" + + " more reports were published multiple times and the copies differ" ret[sig] = latest_sig_df @@ -513,7 +521,25 @@ def fetch_new_reports(params, logger=None): ) for key, df in ret.copy().items(): - (geo, sig, _) = key + (geo, sig, prop) = key + + if sig == "positivity": + # Combine with test volume using publish date. + total_key = (geo, "total", prop) + ret[key] = unify_testing_sigs( + df, ret[total_key] + ).drop( + "publish_date", axis=1 + ) + + # No longer need "total" signal. + del ret[total_key] + elif sig != "total": + # If signal is not test volume or test positivity, we don't need + # publish date. + df = df.drop("publish_date", axis=1) + ret[key] = df + if SIGNALS[sig]["make_prop"]: ret[(geo, sig, IS_PROP)] = generate_prop_signal(df, geo, geomapper) @@ -546,3 +572,113 @@ def generate_prop_signal(df, geo, geo_mapper): df.drop(["population", geo], axis=1, inplace=True) return df + +def unify_testing_sigs(positivity_df, volume_df): + """ + Drop any observations with a sample size of 5 or less. Generate standard errors. + + This combines test positivity and testing volume into a single signal, + where testing volume *from the same spreadsheet/publish date* (NOT the + same reference date) is used as the sample size for test positivity. + + Total testing volume is typically provided for a 7-day period about 4 days + before the test positivity period. Since the CPR is only published on + weekdays, test positivity and test volume are only available for the same + reported dates 3 times a week. We have chosen to censor 7dav test + positivity based on the 7dav test volume provided in the same originating + spreadsheet, corresponding to a period ~4 days earlier. + + This approach makes the signals maximally available (5 days per week) with + low latency. It avoids complications of having to process multiple + spreadsheets each day, and the fact that test positivity and test volume + are not available for all the same reference dates. + + Discussion of decision and alternatives (Delphi-internal share drive): + https://docs.google.com/document/d/1MoIimdM_8OwG4SygoeQ9QEVZzIuDl339_a0xoYa6vuA/edit# + + """ + # Combine test positivity and test volume, maintaining "this week" and "previous week" status. + assert len(positivity_df.index) == len(volume_df.index), \ + "Test positivity and volume data have different numbers of observations." 
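+ # Use test volume as the sample size for positivity; keep only the columns needed for the merge on publish date, geo, and week flag.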
+ volume_df = add_max_ts_col(volume_df)[ + ["geo_id", "publish_date", "val", "is_max_group_ts"] + ].rename( + columns={"val":"sample_size"} + ) + col_order = list(positivity_df.columns) + positivity_df = add_max_ts_col(positivity_df).drop(["sample_size"], axis=1) + + df = pd.merge( + positivity_df, volume_df, + on=["publish_date", "geo_id", "is_max_group_ts"], + how="left" + ).drop( + ["is_max_group_ts"], axis=1 + ) + + # Drop everything with 5 or fewer total tests. + df = df.loc[df.sample_size > 5] + + # Generate stderr. + df = df.assign(se=std_err(df)) + + return df[col_order] + +def add_max_ts_col(df): + """ + Add column to differentiate timestamps for a given publish date-geo combo. + + Each publish date is associated with two timestamps for test volume and + test positivity. The older timestamp corresponds to data from the + "previous week"; the newer timestamp corresponds to the "last week". + + Since test volume and test positivity timestamps don't match exactly, we + can't use them to merge the two signals together, but we still need a way + to uniquely identify observations to avoid duplicating observations during + the join. This new column, which is analogous to the "last/previous week" + classification, is used to merge on. + """ + assert all( + df.groupby(["publish_date", "geo_id"])["timestamp"].count() == 2 + ) and all( + df.groupby(["publish_date", "geo_id"])["timestamp"].nunique() == 2 + ), "Testing signals should have two unique timestamps per publish date-region combination." + + max_ts_by_group = df.groupby( + ["publish_date", "geo_id"], as_index=False + )["timestamp"].max( + ).rename( + columns={"timestamp":"max_timestamp"} + ) + df = pd.merge( + df, max_ts_by_group, + on=["publish_date", "geo_id"], + how="outer" + ).assign( + is_max_group_ts=lambda df: df["timestamp"] == df["max_timestamp"] + ).drop( + ["max_timestamp"], axis=1 + ) + + return df + +def std_err(df): + """ + Find the standard error of a binomial proportion. + + Assumes all input sample_size values are > 0. + + Parameters + ---------- + df: pd.DataFrame + Columns: val, sample_size, ... + + Returns + ------- + pd.Series + Standard error of the positivity rate of PCR-specimen tests.
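+ Computed as sqrt(val * (1 - val) / sample_size).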
+ """ + assert all(df.sample_size > 0), "Sample sizes must be greater than 0" + p = df.val + n = df.sample_size + return np.sqrt(p * (1 - p) / n) diff --git a/dsew_community_profile/input_cache/.gitignore b/dsew_community_profile/input_cache/.gitignore new file mode 100644 index 000000000..7c1222033 --- /dev/null +++ b/dsew_community_profile/input_cache/.gitignore @@ -0,0 +1 @@ +*.xlsx diff --git a/dsew_community_profile/tests/test_pull.py b/dsew_community_profile/tests/test_pull.py index 7b6c8acba..3cc4479d2 100644 --- a/dsew_community_profile/tests/test_pull.py +++ b/dsew_community_profile/tests/test_pull.py @@ -2,14 +2,16 @@ from datetime import date, datetime from itertools import chain import pandas as pd +import numpy as np import pytest from unittest.mock import patch, Mock from delphi_utils.geomap import GeoMapper -from delphi_dsew_community_profile.pull import DatasetTimes -from delphi_dsew_community_profile.pull import Dataset -from delphi_dsew_community_profile.pull import fetch_listing, nation_from_state, generate_prop_signal +from delphi_dsew_community_profile.pull import (DatasetTimes, Dataset, + fetch_listing, nation_from_state, generate_prop_signal, + std_err, add_max_ts_col, unify_testing_sigs) + example = namedtuple("example", "given expected") @@ -185,7 +187,8 @@ def test_nation_from_state(self): 'timestamp': [datetime(year=2020, month=1, day=1)]*2, 'val': [15., 150.], 'se': [None, None], - 'sample_size': [None, None],}) + 'sample_size': [None, None], + 'publish_date': [datetime(year=2020, month=1, day=1)]*2,}) pa_pop = int(state_pop.loc[state_pop.state_id == "pa", "pop"]) wv_pop = int(state_pop.loc[state_pop.state_id == "wv", "pop"]) @@ -207,7 +210,8 @@ def test_nation_from_state(self): 'timestamp': [datetime(year=2020, month=1, day=1)], 'val': [15. 
+ 150.], 'se': [None], - 'sample_size': [None],}), + 'sample_size': [None], + 'publish_date': [datetime(year=2020, month=1, day=1)],}), check_like=True ) @@ -222,7 +226,8 @@ def test_nation_from_state(self): 'timestamp': [datetime(year=2020, month=1, day=1)], 'val': [15*pa_pop/tot_pop + 150*wv_pop/tot_pop], 'se': [None], - 'sample_size': [None],}), + 'sample_size': [None], + 'publish_date': [datetime(year=2020, month=1, day=1)],}), check_like=True ) @@ -306,3 +311,113 @@ def test_generate_prop_signal_non_msa(self): expected_df, check_like=True ) + + def test_unify_testing_sigs(self): + positivity_df = pd.DataFrame({ + 'geo_id': ["ca", "ca", "fl", "fl"], + 'timestamp': [datetime(2021, 10, 27), datetime(2021, 10, 20)]*2, + 'val': [0.2, 0.34, 0.7, 0.01], + 'se': [None] * 4, + 'sample_size': [None] * 4, + 'publish_date': [datetime(2021, 10, 30)]*4, + }) + base_volume_df = pd.DataFrame({ + 'geo_id': ["ca", "ca", "fl", "fl"], + 'timestamp': [datetime(2021, 10, 23), datetime(2021, 10, 16)]*2, + 'val': [None] * 4, + 'se': [None] * 4, + 'sample_size': [None] * 4, + 'publish_date': [datetime(2021, 10, 30)]*4, + }) + + examples = [ + example( + [positivity_df, base_volume_df.assign(val = [101, 102, 103, 104])], + positivity_df.assign( + sample_size = [101, 102, 103, 104], + se = lambda df: np.sqrt(df.val * (1 - df.val) / df.sample_size) + ) + ), # No filtering + example( + [positivity_df, base_volume_df.assign(val = [110, 111, 112, 113]).iloc[::-1]], + positivity_df.assign( + sample_size = [110, 111, 112, 113], + se = lambda df: np.sqrt(df.val * (1 - df.val) / df.sample_size) + ) + ), # No filtering, volume df in reversed order + example( + [positivity_df, base_volume_df.assign(val = [100, 5, 1, 6])], + positivity_df.assign( + sample_size = [100, 5, 1, 6] + ).iloc[[0, 3]].assign( + se = lambda df: np.sqrt(df.val * (1 - df.val) / df.sample_size) + ) + ) + ] + for ex in examples: + pd.testing.assert_frame_equal(unify_testing_sigs(ex.given[0], ex.given[1]), ex.expected) + + with pytest.raises(AssertionError): + # Inputs have different numbers of rows. + unify_testing_sigs(positivity_df, positivity_df.iloc[0]) + + def test_add_max_ts_col(self): + input_df = pd.DataFrame({ + 'geo_id': ["ca", "ca", "fl", "fl"], + 'timestamp': [datetime(2021, 10, 27), datetime(2021, 10, 20)]*2, + 'val': [1, 2, 3, 4], + 'se': [None] * 4, + 'sample_size': [None] * 4, + 'publish_date': [datetime(2021, 10, 30)]*4, + }) + examples = [ + example(input_df, input_df.assign(is_max_group_ts = [True, False, True, False])), + ] + for ex in examples: + pd.testing.assert_frame_equal(add_max_ts_col(ex.given), ex.expected) + + with pytest.raises(AssertionError): + # Input df has 2 timestamps per geo id-publish date combination, but not 2 unique timestamps. + add_max_ts_col( + pd.DataFrame({ + 'geo_id': ["ca", "ca", "fl", "fl"], + 'timestamp': [datetime(2021, 10, 27)]*4, + 'val': [1, 2, 3, 4], + 'se': [None] * 4, + 'sample_size': [None] * 4, + 'publish_date': [datetime(2021, 10, 30)]*4, + }) + ) + with pytest.raises(AssertionError): + # Input df has fewer than 2 timestamps per geo id-publish date combination. 
+ add_max_ts_col( + pd.DataFrame({ + 'geo_id': ["ca", "fl"], + 'timestamp': [datetime(2021, 10, 27)]*2, + 'val': [1, 2], + 'se': [None] * 2, + 'sample_size': [None] * 2, + 'publish_date': [datetime(2021, 10, 30)]*2, + }) + ) + + def test_std_err(self): + df = pd.DataFrame({ + "val": [0, 0.5, 0.4, 0.3, 0.2, 0.1], + "sample_size": [2, 2, 5, 10, 20, 50] + }) + + expected_se = np.sqrt(df.val * (1 - df.val) / df.sample_size) + se = std_err(df) + + assert (se >= 0).all() + assert not np.isnan(se).any() + assert not np.isinf(se).any() + assert np.allclose(se, expected_se, equal_nan=True) + with pytest.raises(AssertionError): + std_err( + pd.DataFrame({ + "val": [0, 0.5, 0.4, 0.3, 0.2, 0.1], + "sample_size": [2, 2, 5, 10, 20, 0] + }) + ) diff --git a/facebook/qsf-tools/qsf-differ.R b/facebook/qsf-tools/qsf-differ.R index 33ad22c03..d9a595e1d 100644 --- a/facebook/qsf-tools/qsf-differ.R +++ b/facebook/qsf-tools/qsf-differ.R @@ -115,10 +115,10 @@ get_qsf_file <- function(path, #' @return The modified questions_list object safe_insert_question <- function(questions_list, question) { if ( !is.null(questions_list[[question$DataExportTag]]) ) { - old_qid <- questions_list[[question$DataExportTag]]$QuestionID + already_seen_qid <- questions_list[[question$DataExportTag]]$QuestionID new_qid <- question$QuestionID - stop(paste0("Multiple copies of item ", question$DataExportTag, " exist, ", old_qid, " and ", new_qid)) + stop(paste0("Multiple copies of item ", question$DataExportTag, " exist, ", already_seen_qid, " and ", new_qid)) } questions_list[[question$DataExportTag]] <- question @@ -139,8 +139,8 @@ diff_surveys <- function(old_qsf, new_qsf) { added <- setdiff(new_shown_items, old_shown_items) removed <- setdiff(old_shown_items, new_shown_items) - added_df <- create_diff_df(added, "Added", new_questions) - removed_df <- create_diff_df(removed, "Removed", old_questions) + added_df <- create_diff_df(added, "Added", NULL, new_questions) + removed_df <- create_diff_df(removed, "Removed", old_questions, NULL) ## For questions that appear in both surveys, check for changes in wording, ## display logic, and answer options. @@ -166,7 +166,9 @@ diff_surveys <- function(old_qsf, new_qsf) { #' survey #' @param new_qsf named list of trimmed output from `get_qsf_file` for newer #' survey -diff_question <- function(names, change_type=c("Choices", "QuestionText", "DisplayLogic", "Subquestions"), old_qsf, new_qsf) { +diff_question <- function(names, change_type=c("Choices", "QuestionText", + "DisplayLogic", "Subquestions"), + old_qsf, new_qsf) { change_type <- match.arg(change_type) changed <- c() @@ -175,7 +177,7 @@ diff_question <- function(names, change_type=c("Choices", "QuestionText", "Displ changed <- append(changed, question) } } - out <- create_diff_df(changed, change_type, new_qsf) + out <- create_diff_df(changed, change_type, old_qsf, new_qsf) return(out) } @@ -185,10 +187,14 @@ diff_question <- function(names, change_type=c("Choices", "QuestionText", "Displ #' @param questions character vector of Qualtrics question IDs for items that #' changed between survey versions #' @param change_type character; type of change to look for -#' @param reference_qsf named list of trimmed output from `get_qsf_file` for survey that -#' contains descriptive info about a particular type of change. For "removed" -#' questions, should be older survey, else newer survey. 
-create_diff_df <- function(questions, change_type=c("Added", "Removed", "Choices", "QuestionText", "DisplayLogic", "Subquestions"), reference_qsf) { +#' @param old_qsf named list of trimmed output from `get_qsf_file` for older +#' survey +#' @param new_qsf named list of trimmed output from `get_qsf_file` for newer +#' survey +create_diff_df <- function(questions, change_type=c("Added", "Removed", + "Choices", "QuestionText", + "DisplayLogic", "Subquestions"), + old_qsf, new_qsf) { out <- data.frame() if ( length(questions) > 0 ) { @@ -203,9 +209,24 @@ create_diff_df <- function(questions, change_type=c("Added", "Removed", "Choices Subquestions = "Matrix subquestions changed" ) questions <- sort(questions) - qids <- sapply(questions, function(question) { reference_qsf[[question]]$QuestionID }) + + if (!is.null(old_qsf)) { + old_qids <- sapply(questions, function(question) { old_qsf[[question]]$QuestionID }) + } else { + old_qids <- NA + } + if (!is.null(new_qsf)) { + new_qids <- sapply(questions, function(question) { new_qsf[[question]]$QuestionID }) + } else { + new_qids <- NA + } - out <- data.frame(change_type=change_descriptions[[change_type]], item=questions, qid=qids) + out <- data.frame( + change_type=change_descriptions[[change_type]], + item=questions, + old_qid=old_qids, + new_qid=new_qids + ) } return(out) diff --git a/facebook/qsf-tools/static/item_shortquestion_map.csv b/facebook/qsf-tools/static/item_shortquestion_map.csv index 0dd29d307..203efa612 100644 --- a/facebook/qsf-tools/static/item_shortquestion_map.csv +++ b/facebook/qsf-tools/static/item_shortquestion_map.csv @@ -114,7 +114,7 @@ H2,"public others masked 7d" H3,"friends fam vaccinated" I1,"believe stop masking" I2,"believe children immune" -I3,"belive small group" +I3,"believe small group" I4,"believe govt control" I5,"news sources 7d" I6,"trust COVID source"