diff --git a/_delphi_utils_python/delphi_utils/runner.py b/_delphi_utils_python/delphi_utils/runner.py index 01f090b6c..8547b2e5f 100644 --- a/_delphi_utils_python/delphi_utils/runner.py +++ b/_delphi_utils_python/delphi_utils/runner.py @@ -17,6 +17,12 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None], archiver_fn: Callable[[Params], Optional[ArchiveDiffer]] = NULL_FN): """Run an indicator with its optional validation and archiving. + Each argument to this function should itself be a function that will be passed a common set of + parameters (see details below). This parameter dictionary should have four subdictionaries + keyed as "indicator", "validation", "archive", and "common" corresponding to parameters to be + used in `indicator_fn`, `validator_fn`, `archiver_fn`, and shared across functions, + respectively. + Arguments --------- indicator_fn: Callable[[Params], None] @@ -43,7 +49,9 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None], parser.add_argument("indicator_name", type=str, help="Name of the Python package containing the indicator. This package " - "must export a `run_module(params)` function.") + "must export a `run.run_module(params)` function.") args = parser.parse_args() indicator_module = importlib.import_module(args.indicator_name) - run_indicator_pipeline(indicator_module.run_module, validator_from_params, archiver_from_params) + run_indicator_pipeline(indicator_module.run.run_module, + validator_from_params, + archiver_from_params) diff --git a/_delphi_utils_python/delphi_utils/validator/datafetcher.py b/_delphi_utils_python/delphi_utils/validator/datafetcher.py index 540becbda..8c60e6ef5 100644 --- a/_delphi_utils_python/delphi_utils/validator/datafetcher.py +++ b/_delphi_utils_python/delphi_utils/validator/datafetcher.py @@ -5,6 +5,7 @@ import threading from os import listdir from os.path import isfile, join +import warnings import pandas as pd import numpy as np @@ -114,9 +115,6 @@ def get_geo_signal_combos(data_source): geo_signal_combos = list(map(tuple, source_meta[["geo_type", "signal"]].to_records(index=False))) - print("Number of expected geo region-signal combinations:", - len(geo_signal_combos)) - return geo_signal_combos @@ -126,8 +124,10 @@ def fetch_api_reference(data_source, start_date, end_date, geo_type, signal_type Formatting is changed to match that of source data CSVs. """ - api_df = covidcast.signal( - data_source, signal_type, start_date, end_date, geo_type) + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + api_df = covidcast.signal( + data_source, signal_type, start_date, end_date, geo_type) if not isinstance(api_df, pd.DataFrame): custom_msg = "Error fetching data from " + str(start_date) + \ diff --git a/_delphi_utils_python/delphi_utils/validator/report.py b/_delphi_utils_python/delphi_utils/validator/report.py index 5ac26c3a9..7c34a3fac 100644 --- a/_delphi_utils_python/delphi_utils/validator/report.py +++ b/_delphi_utils_python/delphi_utils/validator/report.py @@ -90,13 +90,19 @@ def log(self): for warning in self.raised_warnings: logger.warning(str(warning)) - def print_and_exit(self): - """Print results and, if any unsuppressed exceptions were raised, exit with non-0 status.""" + def print_and_exit(self, die_on_failures=True): + """Print results and exit. + + Arguments + --------- + die_on_failures: bool + Whether to return non-zero status if any failures were encountered. 
+ """ print(self.summary()) self.log() if self.success(): sys.exit(0) - else: + elif die_on_failures: sys.exit(1) def success(self): diff --git a/_delphi_utils_python/delphi_utils/validator/run.py b/_delphi_utils_python/delphi_utils/validator/run.py index af1bcdc2e..e39fe2d44 100644 --- a/_delphi_utils_python/delphi_utils/validator/run.py +++ b/_delphi_utils_python/delphi_utils/validator/run.py @@ -2,16 +2,21 @@ """Functions to call when running the tool. This module should contain a function called `run_module`, that is executed -when the module is run with `python -m delphi_validator`. +when the module is run with `python -m delphi_utils.validator`. """ +import argparse as ap from .. import read_params from .validate import Validator def run_module(): """Run the validator as a module.""" + parser = ap.ArgumentParser() + parser.add_argument("--dry_run", action="store_true", help="When provided, return zero exit" + " status irrespective of the number of failures") + args = parser.parse_args() validator = Validator(read_params()) - validator.validate().print_and_exit() + validator.validate().print_and_exit(not args.dry_run) def validator_from_params(params): diff --git a/_template_python/delphi_NAME/__main__.py b/_template_python/delphi_NAME/__main__.py index bf03405fe..32fc0eecc 100644 --- a/_template_python/delphi_NAME/__main__.py +++ b/_template_python/delphi_NAME/__main__.py @@ -6,6 +6,7 @@ no need to change this template. """ +from delphi_utils import read_params from .run import run_module # pragma: no cover -run_module() # pragma: no cover +run_module(read_params()) # pragma: no cover diff --git a/_template_python/delphi_NAME/constants.py b/_template_python/delphi_NAME/constants.py index 94b3e32d3..b6b1fdfce 100644 --- a/_template_python/delphi_NAME/constants.py +++ b/_template_python/delphi_NAME/constants.py @@ -3,11 +3,11 @@ """ -## example: +## example: # FULL_TIME = "full_time_work_prop" # PART_TIME = "part_time_work_prop" # COVIDNET = "covidnet" -# +# # SIGNALS = [ # FULL_TIME, # PART_TIME, diff --git a/_template_python/delphi_NAME/run.py b/_template_python/delphi_NAME/run.py index c5342aba8..a73c48804 100644 --- a/_template_python/delphi_NAME/run.py +++ b/_template_python/delphi_NAME/run.py @@ -2,20 +2,34 @@ """Functions to call when running the function. This module should contain a function called `run_module`, that is executed -when the module is run with `python -m MODULE_NAME`. +when the module is run with `python -m MODULE_NAME`. `run_module`'s lone argument should be a +nested dictionary of parameters loaded from the params.json file. We expect the `params` to have +the following structure: + - "common": + - "export_dir": str, directory to which the results are exported + - "log_filename": (optional) str, path to log file + - "indicator": (optional) + - "wip_signal": (optional) Any[str, bool], list of signals that are works in progress, or + True if all signals in the registry are works in progress, or False if only + unpublished signals are. See `delphi_utils.add_prefix()` + - Any other indicator-specific settings """ -from delphi_utils import read_params -from .handle_wip_signal import add_prefix +from delphi_utils import add_prefix from .constants import SIGNALS -def run_module(): +def run_module(params): """ - Calls the method for handling the wip signals + Runs the indicator + + Arguments + -------- + params: Dict[str, Any] + Nested dictionary of parameters. 
+ Returns ------- prints the updated signal names """ - params = read_params() - wip_signal = params["wip_signal"] + wip_signal = params["indicator"]["wip_signal"] signal_names = add_prefix(SIGNALS, wip_signal, prefix="wip_") print(signal_names) diff --git a/facebook/delphiFacebook/R/binary.R b/facebook/delphiFacebook/R/binary.R index 2ccc6274a..2f4efb5d3 100644 --- a/facebook/delphiFacebook/R/binary.R +++ b/facebook/delphiFacebook/R/binary.R @@ -167,6 +167,12 @@ get_binary_indicators <- function() { "smoothed_wdontneed_reason_not_beneficial", "weight", "v_dontneed_reason_not_beneficial", 6, compute_binary_response, jeffreys_binary, "smoothed_dontneed_reason_other", "weight_unif", "v_dontneed_reason_other", 6, compute_binary_response, jeffreys_binary, "smoothed_wdontneed_reason_other", "weight", "v_dontneed_reason_other", 6, compute_binary_response, jeffreys_binary, + + # schooling + "smoothed_inperson_school_fulltime", "weight_unif", "s_inperson_school_fulltime", 6, compute_binary_response, jeffreys_binary, + "smoothed_winperson_school_fulltime", "weight", "s_inperson_school_fulltime", 6, compute_binary_response, jeffreys_binary, + "smoothed_inperson_school_parttime", "weight_unif", "s_inperson_school_parttime", 6, compute_binary_response, jeffreys_binary, + "smoothed_winperson_school_parttime", "weight", "s_inperson_school_parttime", 6, compute_binary_response, jeffreys_binary, ) @@ -210,22 +216,42 @@ compute_binary_response <- function(response, weight, sample_size) #' @return Updated data frame. #' @importFrom dplyr mutate jeffreys_binary <- function(df) { - return(mutate(df, - val = jeffreys_percentage(.data$val, .data$sample_size), - se = binary_se(.data$val, .data$sample_size))) + return( jeffreys_multinomial_factory(2)(df) ) +} + +#' Generate function that applies Jeffreys correction to multinomial estimates. +#' +#' @param k Number of groups. +#' +#' @return Function to apply multinomial Jeffreys correction. +#' @importFrom dplyr mutate +jeffreys_multinomial_factory <- function(k) { + # Apply a Jeffreys correction to multinomial estimates and their standard errors. + # + # Param df: Data frame + # Returns: Updated data frame. + jeffreys_multinomial <- function(df) { + return(mutate(df, + val = jeffreys_percentage(.data$val, .data$sample_size, k), + se = binary_se(.data$val, .data$sample_size))) + } + + return(jeffreys_multinomial) } -#' Adjust a percentage estimate to use the Jeffreys method. +#' Adjust a multinomial percentage estimate using the Jeffreys method. #' -#' Takes a previously estimated percentage (calculated with num_yes / total * +#' Takes a previously estimated percentage (calculated with num_group1 / total * #' 100) and replaces it with the Jeffreys version, where one pseudo-observation -#' with 50% yes is inserted. +#' with 1/k mass in each group is inserted. #' #' @param percentage Vector of percentages to adjust. #' @param sample_size Vector of corresponding sample sizes. +#' @param k Number of groups. +#' #' @return Vector of adjusted percentages. 
-jeffreys_percentage <- function(percentage, sample_size) { - return((percentage * sample_size + 50) / (sample_size + 1)) +jeffreys_percentage <- function(percentage, sample_size, k) { + return((percentage * sample_size + 100/k) / (sample_size + 1)) } #' Calculate the standard error for a binary proportion (as a percentage) diff --git a/facebook/delphiFacebook/R/contingency_aggregate.R b/facebook/delphiFacebook/R/contingency_aggregate.R index f27288f80..b5ab44434 100644 --- a/facebook/delphiFacebook/R/contingency_aggregate.R +++ b/facebook/delphiFacebook/R/contingency_aggregate.R @@ -254,11 +254,11 @@ summarize_aggs <- function(df, crosswalk_data, aggregations, geo_level, params) group_vars <- aggregations$group_by[[1]] - if ( !all(groupby_vars %in% names(df)) ) { + if ( !all(group_vars %in% names(df)) ) { msg_plain( sprintf( "not all of grouping columns %s available in data; skipping aggregation", - paste(groupby_vars, collapse=", ") + paste(group_vars, collapse=", ") )) return( list( )) } @@ -267,11 +267,11 @@ summarize_aggs <- function(df, crosswalk_data, aggregations, geo_level, params) # Keep rows with missing values initially so that we get the correct column # names. Explicitly drop groups with missing values in second step. unique_groups_counts <- as.data.frame( - table(df[, groupby_vars, with=FALSE], exclude=NULL, dnn=groupby_vars), + table(df[, group_vars, with=FALSE], exclude=NULL, dnn=group_vars), stringsAsFactors=FALSE ) unique_groups_counts <- unique_groups_counts[ - complete.cases(unique_groups_counts[, groupby_vars]), + complete.cases(unique_groups_counts[, group_vars]), ] # Drop groups with less than threshold sample size. @@ -283,7 +283,7 @@ summarize_aggs <- function(df, crosswalk_data, aggregations, geo_level, params) ## Convert col type in unique_groups to match that in data. # Filter on data.table in `calculate_group` requires that columns and filter # values are of the same type. 
-  for (col_var in groupby_vars) {
+  for (col_var in group_vars) {
     if ( class(df[[col_var]]) != class(unique_groups_counts[[col_var]]) ) {
       class(unique_groups_counts[[col_var]]) <- class(df[[col_var]])
     }
   }
@@ -295,10 +295,10 @@ summarize_aggs <- function(df, crosswalk_data, aggregations, geo_level, params)
   setindexv(df, group_vars)

   calculate_group <- function(ii) {
-    target_group <- unique_groups_counts[ii, groupby_vars, drop=FALSE]
+    target_group <- unique_groups_counts[ii, group_vars, drop=FALSE]
     # Use data.table's index to make this filter efficient
     out <- summarize_aggregations_group(
-      df[as.list(target_group), on=groupby_vars],
+      df[as.list(target_group), on=group_vars],
       aggregations,
       target_group,
       geo_level,
diff --git a/facebook/delphiFacebook/R/responses.R b/facebook/delphiFacebook/R/responses.R
index 74808a76c..ad3392368 100644
--- a/facebook/delphiFacebook/R/responses.R
+++ b/facebook/delphiFacebook/R/responses.R
@@ -159,6 +159,7 @@ load_response_one <- function(input_filename, params) {
   input_data <- code_testing(input_data)
   input_data <- code_activities(input_data)
   input_data <- code_vaccines(input_data)
+  input_data <- code_schooling(input_data)

   # create testing variables
diff --git a/facebook/delphiFacebook/R/variables.R b/facebook/delphiFacebook/R/variables.R
index 61f797c13..56f4717bb 100644
--- a/facebook/delphiFacebook/R/variables.R
+++ b/facebook/delphiFacebook/R/variables.R
@@ -471,3 +471,32 @@ code_vaccines <- function(input_data) {

   return(input_data)
 }
+
+#' Schooling
+#'
+#' @param input_data input data frame of raw survey data
+#' @return data frame augmented with `s_inperson_school_fulltime` and `s_inperson_school_parttime`
+code_schooling <- function(input_data) {
+  if ("E2_1" %in% names(input_data)) {
+    # Coded as 2 = "Yes", 3 = "No", 4 = "I don't know"
+    input_data$s_inperson_school_fulltime <- case_when(
+      input_data$E2_1 == 2 ~ 1,
+      input_data$E2_1 == 3 ~ 0,
+      TRUE ~ NA_real_
+    )
+  } else {
+    input_data$s_inperson_school_fulltime <- NA_real_
+  }
+
+  if ("E2_2" %in% names(input_data)) {
+    # Coded as 2 = "Yes", 3 = "No", 4 = "I don't know"
+    input_data$s_inperson_school_parttime <- case_when(
+      input_data$E2_2 == 2 ~ 1,
+      input_data$E2_2 == 3 ~ 0,
+      TRUE ~ NA_real_
+    )
+  } else {
+    input_data$s_inperson_school_parttime <- NA_real_
+  }
+  return(input_data)
+}
diff --git a/facebook/delphiFacebook/man/code_schooling.Rd b/facebook/delphiFacebook/man/code_schooling.Rd
new file mode 100644
index 000000000..7fca7de59
--- /dev/null
+++ b/facebook/delphiFacebook/man/code_schooling.Rd
@@ -0,0 +1,17 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/variables.R
+\name{code_schooling}
+\alias{code_schooling}
+\title{Schooling}
+\usage{
+code_schooling(input_data)
+}
+\arguments{
+\item{input_data}{input data frame of raw survey data}
+}
+\value{
+data frame augmented with `s_inperson_school_fulltime` and `s_inperson_school_parttime`
+}
+\description{
+Schooling
+}
diff --git a/facebook/delphiFacebook/man/jeffreys_multinomial_factory.Rd b/facebook/delphiFacebook/man/jeffreys_multinomial_factory.Rd
new file mode 100644
index 000000000..e58b38b68
--- /dev/null
+++ b/facebook/delphiFacebook/man/jeffreys_multinomial_factory.Rd
@@ -0,0 +1,17 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/binary.R
+\name{jeffreys_multinomial_factory}
+\alias{jeffreys_multinomial_factory}
+\title{Generate function that applies Jeffreys correction to multinomial estimates.}
+\usage{
+jeffreys_multinomial_factory(k)
+}
+\arguments{
+\item{k}{Number of groups.}
+}
+\value{
+Function to apply multinomial Jeffreys 
correction. +} +\description{ +Generate function that applies Jeffreys correction to multinomial estimates. +} diff --git a/facebook/delphiFacebook/man/jeffreys_percentage.Rd b/facebook/delphiFacebook/man/jeffreys_percentage.Rd index 711d98f4a..bd6556378 100644 --- a/facebook/delphiFacebook/man/jeffreys_percentage.Rd +++ b/facebook/delphiFacebook/man/jeffreys_percentage.Rd @@ -2,20 +2,22 @@ % Please edit documentation in R/binary.R \name{jeffreys_percentage} \alias{jeffreys_percentage} -\title{Adjust a percentage estimate to use the Jeffreys method.} +\title{Adjust a multinomial percentage estimate using the Jeffreys method.} \usage{ -jeffreys_percentage(percentage, sample_size) +jeffreys_percentage(percentage, sample_size, k) } \arguments{ \item{percentage}{Vector of percentages to adjust.} \item{sample_size}{Vector of corresponding sample sizes.} + +\item{k}{Number of groups.} } \value{ Vector of adjusted percentages. } \description{ -Takes a previously estimated percentage (calculated with num_yes / total * +Takes a previously estimated percentage (calculated with num_group1 / total * 100) and replaces it with the Jeffreys version, where one pseudo-observation -with 50% yes is inserted. +with 1/k mass in each group is inserted. } diff --git a/facebook/delphiFacebook/tests/testthat/test-binary.R b/facebook/delphiFacebook/tests/testthat/test-binary.R new file mode 100644 index 000000000..fec01631b --- /dev/null +++ b/facebook/delphiFacebook/tests/testthat/test-binary.R @@ -0,0 +1,17 @@ +context("Testing functions for calculating binary and multinomial proportions") + +test_that("testing jeffreys_binary command", { + input <- tibble(val=c(0), sample_size=1) + expected_output <- tibble(val=c(25), sample_size=1, se=binary_se(25, 1)) + + expect_equal(jeffreys_binary(input), expected_output) +}) + +test_that("testing jeffreys_multinomial command", { + jeffreys_multinomial <- jeffreys_multinomial_factory(4) + + input <- tibble(val=c(0), sample_size=3) + expected_output <- tibble(val=c(25/4), sample_size=3, se=binary_se(25/4, 3)) + + expect_equal(jeffreys_multinomial(input), expected_output) +}) diff --git a/google_symptoms/.pylintrc b/google_symptoms/.pylintrc index 24739e60e..f337ecf9c 100644 --- a/google_symptoms/.pylintrc +++ b/google_symptoms/.pylintrc @@ -5,4 +5,4 @@ min-public-methods=1 [MESSAGES CONTROL] -disable=R0801, C0330, E1101, E0611, C0114, C0116, C0103, R0913, R0914, W0702, W0707 +disable=R0801, E1101, E0611, C0114, C0116, C0103, R0913, R0914, W0702, W0707 diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index 2a4eb1fb2..c4d900a3b 100644 --- a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -94,19 +94,21 @@ def preprocess(df, level): return df -def get_date_range(export_start_date, num_export_days): +def get_date_range(export_start_date, export_end_date, num_export_days): """Produce date range to retrieve data for. - Calculate start of date range as a static offset from the end date - ("now"). Pad date range by an additional 7 days before the earliest - date to produce data for calculating smoothed estimates. + Calculate start of date range as a static offset from the end date. + Pad date range by an additional 7 days before the earliest date to + produce data for calculating smoothed estimates. 
Parameters ---------- export_start_date: date first date to retrieve data for + export_end_date: date + last date to retrieve data for num_export_days: int - number of days before end date ("now") to export + number of days before end date to export Returns ------- @@ -114,7 +116,6 @@ def get_date_range(export_start_date, num_export_days): """ PAD_DAYS = 7 - end_date = date.today() if num_export_days == "all": # Get all dates since export_start_date. start_date = export_start_date @@ -123,13 +124,13 @@ def get_date_range(export_start_date, num_export_days): # dates/datetimes to date to avoid error from trying to compare # different types. start_date = max( - end_date - timedelta(days=num_export_days), + export_end_date - timedelta(days=num_export_days), export_start_date.date() ) retrieve_dates = [ start_date - timedelta(days=PAD_DAYS - 1), - end_date] + export_end_date] return retrieve_dates @@ -254,7 +255,7 @@ def initialize_credentials(credentials): pandas_gbq.context.project = credentials.project_id -def pull_gs_data(credentials, export_start_date, num_export_days): +def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days): """Pull latest dataset for each geo level and combine. PS: No information for PR @@ -267,15 +268,18 @@ def pull_gs_data(credentials, export_start_date, num_export_days): "county" or "state" export_start_date: date first date to retrieve data for + export_end_date: date + last date to retrieve data for num_export_days: int - number of days before end date ("now") to export + number of days before end date to export Returns ------- dict: {"county": pd.DataFrame, "state": pd.DataFrame} """ # Fetch and format dates we want to attempt to retrieve - retrieve_dates = get_date_range(export_start_date, num_export_days) + retrieve_dates = get_date_range( + export_start_date, export_end_date, num_export_days) retrieve_dates = format_dates_for_query(retrieve_dates) initialize_credentials(credentials) diff --git a/google_symptoms/delphi_google_symptoms/run.py b/google_symptoms/delphi_google_symptoms/run.py index 2ef71c30b..76aa34223 100644 --- a/google_symptoms/delphi_google_symptoms/run.py +++ b/google_symptoms/delphi_google_symptoms/run.py @@ -5,7 +5,7 @@ when the module is run with `python -m delphi_google_symptoms`. """ import time -from datetime import datetime +from datetime import datetime, date from itertools import product import numpy as np @@ -45,6 +45,12 @@ def run_module(params): export_start_date = datetime.strptime( params["indicator"]["export_start_date"], "%Y-%m-%d") + # If end_date not specified, use current date. 
+    export_end_date = datetime.strptime(
+        params["indicator"].get(
+            "export_end_date", datetime.strftime(date.today(), "%Y-%m-%d")
+        ), "%Y-%m-%d").date()
+
     export_dir = params["common"]["export_dir"]
     num_export_days = params["indicator"].get("num_export_days", "all")

@@ -55,6 +61,7 @@ def run_module(params):
     # Pull GS data
     dfs = pull_gs_data(params["indicator"]["bigquery_credentials"],
                        export_start_date,
+                       export_end_date,
                        num_export_days)

     gmpr = geomap.GeoMapper()
diff --git a/google_symptoms/tests/test_pull.py b/google_symptoms/tests/test_pull.py
index e53a1a481..4f81ad9e2 100644
--- a/google_symptoms/tests/test_pull.py
+++ b/google_symptoms/tests/test_pull.py
@@ -48,7 +48,7 @@ def test_good_file(self, mock_credentials, mock_read_gbq):
         mock_credentials.return_value = None

         dfs = pull_gs_data(
-            "", datetime.strptime("20201230", "%Y%m%d"), 0)
+            "", datetime.strptime("20201230", "%Y%m%d"), date.today(), 0)

         for level in ["county", "state"]:
             df = dfs[level]
@@ -85,6 +85,7 @@ class TestPullHelperFuncs:
     def test_get_date_range_recent_export_start_date(self):
         output = get_date_range(
             datetime.strptime("20201230", "%Y%m%d"),
+            date.today(),
             14
         )

@@ -96,6 +97,7 @@ def test_get_date_range(self):
         output = get_date_range(
             datetime.strptime("20200201", "%Y%m%d"),
+            date.today(),
             14
         )

diff --git a/hhs_hosp/Makefile b/hhs_hosp/Makefile
index d4cceb694..e0aca95c0 100644
--- a/hhs_hosp/Makefile
+++ b/hhs_hosp/Makefile
@@ -25,5 +25,5 @@ clean:
 run:
 	env/bin/python -m $(dir)
-	env/bin/python -m delphi_utils.validator
+	env/bin/python -m delphi_utils.validator --dry_run
 	env/bin/python -m delphi_utils.archive
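For reference, a minimal sketch of the nested parameter structure that `run_indicator_pipeline` and the updated template now document. Every value below is a made-up placeholder (the diff only prescribes the four subdictionary keys), and `delphi_NAME` is the template's stand-in package name:

```python
from delphi_NAME.run import run_module  # template package; renamed per indicator

# Hypothetical params.json contents: four subdictionaries keyed
# "common", "indicator", "validation", and "archive".
params = {
    "common": {
        "export_dir": "./receiving",      # where results are exported
        "log_filename": "indicator.log",  # optional
    },
    "indicator": {
        "wip_signal": True,  # or a list of signal names, or False
    },
    "validation": {},  # settings consumed by the validator
    "archive": {},     # settings consumed by the archiver
}

run_module(params)  # what __main__.py now does via read_params()
```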
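The multinomial Jeffreys correction in `binary.R` generalizes the old binary formula `(p * n + 50) / (n + 1)`: the single pseudo-observation now carries `1/k` of its mass (that is, `100/k` percentage points) in each group. A quick Python restatement of the arithmetic — not the shipped R code — confirms the expectations in the new `test-binary.R`:

```python
def jeffreys_percentage(percentage, sample_size, k):
    """Add one pseudo-observation with 1/k mass per group (percentage scale)."""
    return (percentage * sample_size + 100 / k) / (sample_size + 1)

# k = 2 reproduces the previous binary correction: (0 * 1 + 50) / 2 == 25
assert jeffreys_percentage(0, 1, 2) == 25.0
# k = 4 matches the multinomial test case: (0 * 3 + 25) / 4 == 25 / 4
assert jeffreys_percentage(0, 3, 4) == 25 / 4
```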
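Similarly, the reworked `get_date_range` anchors its window at the supplied `export_end_date` rather than `date.today()`, then pads a week of history for the smoothed estimates. A standalone rehearsal of that date arithmetic, with hypothetical dates rather than the module itself:

```python
from datetime import date, timedelta

PAD_DAYS = 7
export_start_date = date(2020, 2, 1)
export_end_date = date(2021, 1, 15)  # would default to date.today()
num_export_days = 14

# Window start: num_export_days before the end date, clipped at the start date.
start_date = max(export_end_date - timedelta(days=num_export_days),
                 export_start_date)
# Pad so the window covers 7 days (inclusive) before the first export date.
retrieve_dates = [start_date - timedelta(days=PAD_DAYS - 1), export_end_date]
assert retrieve_dates == [date(2020, 12, 26), date(2021, 1, 15)]
```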