diff --git a/.gitignore b/.gitignore index 1b1a8273..50352f70 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ tmp/ extras/**.html *.pdf +.Renviron \ No newline at end of file diff --git a/DESCRIPTION b/DESCRIPTION index 50721377..f22b374c 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -27,6 +27,7 @@ Imports: purrr, recipes (>= 1.0.4), rlang, + targets, tibble, tidyr Suggests: diff --git a/NAMESPACE b/NAMESPACE index 7d8bec64..80f027ab 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -4,8 +4,9 @@ export(absolute_error) export(add_id) export(arx_postprocess) export(arx_preprocess) +export(clear_lastminute_nas) export(collapse_cards) -export(confirm_insufficient_data) +export(confirm_sufficient_data) export(covidhub_probs) export(evaluate_predictions) export(extend_ahead) @@ -15,6 +16,12 @@ export(format_storage) export(id_ahead_ensemble_grid) export(interval_coverage) export(lookup_ids) +export(make_data_targets) +export(make_ensemble_targets) +export(make_external_names_and_scores) +export(make_forecasts_and_scores) +export(make_forecasts_and_scores_by_ahead) +export(make_shared_grids) export(make_target_param_grid) export(manage_S3_forecast_cache) export(overprediction) @@ -71,6 +78,7 @@ importFrom(epipredict,step_epi_lag) importFrom(epipredict,step_epi_naomit) importFrom(epipredict,step_population_scaling) importFrom(epipredict,step_training_window) +importFrom(epiprocess,as_epi_df) importFrom(epiprocess,epix_slide) importFrom(here,here) importFrom(magrittr,"%<>%") @@ -84,7 +92,10 @@ importFrom(rlang,.data) importFrom(rlang,quo) importFrom(rlang,sym) importFrom(rlang,syms) +importFrom(targets,tar_group) +importFrom(targets,tar_target) importFrom(tibble,tibble) +importFrom(tidyr,drop_na) importFrom(tidyr,expand_grid) importFrom(tidyr,pivot_wider) importFrom(tidyr,unnest) diff --git a/R/forecaster.R b/R/forecaster.R index 79f204ff..aff4d0fd 100644 --- a/R/forecaster.R +++ b/R/forecaster.R @@ -43,29 +43,39 @@ perform_sanity_checks <- function(epi_data, #' confirm that there's enough data to run this model #' @description #' epipredict is a little bit fragile about having enough data to train; we want -#' to be able to return a null result rather than error out; this check say to -#' return a null +#' to be able to return a null result rather than error out. #' @param epi_data the input data -#' @param buffer how many training data to insist on having (e.g. if `buffer=1`, -#' this trains on one sample; the default is set so that `linear_reg` isn't -#' rank deficient) #' @param ahead the effective ahead; may be infinite if there isn't enough data. #' @param args_input the input as supplied to `forecaster_pred`; lags is the #' important argument, which may or may not be defined, with the default #' coming from `arx_args_list` +#' @param buffer how many training data to insist on having (e.g. if `buffer=1`, +#' this trains on one sample; the default is set so that `linear_reg` isn't +#' rank deficient) +#' @importFrom tidyr drop_na #' @export -confirm_insufficient_data <- function(epi_data, ahead, args_input, buffer = 9) { +confirm_sufficient_data <- function(epi_data, ahead, args_input, buffer = 9) { if (!is.null(args_input$lags)) { lag_max <- max(args_input$lags) } else { lag_max <- 14 # default value of 2 weeks } + + # TODO: Buffer should probably be 2 * n(lags) * n(predictors). But honestly, + # this needs to be fixed in epipredict itself, see + # https://github.com/cmu-delphi/epipredict/issues/106. + return( - is.infinite(ahead) || - as.integer(max(epi_data$time_value) - min(epi_data$time_value)) <= - lag_max + ahead + buffer + !is.infinite(ahead) && + epi_data %>% + drop_na() %>% + group_by(geo_value) %>% + summarise(has_enough_data = n_distinct(time_value) >= lag_max + ahead + buffer) %>% + pull(has_enough_data) %>% + any() ) } + # TODO replace with `step_arx_forecaster` #' add the default steps for arx_forecaster #' @description @@ -149,7 +159,11 @@ run_workflow_and_format <- function(preproc, postproc, trainer, epi_data) { latest <- get_test_data(recipe = preproc, x = epi_data) pred <- predict(workflow, latest) # the forecast_date may currently be the max time_value - true_forecast_date <- attributes(epi_data)$metadata$as_of + as_of <- attributes(epi_data)$metadata$as_of + if (is.null(as_of)) { + as_of <- max(epi_data$time_value) + } + true_forecast_date <- as_of return(format_storage(pred, true_forecast_date)) } @@ -176,6 +190,8 @@ run_workflow_and_format <- function(preproc, postproc, trainer, epi_data) { #' contain `ahead` #' @param forecaster_args_names a bit of a hack around targets, it contains #' the names of the `forecaster_args`. +#' @param date_range_step_size the step size (in days) to use when generating +#' the forecast dates. #' @importFrom epiprocess epix_slide #' @importFrom cli cli_abort #' @importFrom rlang !! @@ -187,7 +203,8 @@ forecaster_pred <- function(data, slide_training = 0, n_training_pad = 5, forecaster_args = list(), - forecaster_args_names = list()) { + forecaster_args_names = list(), + date_range_step_size = 1L) { archive <- data if (length(forecaster_args) > 0) { names(forecaster_args) <- forecaster_args_names @@ -210,25 +227,47 @@ forecaster_pred <- function(data, # restrict the dataset to areas where training is possible start_date <- min(archive$DT$time_value) + net_slide_training end_date <- max(archive$DT$time_value) - forecaster_args$ahead - valid_predict_dates <- seq.Date(from = start_date, to = end_date, by = 1) + valid_predict_dates <- seq.Date(from = start_date, to = end_date, by = date_range_step_size) # first generate the forecasts before <- n_training + n_training_pad - 1 - ## TODO epix_slide doesn't support infinite `before` + ## TODO: epix_slide doesn't support infinite `before` ## https://github.com/cmu-delphi/epiprocess/issues/219 if (before == Inf) before <- 365L * 10000 res <- epix_slide(archive, function(data, gk, rtv, ...) { - do.call( - forecaster, - append( - list( - epi_data = data, - outcome = outcome, - extra_sources = extra_sources - ), - forecaster_args - ) + # TODO: Can we get rid of this tryCatch and instead hook it up to targets + # error handling or something else? + # https://github.com/cmu-delphi/exploration-tooling/issues/41 + tryCatch( + { + do.call( + forecaster, + append( + list( + epi_data = data, + outcome = outcome, + extra_sources = extra_sources + ), + forecaster_args + ) + ) + }, + error = function(e) { + if (interactive()) { + browser() + } else { + dump_vars <- list( + data = data, + rtv = rtv, + forecaster = forecaster, + forecaster_args = forecaster_args, + e = e + ) + saveRDS(dump_vars, "forecaster_pred_error.rds") + e + } + } ) }, before = before, diff --git a/R/forecaster_flatline.R b/R/forecaster_flatline.R index 0ff56164..ab0c5075 100644 --- a/R/forecaster_flatline.R +++ b/R/forecaster_flatline.R @@ -14,6 +14,8 @@ flatline_fc <- function(epi_data, quantile_levels = covidhub_probs(), ...) { # perform any preprocessing not supported by epipredict + # this is a temp fix until a real fix gets put into epipredict + epi_data <- clear_lastminute_nas(epi_data) # one that every forecaster will need to handle: how to manage max(time_value) # that's older than the `as_of` date epidataAhead <- extend_ahead(epi_data, ahead) @@ -23,7 +25,7 @@ flatline_fc <- function(epi_data, effective_ahead <- epidataAhead[[2]] args_input <- list(...) # edge case where there is no data or less data than the lags; eventually epipredict will handle this - if (confirm_insufficient_data(epi_data, effective_ahead, args_input)) { + if (!confirm_sufficient_data(epi_data, effective_ahead, args_input)) { null_result <- tibble( geo_value = character(), forecast_date = lubridate::Date(), @@ -48,6 +50,9 @@ flatline_fc <- function(epi_data, # since this is just the flatline, we don't need much of anything res <- flatline_forecaster(epi_data, outcome = outcome, args_list = args_list) true_forecast_date <- attributes(epi_data)$metadata$as_of + if (is.null(true_forecast_date)) { + true_forecast_date <- max(epi_data$time_value) + } pred <- format_storage(res$predictions, true_forecast_date) # (geo_value, forecast_date, target_end_date, quantile, value) # finally, any postprocessing not supported by epipredict e.g. calibration diff --git a/R/forecaster_scaled_pop.R b/R/forecaster_scaled_pop.R index 0086df7f..6db0b8b0 100644 --- a/R/forecaster_scaled_pop.R +++ b/R/forecaster_scaled_pop.R @@ -49,6 +49,8 @@ scaled_pop <- function(epi_data, quantile_levels = covidhub_probs(), ...) { # perform any preprocessing not supported by epipredict + # this is a temp fix until a real fix gets put into epipredict + epi_data <- clear_lastminute_nas(epi_data) # one that every forecaster will need to handle: how to manage max(time_value) # that's older than the `as_of` date epidataAhead <- extend_ahead(epi_data, ahead) @@ -58,7 +60,7 @@ scaled_pop <- function(epi_data, effective_ahead <- epidataAhead[[2]] args_input <- list(...) # edge case where there is no data or less data than the lags; eventually epipredict will handle this - if (confirm_insufficient_data(epi_data, effective_ahead, args_input)) { + if (!confirm_sufficient_data(epi_data, effective_ahead, args_input)) { null_result <- tibble( geo_value = character(), forecast_date = lubridate::Date(), @@ -73,6 +75,7 @@ scaled_pop <- function(epi_data, args_list <- do.call(arx_args_list, args_input) # if you want to ignore extra_sources, setting predictors is the way to do it predictors <- c(outcome, extra_sources) + # TODO: Partial match quantile_level coming from here (on Dmitry's machine) argsPredictorsTrainer <- perform_sanity_checks(epi_data, outcome, predictors, trainer, args_list) args_list <- argsPredictorsTrainer[[1]] predictors <- argsPredictorsTrainer[[2]] @@ -98,7 +101,6 @@ scaled_pop <- function(epi_data, # postprocessing supported by epipredict postproc <- frosting() postproc %<>% arx_postprocess(trainer, args_list) - postproc if (pop_scaling) { postproc %<>% layer_population_scaling( .pred, .pred_distn, diff --git a/R/latency_adjusting.R b/R/latency_adjusting.R index 25642f09..b9bd97d6 100644 --- a/R/latency_adjusting.R +++ b/R/latency_adjusting.R @@ -11,10 +11,13 @@ extend_ahead <- function(epi_data, ahead) { time_values <- epi_data$time_value if (length(time_values) > 0) { + as_of <- attributes(epi_data)$metadata$as_of + max_time <- max(time_values) + if (is.null(as_of)) { + as_of <- max_time + } effective_ahead <- as.integer( - as.Date(attributes(epi_data)$metadata$as_of) - - max(time_values) + - ahead + as.Date(as_of) - max_time + ahead ) } else { effective_ahead <- Inf diff --git a/R/targets_utils.R b/R/targets_utils.R index e4d8d371..5f0230c2 100644 --- a/R/targets_utils.R +++ b/R/targets_utils.R @@ -6,8 +6,10 @@ #' @export #' @importFrom rlang syms make_target_param_grid <- function(param_grid) { - param_grid %<>% mutate(forecaster = syms(forecaster)) - param_grid %<>% mutate(trainer = syms(trainer)) + param_grid %<>% + select(-any_of("parent_id")) %>% + mutate(forecaster = syms(forecaster)) %>% + mutate(trainer = syms(trainer)) list_of_params <- lists_of_real_values(param_grid) list_names <- map(list_of_params, names) tibble( @@ -23,7 +25,282 @@ make_target_param_grid <- function(param_grid) { lists_of_real_values <- function(param_grid) { full_lists <- transpose(param_grid %>% select(-forecaster, -id)) filter_nonvalues <- function(x) { - Filter(Negate(function(a) is.null(a) || is.na(a)), x) + Filter(function(a) !all(is.null(a)) && !all(is.na(a)), x) } map(full_lists, filter_nonvalues) } + +#' Make common targets for fetching data +#' +#' Relies on the following globals: +#' - `hhs_signal` +#' - `chng_signal` +#' - `fetch_args` +#' +#' @export +make_data_targets <- function() { + list( + tar_target( + name = hhs_latest_data, + command = { + epidatr::pub_covidcast( + source = "hhs", + signals = hhs_signal, + geo_type = "state", + time_type = "day", + geo_values = "*", + time_values = epirange(from = "2020-01-01", to = "2024-01-01"), + fetch_args = fetch_args + ) + } + ), + tar_target( + name = chng_latest_data, + command = { + epidatr::pub_covidcast( + source = "chng", + signals = chng_signal, + geo_type = "state", + time_type = "day", + geo_values = "*", + time_values = epirange(from = "2020-01-01", to = "2024-01-01"), + fetch_args = fetch_args + ) + } + ), + tar_target( + name = hhs_evaluation_data, + command = { + hhs_latest_data %>% + rename( + actual = value, + target_end_date = time_value + ) + } + ), + tar_target( + name = hhs_latest_data_2022, + command = { + hhs_latest_data # %>% filter(time_value >= "2022-01-01", time_value < "2022-04-01") + } + ), + tar_target( + name = chng_latest_data_2022, + command = { + chng_latest_data # %>% filter(time_value >= "2022-01-01", time_value < "2022-04-01") + } + ), + tar_target( + name = hhs_archive_data_2022, + command = { + epidatr::pub_covidcast( + source = "hhs", + signals = hhs_signal, + geo_type = "state", + time_type = "day", + geo_values = "*", + time_values = epirange(from = "20220101", to = "20220401"), + issues = "*", + fetch_args = fetch_args + ) + } + ), + tar_target( + name = chng_archive_data_2022, + command = { + epidatr::pub_covidcast( + source = "chng", + signals = chng_signal, + geo_type = "state", + time_type = "day", + geo_values = "*", + time_values = epirange(from = "20220101", to = "20220401"), + issues = "*", + fetch_args = fetch_args + ) + } + ), + tar_target( + name = joined_archive_data_2022, + command = { + hhs_archive_data_2022 %<>% + select(geo_value, time_value, value, issue) %>% + rename("hhs" := value) %>% + rename(version = issue) %>% + as_epi_archive( + geo_type = "state", + time_type = "day", + compactify = TRUE + ) + chng_archive_data_2022 %<>% + select(geo_value, time_value, value, issue) %>% + rename("chng" := value) %>% + rename(version = issue) %>% + as_epi_archive( + geo_type = "state", + time_type = "day", + compactify = TRUE + ) + epix_merge(hhs_archive_data_2022, chng_archive_data_2022, sync = "locf")$DT %>% + drop_na() %>% + filter(!geo_value %in% c("as", "pr", "vi", "gu", "mp")) %>% + epiprocess::as_epi_archive() + } + ) + ) +} + +#' Make common targets for forecasting experiments +#' @export +make_shared_grids <- function() { + list( + tidyr::expand_grid( + forecaster = "scaled_pop", + trainer = c("linreg", "quantreg"), + ahead = 1:4, + pop_scaling = c(FALSE) + ), + tidyr::expand_grid( + forecaster = "scaled_pop", + trainer = c("linreg", "quantreg"), + ahead = 5:7, + lags = list(c(0, 3, 5, 7, 14), c(0, 7, 14)), + pop_scaling = c(FALSE) + ) + ) +} + +#' Make forecasts and scores by ahead targets +#' @description +#' globals this depends on: +#' Relies on the following globals: +#' - `date_step` +#' @export +make_forecasts_and_scores_by_ahead <- function() { + tar_map( + values = targets_param_grid, + names = id, + unlist = FALSE, + tar_target_raw( + name = ONE_AHEAD_FORECAST_NAME, + command = expression( + forecaster_pred( + data = joined_archive_data_2022, + outcome = "hhs", + extra_sources = "", + forecaster = forecaster, + n_training_pad = 30L, + forecaster_args = params, + forecaster_args_names = param_names, + date_range_step_size = date_step + ) + ) + ), + tar_target_raw( + name = ONE_AHEAD_SCORE_NAME, + command = expression( + run_evaluation_measure( + data = forecast_by_ahead, + evaluation_data = hhs_evaluation_data, + measure = list( + wis = weighted_interval_score, + ae = absolute_error, + cov_80 = interval_coverage(0.8) + ) + ) + ) + ) + ) +} + +#' Make forecasts and scores targets +#' @export +make_forecasts_and_scores <- function() { + tar_map( + values = forecaster_parent_id_map, + names = parent_id, + tar_target( + name = forecast, + command = { + bind_rows(forecast_component_ids) %>% + mutate(parent_forecaster = parent_id) + } + ), + tar_target( + name = score, + command = { + bind_rows(score_component_ids) %>% + mutate(parent_forecaster = parent_id) + } + ) + ) +} + +#' Make ensemble targets +#' @export +make_ensemble_targets <- function() { + list() +} + + +#' Make external names and scores targets +#' @importFrom targets tar_target tar_group +#' @export +make_external_names_and_scores <- function() { + external_scores_path <- Sys.getenv("EXTERNAL_SCORES_PATH", "") + if (external_scores_path != "") { + external_names_and_scores <- list( + tar_target( + name = external_scores_df, + command = { + readRDS(external_scores_path) %>% + group_by(forecaster) %>% + tar_group() + }, + iteration = "group", + garbage_collection = TRUE + ), + tar_target( + name = external_names, + command = { + external_scores_df %>% + group_by(forecaster) %>% + group_keys() %>% + pull(forecaster) + }, + garbage_collection = TRUE + ), + tar_target( + name = external_scores, + pattern = map(external_scores_df), + command = { + external_scores_df + }, + # This step causes the pipeline to exit with an error, apparently due to + # running out of memory. Run this in series on a non-parallel `crew` + # controller to avoid. + # https://books.ropensci.org/targets/crew.html#heterogeneous-workers + resources = tar_resources( + crew = tar_resources_crew(controller = "serial_controller") + ), + memory = "transient", + garbage_collection = TRUE + ) + ) + } else { + external_names_and_scores <- list( + tar_target( + name = external_names, + command = { + c() + } + ), + tar_target( + name = external_scores, + command = { + data.frame() + } + ) + ) + } +} diff --git a/R/small_utils.R b/R/utils.R similarity index 84% rename from R/small_utils.R rename to R/utils.R index d8d41bf2..e0e81355 100644 --- a/R/small_utils.R +++ b/R/utils.R @@ -84,3 +84,20 @@ id_ahead_ensemble_grid <- function(ensemble_grid, aheads, n_adj = 2) { mutate(forecaster_ids = list(map2_vec(forecasters, ahead, single_id, n_adj = 2))) return(ensemble_grid) } + + +#' temporary patch that pulls `NA`'s out of an epi_df +#' @description +#' just delete rows that have NA's in them. eventually epipredict should directly handle this so we don't have to +#' @param epi_data the epi_df to be fixed +#' @importFrom tidyr drop_na +#' @importFrom epiprocess as_epi_df +#' @export +clear_lastminute_nas <- function(epi_data) { + meta_data <- attr(epi_data, "metadata") + epi_data %<>% + drop_na() %>% + as_epi_df() + attr(epi_data, "metadata") <- meta_data + return(epi_data) +} diff --git a/_targets.yaml b/_targets.yaml index 394a91b0..0d733f62 100644 --- a/_targets.yaml +++ b/_targets.yaml @@ -1,16 +1,16 @@ covid_hosp_explore: - script: covid_hosp_explore.R - store: covid_hosp_explore - use_crew: yes + script: covid_hosp_explore.R + store: covid_hosp_explore + use_crew: no flu_hosp_explore: - script: flu_hosp_explore.R - store: flu_hosp_explore - use_crew: yes + script: flu_hosp_explore.R + store: flu_hosp_explore + use_crew: no flu_hosp_prod: - script: flu_hosp_prod.R - store: flu_hosp_prod - use_crew: yes + script: flu_hosp_prod.R + store: flu_hosp_prod + use_crew: no covid_hosp_prod: - script: covid_hosp_prod.R - store: covid_hosp_prod - use_crew: yes + script: covid_hosp_prod.R + store: covid_hosp_prod + use_crew: no diff --git a/covid_hosp_explore.R b/covid_hosp_explore.R index 93a6641d..bd35d06f 100644 --- a/covid_hosp_explore.R +++ b/covid_hosp_explore.R @@ -1,229 +1,78 @@ -suppressPackageStartupMessages({ - library(targets) - library(tarchetypes) # Load other packages as needed. - - library(crew) - library(dplyr) - library(epipredict) - library(epieval) - library(lubridate) - library(parsnip) - library(purrr) - library(tibble) - library(tidyr) - library(rlang) - library(epidatr) -}) - -# The external scores processing causes the pipeline to exit with an error, -# apparently due to running out of memory. Set up a non-parallel `crew` -# controller to avoid. -# https://books.ropensci.org/targets/crew.html#heterogeneous-workers -main_controller <- crew_controller_local( - name = "main_controller", - workers = parallel::detectCores() - 5 -) -serial_controller <- crew_controller_local( - name = "serial_controller", - workers = 1L -) - -tar_option_set( - packages = c( - "assertthat", - "dplyr", - "epieval", - "epipredict", - "ggplot2", - "lubridate", - "parsnip", - "tibble", - "tidyr", - "epidatr" - ), # packages that your targets need to run - imports = c("epieval", "parsnip"), - format = "qs", # Optionally set the default storage format. qs is fast. - controller = crew_controller_group(main_controller, serial_controller), - # Set default crew controller. - # https://books.ropensci.org/targets/crew.html#heterogeneous-workers - resources = tar_resources( - crew = tar_resources_crew(controller = "main_controller") - ) -) - # Run the R scripts in the R/ folder with your custom functions: # tar_source() # where the forecasters and parameters are joined; see either the variable param_grid or `tar_read(forecasters)` -source("covid_hosp_explore/forecaster_instantiation.R") -source("covid_hosp_explore/data_targets.R") -# Auto-generated in run.R -source("covid_hosp_explore/dynamic_constants.R") +source("extras/targets-common.R") -forecasts_and_scores_by_ahead <- tar_map( - values = forecaster_param_grids, - names = id, - unlist = FALSE, - tar_target_raw( - name = ONE_AHEAD_FORECAST_NAME, - command = expression( - forecaster_pred( - data = joined_archive_data_2022, - outcome = "hhs", - extra_sources = "", - forecaster = forecaster, - n_training_pad = 30L, - forecaster_args = params, - forecaster_args_names = param_names - ) - ) - ), - tar_target_raw( - name = ONE_AHEAD_SCORE_NAME, - command = expression( - run_evaluation_measure( - data = forecast_by_ahead, - evaluation_data = hhs_evaluation_data, - measure = list( - wis = weighted_interval_score, - ae = absolute_error, - cov_80 = interval_coverage(0.8) - ) - ) +# Add custom parameter combinations in the list below. +make_unique_grids <- function() { + list( + tidyr::expand_grid( + forecaster = "scaled_pop", + trainer = c("linreg", "quantreg"), + ahead = 1:4, + pop_scaling = c(TRUE) + ), + tidyr::expand_grid( + forecaster = "scaled_pop", + trainer = c("linreg", "quantreg"), + ahead = 5:7, + lags = list(c(0, 3, 5, 7, 14), c(0, 7, 14)), + pop_scaling = c(TRUE) ) ) -) +} -forecasts_and_scores <- tar_map( - values = forecaster_parent_id_map, - names = parent_id, - tar_target( - name = forecast, - command = { - bind_rows(forecast_component_ids) %>% - mutate(parent_forecaster = parent_id) - } - ), - tar_target( - name = score, - command = { - bind_rows(score_component_ids) %>% - mutate(parent_forecaster = parent_id) - } +# TODO: Find a way to clean all this stuff about param grids up. +param_grid <- append( + make_shared_grids(), + make_unique_grids() +) %>% + map(add_id) %>% + bind_rows() %>% + relocate(parent_id, id, .after = last_col()) +forecaster_parent_id_map <- param_grid %>% + group_by(parent_id) %>% + summarize( + forecast_component_ids = list(syms(paste0(ONE_AHEAD_FORECAST_NAME, "_", gsub(" ", ".", id, fixed = TRUE)))), + score_component_ids = list(syms(paste0(ONE_AHEAD_SCORE_NAME, "_", gsub(" ", ".", id, fixed = TRUE)))) ) -) - -ensemble_keys <- list(a = c(300, 15)) -ensemble_targets <- tar_target( - name = ensembles, - command = { - ensemble_keys - } -) +targets_param_grid <- make_target_param_grid(param_grid) %>% + ## TODO This forecaster is hanging. Filter it out for now. + filter(id != "necessary endless 5") -# The combine approach below is taken from the manual: -# https://books.ropensci.org/targets/static.html#combine -# The key is that the map above has unlist = FALSE. -ensemble_forecast <- tar_map( - values = ensemble_keys, - tar_combine( - name = ensemble_forecast, - # TODO: Needs a lookup table to select the right forecasters - list( - forecasts_and_scores_by_ahead[["forecast_by_ahead"]][[1]], - forecasts_and_scores_by_ahead[["forecast_by_ahead"]][[2]] - ), - command = { - bind_rows(!!!.x, .id = "forecaster") %>% - pivot_wider( - names_prefix = "forecaster", - names_from = forecaster, - values_from = value - ) %>% - mutate( - value = a + rowMeans(across(starts_with("forecaster"))) - ) %>% - select(-starts_with("forecaster")) - } - ), +# not actually used downstream, this is for lookup during plotting and human evaluation +forecaster_params_grid_target <- list( tar_target( - name = ensemble_score, + name = forecaster_params_grid, command = { - run_evaluation_measure( - data = ensemble_forecast, - evaluation_data = hhs_evaluation_data, - measures = list( - wis = weighted_interval_score, - ae = absolute_error, - cov_80 = interval_coverage(0.8) - ) - ) + param_grid } ) ) -if (LOAD_EXTERNAL_SCORES) { - external_names_and_scores <- list( - tar_target( - name = external_scores_df, - command = { - readRDS(external_scores_path) %>% - group_by(forecaster) %>% - tar_group() - }, - iteration = "group", - garbage_collection = TRUE - ), - tar_target( - name = external_names, - command = { - external_scores_df %>% - group_by(forecaster) %>% - group_keys() %>% - pull(forecaster) - }, - garbage_collection = TRUE - ), - tar_target( - name = external_scores, - pattern = map(external_scores_df), - command = { - external_scores_df - }, - # This step causes the pipeline to exit with an error, apparently due to - # running out of memory. Run this in series on a non-parallel `crew` - # controller to avoid. - # https://books.ropensci.org/targets/crew.html#heterogeneous-workers - resources = tar_resources( - crew = tar_resources_crew(controller = "serial_controller") - ), - memory = "transient", - garbage_collection = TRUE - ) - ) -} else { - external_names_and_scores <- list( - tar_target( - name = external_names, - command = { - c() - } - ), - tar_target( - name = external_scores, - command = { - data.frame() - } - ) - ) -} + +# These globals are needed by the function below (and they need to persist +# during the actual targets run, since the commands are frozen as expressions). +hhs_signal <- "confirmed_admissions_covid_1d" +chng_signal <- "smoothed_adj_outpatient_covid" +fetch_args <- epidatr::fetch_args_list(return_empty = TRUE, timeout_seconds = 200) +data_targets <- make_data_targets() + + +# These globals are needed by the function below (and they need to persist +# during the actual targets run, since the commands are frozen as expressions). +date_step <- 1L +forecasts_and_scores_by_ahead <- make_forecasts_and_scores_by_ahead() +forecasts_and_scores <- make_forecasts_and_scores() +ensemble_targets <- make_ensemble_targets() +external_names_and_scores <- make_external_names_and_scores() list( data_targets, - forecaster_targets, + forecaster_params_grid_target, forecasts_and_scores_by_ahead, forecasts_and_scores, ensemble_targets, - ensemble_forecast, external_names_and_scores ) diff --git a/covid_hosp_explore/data_targets.R b/covid_hosp_explore/data_targets.R deleted file mode 100644 index 441065ab..00000000 --- a/covid_hosp_explore/data_targets.R +++ /dev/null @@ -1,117 +0,0 @@ -data_targets <- local({ - geo_type <- "state" - time_type <- "day" - geo_values <- "*" - time_values <- epirange(from = "2020-01-01", to = "2024-01-01") - fetch_args <- fetch_args_list(return_empty = TRUE, timeout_seconds = 200) - issues <- "*" - - list( - tar_target( - name = hhs_latest_data, - command = { - pub_covidcast( - source = "hhs", - signals = "confirmed_admissions_covid_1d", - geo_type = geo_type, - time_type = time_type, - geo_values = geo_values, - time_values = time_values, - fetch_args = fetch_args - ) - } - ), - tar_target( - name = chng_latest_data, - command = { - pub_covidcast( - source = "chng", - signals = "smoothed_adj_outpatient_covid", - geo_type = geo_type, - time_type = time_type, - geo_values = geo_values, - time_values = time_values, - fetch_args = fetch_args - ) - } - ), - tar_target( - name = hhs_evaluation_data, - command = { - hhs_latest_data %>% - rename( - actual = value, - target_end_date = time_value - ) - } - ), - tar_target( - name = hhs_latest_data_2022, - command = { - hhs_latest_data %>% - filter(time_value >= "2022-01-01") - } - ), - tar_target( - name = chng_latest_data_2022, - command = { - chng_latest_data %>% - filter(time_value >= "2022-01-01") - } - ), - tar_target( - name = hhs_archive_data_2022, - command = { - pub_covidcast( - source = "hhs", - signals = "confirmed_admissions_covid_1d", - geo_type = geo_type, - time_type = time_type, - geo_values = geo_values, - time_values = time_values, - issues = "*", - fetch_args = fetch_args - ) - } - ), - tar_target( - name = chng_archive_data_2022, - command = { - pub_covidcast( - source = "chng", - signals = "smoothed_adj_outpatient_covid", - geo_type = geo_type, - time_type = time_type, - geo_values = geo_values, - time_values = time_values, - issues = "*", - fetch_args = fetch_args - ) - } - ), - tar_target( - name = joined_archive_data_2022, - command = { - hhs_archive_data_2022 %<>% - select(geo_value, time_value, value, issue) %>% - rename("hhs" := value) %>% - rename(version = issue) %>% - as_epi_archive( - geo_type = "state", - time_type = "day", - compactify = TRUE - ) - chng_archive_data_2022 %<>% - select(geo_value, time_value, value, issue) %>% - rename("chng" := value) %>% - rename(version = issue) %>% - as_epi_archive( - geo_type = "state", - time_type = "day", - compactify = TRUE - ) - epix_merge(hhs_archive_data_2022, chng_archive_data_2022, sync = "locf") - } - ) - ) -}) diff --git a/covid_hosp_explore/forecaster_instantiation.R b/covid_hosp_explore/forecaster_instantiation.R deleted file mode 100644 index 99757dea..00000000 --- a/covid_hosp_explore/forecaster_instantiation.R +++ /dev/null @@ -1,45 +0,0 @@ -linreg <- parsnip::linear_reg() -quantreg <- epipredict::quantile_reg() - -grids <- list( - tidyr::expand_grid( - forecaster = "scaled_pop", - trainer = c("linreg", "quantreg"), - ahead = 1:4, - pop_scaling = c(TRUE, FALSE) - ), - tidyr::expand_grid( - forecaster = "scaled_pop", - trainer = c("linreg", "quantreg"), - ahead = 5:7, - lags = list(c(0, 3, 5, 7, 14), c(0, 7, 14)), - pop_scaling = c(TRUE, FALSE) - ) -) - -# bind them together and give static ids; if you add a new field to a given -# expand_grid, everything will get a new id, so it's better to add a new -# expand_grid instead -param_grid <- bind_rows(map(grids, add_id)) %>% - relocate(parent_id, id, .after = last_col()) - -ONE_AHEAD_FORECAST_NAME <- "forecast_by_ahead" -ONE_AHEAD_SCORE_NAME <- "score_by_ahead" -forecaster_parent_id_map <- param_grid %>% - group_by(parent_id) %>% - summarize( - forecast_component_ids = list(syms(paste0(ONE_AHEAD_FORECAST_NAME, "_", gsub(" ", ".", id, fixed = TRUE)))), - score_component_ids = list(syms(paste0(ONE_AHEAD_SCORE_NAME, "_", gsub(" ", ".", id, fixed = TRUE)))) - ) - -forecaster_param_grids <- make_target_param_grid(select(param_grid, -parent_id)) - -# not actually used downstream, this is for lookup during plotting and human evaluation -forecaster_targets <- list( - tar_target( - name = forecasters, - command = { - param_grid - } - ) -) diff --git a/extras/targets-common.R b/extras/targets-common.R new file mode 100644 index 00000000..9be40906 --- /dev/null +++ b/extras/targets-common.R @@ -0,0 +1,56 @@ +suppressPackageStartupMessages({ + library(targets) + library(tarchetypes) # Load other packages as needed. + + library(crew) + library(dplyr) + library(epipredict) + library(epieval) + library(lubridate) + library(parsnip) + library(purrr) + library(tibble) + library(tidyr) + library(rlang) +}) + +# The external scores processing causes the pipeline to exit with an error, +# apparently due to running out of memory. Set up a non-parallel `crew` +# controller to avoid. +# https://books.ropensci.org/targets/crew.html#heterogeneous-workers +main_controller <- crew_controller_local( + name = "main_controller", + workers = parallel::detectCores() - 5 +) +serial_controller <- crew_controller_local( + name = "serial_controller", + workers = 1L +) + +tar_option_set( + packages = c( + "assertthat", + "dplyr", + "epieval", + "epipredict", + "ggplot2", + "lubridate", + "parsnip", + "tibble", + "tidyr", + "epidatr" + ), # packages that your targets need to run + imports = c("epieval"), + format = "qs", # Optionally set the default storage format. qs is fast. + controller = crew_controller_group(main_controller, serial_controller), + # Set default crew controller. + # https://books.ropensci.org/targets/crew.html#heterogeneous-workers + resources = tar_resources( + crew = tar_resources_crew(controller = "main_controller") + ) +) + +linreg <- parsnip::linear_reg() +quantreg <- epipredict::quantile_reg() +ONE_AHEAD_FORECAST_NAME <- "forecast_by_ahead" +ONE_AHEAD_SCORE_NAME <- "score_by_ahead" diff --git a/flu_hosp_explore.R b/flu_hosp_explore.R index 90fa1ad4..0103751a 100644 --- a/flu_hosp_explore.R +++ b/flu_hosp_explore.R @@ -1,270 +1,63 @@ -suppressPackageStartupMessages({ - library(targets) - library(tarchetypes) # Load other packages as needed. - - library(crew) - library(dplyr) - library(epipredict) - library(epieval) - library(lubridate) - library(parsnip) - library(purrr) - library(tibble) - library(tidyr) - library(rlang) - library(epidatr) -}) - -tar_option_set( - packages = c( - "assertthat", - "dplyr", - "epieval", - "epipredict", - "ggplot2", - "lubridate", - "parsnip", - "tibble", - "tidyr", - "epidatr" - ), # packages that your targets need to run - imports = c("epieval", "parsnip"), - format = "qs", # Optionally set the default storage format. qs is fast. - controller = crew::crew_controller_local(workers = parallel::detectCores() - 5), -) # Run the R scripts in the R/ folder with your custom functions: # tar_source() -linreg <- parsnip::linear_reg() -quantreg <- epipredict::quantile_reg() - -grids <- list( - tidyr::expand_grid( - forecaster = "scaled_pop", - trainer = c("linreg", "quantreg"), - ahead = 1:4, - pop_scaling = c(TRUE, FALSE) - ), - tidyr::expand_grid( - forecaster = "scaled_pop", - trainer = c("linreg", "quantreg"), - ahead = 5:7, - lags = list(c(0, 3, 5, 7, 14), c(0, 7, 14)), - pop_scaling = c(TRUE, FALSE) +# where the forecasters and parameters are joined; see either the variable param_grid or `tar_read(forecasters)` +source("extras/targets-common.R") + +# Add custom parameter combinations in the list below. +make_unique_grids <- function() { + list() +} + +# TODO: Find a way to clean all this stuff about param grids up. +param_grid <- append( + make_shared_grids(), + make_unique_grids() +) %>% + map(add_id) %>% + bind_rows() %>% + relocate(parent_id, id, .after = last_col()) +forecaster_parent_id_map <- param_grid %>% + group_by(parent_id) %>% + summarize( + forecast_component_ids = list(syms(paste0(ONE_AHEAD_FORECAST_NAME, "_", gsub(" ", ".", id, fixed = TRUE)))), + score_component_ids = list(syms(paste0(ONE_AHEAD_SCORE_NAME, "_", gsub(" ", ".", id, fixed = TRUE)))) ) -) -# bind them together and give static ids; if you add a new field to a given -# expand_grid, everything will get a new id, so it's better to add a new -# expand_grid instead -param_grid <- bind_rows(map(grids, add_id)) %>% relocate(id, .after = last_col()) - -forecaster_param_grids <- make_target_param_grid(param_grid) %>% +targets_param_grid <- make_target_param_grid(param_grid) %>% ## TODO This forecaster is hanging. Filter it out for now. filter(id != "necessary endless 5") # not actually used downstream, this is for lookup during plotting and human evaluation -forecasters <- list( +forecaster_params_grid_target <- list( tar_target( - name = forecasters, + name = forecaster_params_grid, command = { param_grid } ) ) -response_signal <- "confirmed_admissions_influenza_1d_prop_7dav" -target_range <- epirange(from = "20211001", to = "20220401") -data <- list( - tar_target( - name = hhs_evaluation_data, - command = { - epidatr::pub_covidcast( - source = "hhs", - signals = response_signal, - geo_type = "state", - time_type = "day", - geo_values = "*", - time_values = epirange(from = "2020-01-01", to = "2024-01-01"), - ) %>% - rename( - actual = value, - target_end_date = time_value - ) - } - ), - tar_target( - name = hhs_archive_data_2022, - command = { - epidatr::pub_covidcast( - source = "hhs", - signals = response_signal, - geo_type = "state", - time_type = "day", - geo_values = "*", - time_values = target_range, - issues = "*", - fetch_params = fetch_params_list(return_empty = TRUE, timeout_seconds = 100) - ) - } - ), - tar_target( - name = chng_archive_data_2022, - command = { - epidatr::pub_covidcast( - source = "chng", - signals = "smoothed_adj_outpatient_flu", - geo_type = "state", - time_type = "day", - geo_values = "*", - time_values = target_range, - issues = "*", - fetch_params = fetch_params_list(return_empty = TRUE, timeout_seconds = 100) - ) - } - ), - tar_target( - name = joined_archive_data_2022, - command = { - hhs_archive_data_2022 %<>% - select(geo_value, time_value, value, issue) %>% - rename("hhs" := value) %>% - rename(version = issue) %>% - as_epi_archive( - geo_type = "state", - time_type = "day", - compactify = TRUE - ) - chng_archive_data_2022 %<>% - select(geo_value, time_value, value, issue) %>% - rename("chng" := value) %>% - rename(version = issue) %>% - as_epi_archive( - geo_type = "state", - time_type = "day", - compactify = TRUE - ) - epix_merge(hhs_archive_data_2022, chng_archive_data_2022, sync = "locf") - } - ), - tar_target( - name = hhs_latest_data_2022, - command = { - epidatr::pub_covidcast( - source = "hhs", - signals = "confirmed_admissions_covid_1d", - geo_type = "state", - time_type = "day", - geo_values = "*", - time_values = epirange(from = "20220101", to = "20220401"), - fetch_params = fetch_params_list(return_empty = TRUE, timeout_seconds = 100) - ) - } - ), - tar_target( - name = chng_latest_data_2022, - command = { - epidatr::pub_covidcast( - source = "chng", - signals = "smoothed_adj_outpatient_covid", - geo_type = "state", - time_type = "day", - geo_values = "*", - time_values = epirange(from = "20220101", to = "20220401"), - fetch_params = fetch_params_list(return_empty = TRUE, timeout_seconds = 100) - ) - } - ) -) +# These globals are needed by the function below (and they need to persist +# during the actual targets run, since the commands are frozen as expressions). +hhs_signal <- "confirmed_admissions_influenza_1d_prop_7dav" +chng_signal <- "smoothed_adj_outpatient_flu" +fetch_args <- epidatr::fetch_args_list(return_empty = TRUE, timeout_seconds = 300) +data_targets <- make_data_targets() -forecasts_and_scores <- tar_map( - values = forecaster_param_grids, - names = id, - unlist = FALSE, - tar_target( - name = forecast, - command = { - forecaster_pred( - data = joined_archive_data_2022, - outcome = "hhs", - extra_sources = "", - forecaster = forecaster, - n_training_pad = 30L, - forecaster_args = params, - forecaster_args_names = param_names - ) - } - ), - tar_target( - name = score, - command = { - run_evaluation_measure( - data = forecast, - evaluation_data = hhs_evaluation_data, - measure = list( - wis = weighted_interval_score, - ae = absolute_error, - cov_80 = interval_coverage(0.8) - ) - ) - } - ) -) +# These globals are needed by the function below (and they need to persist +# during the actual targets run, since the commands are frozen as expressions). +date_step <- 7L +forecasts_and_scores_by_ahead <- make_forecasts_and_scores_by_ahead() +forecasts_and_scores <- make_forecasts_and_scores() +ensemble_targets <- make_ensemble_targets() +external_names_and_scores <- make_external_names_and_scores() -ensemble_keys <- list(a = c(300, 15)) -ensembles <- list( - tar_target( - name = ensembles, - command = { - ensemble_keys - } - ) -) - -# The combine approach below is taken from the manual: -# https://books.ropensci.org/targets/static.html#combine -# The key is that the map above has unlist = FALSE. -ensemble_forecast <- tar_map( - values = ensemble_keys, - tar_combine( - name = ensemble_forecast, - # TODO: Needs a lookup table to select the right forecasters - list( - forecasts_and_scores[["forecast"]][[1]], - forecasts_and_scores[["forecast"]][[2]] - ), - command = { - bind_rows(!!!.x, .id = "forecaster") %>% - pivot_wider( - names_prefix = "forecaster", - names_from = forecaster, - values_from = value - ) %>% - mutate( - value = a + rowMeans(across(starts_with("forecaster"))) - ) %>% - select(-starts_with("forecaster")) - } - ), - tar_target( - name = ensemble_score, - command = { - run_evaluation_measure( - data = ensemble_forecast, - evaluation_data = hhs_evaluation_data, - measures = list( - wis = weighted_interval_score, - ae = absolute_error, - cov_80 = interval_coverage(0.8) - ) - ) - } - ) -) list( - data, - forecasters, + data_targets, + forecaster_params_grid_target, + forecasts_and_scores_by_ahead, forecasts_and_scores, - ensembles, - ensemble_forecast + ensemble_targets, + external_names_and_scores ) diff --git a/man/add_id.Rd b/man/add_id.Rd index 74f7e98c..9198580c 100644 --- a/man/add_id.Rd +++ b/man/add_id.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/small_utils.R +% Please edit documentation in R/utils.R \name{add_id} \alias{add_id} \title{add a unique id based on the column contents} diff --git a/man/clear_lastminute_nas.Rd b/man/clear_lastminute_nas.Rd new file mode 100644 index 00000000..6cd4ffa0 --- /dev/null +++ b/man/clear_lastminute_nas.Rd @@ -0,0 +1,14 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/utils.R +\name{clear_lastminute_nas} +\alias{clear_lastminute_nas} +\title{temporary patch that pulls \code{NA}'s out of an epi_df} +\usage{ +clear_lastminute_nas(epi_data) +} +\arguments{ +\item{epi_data}{the epi_df to be fixed} +} +\description{ +just delete rows that have NA's in them. eventually epipredict should directly handle this so we don't have to +} diff --git a/man/confirm_insufficient_data.Rd b/man/confirm_sufficient_data.Rd similarity index 76% rename from man/confirm_insufficient_data.Rd rename to man/confirm_sufficient_data.Rd index 673463c7..7b1d41a0 100644 --- a/man/confirm_insufficient_data.Rd +++ b/man/confirm_sufficient_data.Rd @@ -1,10 +1,10 @@ % Generated by roxygen2: do not edit by hand % Please edit documentation in R/forecaster.R -\name{confirm_insufficient_data} -\alias{confirm_insufficient_data} +\name{confirm_sufficient_data} +\alias{confirm_sufficient_data} \title{confirm that there's enough data to run this model} \usage{ -confirm_insufficient_data(epi_data, ahead, args_input, buffer = 9) +confirm_sufficient_data(epi_data, ahead, args_input, buffer = 9) } \arguments{ \item{epi_data}{the input data} @@ -21,6 +21,5 @@ rank deficient)} } \description{ epipredict is a little bit fragile about having enough data to train; we want -to be able to return a null result rather than error out; this check say to -return a null +to be able to return a null result rather than error out. } diff --git a/man/covidhub_probs.Rd b/man/covidhub_probs.Rd index 302be44d..ff88d62c 100644 --- a/man/covidhub_probs.Rd +++ b/man/covidhub_probs.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/small_utils.R +% Please edit documentation in R/utils.R \name{covidhub_probs} \alias{covidhub_probs} \title{the quantile levels used by the covidhub repository} diff --git a/man/forecaster_pred.Rd b/man/forecaster_pred.Rd index 2eb28158..ec574766 100644 --- a/man/forecaster_pred.Rd +++ b/man/forecaster_pred.Rd @@ -12,7 +12,8 @@ forecaster_pred( slide_training = 0, n_training_pad = 5, forecaster_args = list(), - forecaster_args_names = list() + forecaster_args_names = list(), + date_range_step_size = 1L ) } \arguments{ @@ -39,6 +40,9 @@ contain \code{ahead}} \item{forecaster_args_names}{a bit of a hack around targets, it contains the names of the \code{forecaster_args}.} + +\item{date_range_step_size}{the step size (in days) to use when generating +the forecast dates.} } \description{ a wrapper that turns a forecaster, parameters, data diff --git a/man/id_ahead_ensemble_grid.Rd b/man/id_ahead_ensemble_grid.Rd index 29c0e2ab..f08c70c2 100644 --- a/man/id_ahead_ensemble_grid.Rd +++ b/man/id_ahead_ensemble_grid.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/small_utils.R +% Please edit documentation in R/utils.R \name{id_ahead_ensemble_grid} \alias{id_ahead_ensemble_grid} \title{add aheads, forecaster_ids, and ids to a list of ensemble models} diff --git a/man/lookup_ids.Rd b/man/lookup_ids.Rd index 86316a3e..0fcf09ac 100644 --- a/man/lookup_ids.Rd +++ b/man/lookup_ids.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/small_utils.R +% Please edit documentation in R/utils.R \name{lookup_ids} \alias{lookup_ids} \title{given target name(s), lookup the corresponding parameters} diff --git a/man/make_data_targets.Rd b/man/make_data_targets.Rd new file mode 100644 index 00000000..e9f344af --- /dev/null +++ b/man/make_data_targets.Rd @@ -0,0 +1,16 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/targets_utils.R +\name{make_data_targets} +\alias{make_data_targets} +\title{Make common targets for fetching data} +\usage{ +make_data_targets() +} +\description{ +Relies on the following globals: +\itemize{ +\item \code{hhs_signal} +\item \code{chng_signal} +\item \code{fetch_args} +} +} diff --git a/man/make_ensemble_targets.Rd b/man/make_ensemble_targets.Rd new file mode 100644 index 00000000..c9d4b0ad --- /dev/null +++ b/man/make_ensemble_targets.Rd @@ -0,0 +1,11 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/targets_utils.R +\name{make_ensemble_targets} +\alias{make_ensemble_targets} +\title{Make ensemble targets} +\usage{ +make_ensemble_targets() +} +\description{ +Make ensemble targets +} diff --git a/man/make_external_names_and_scores.Rd b/man/make_external_names_and_scores.Rd new file mode 100644 index 00000000..94c9f705 --- /dev/null +++ b/man/make_external_names_and_scores.Rd @@ -0,0 +1,11 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/targets_utils.R +\name{make_external_names_and_scores} +\alias{make_external_names_and_scores} +\title{Make external names and scores targets} +\usage{ +make_external_names_and_scores() +} +\description{ +Make external names and scores targets +} diff --git a/man/make_forecasts_and_scores.Rd b/man/make_forecasts_and_scores.Rd new file mode 100644 index 00000000..46bf2cef --- /dev/null +++ b/man/make_forecasts_and_scores.Rd @@ -0,0 +1,11 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/targets_utils.R +\name{make_forecasts_and_scores} +\alias{make_forecasts_and_scores} +\title{Make forecasts and scores targets} +\usage{ +make_forecasts_and_scores() +} +\description{ +Make forecasts and scores targets +} diff --git a/man/make_forecasts_and_scores_by_ahead.Rd b/man/make_forecasts_and_scores_by_ahead.Rd new file mode 100644 index 00000000..127ca7c0 --- /dev/null +++ b/man/make_forecasts_and_scores_by_ahead.Rd @@ -0,0 +1,11 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/targets_utils.R +\name{make_forecasts_and_scores_by_ahead} +\alias{make_forecasts_and_scores_by_ahead} +\title{Make forecasts and scores by ahead targets} +\usage{ +make_forecasts_and_scores_by_ahead() +} +\description{ +Make forecasts and scores by ahead targets +} diff --git a/man/make_shared_grids.Rd b/man/make_shared_grids.Rd new file mode 100644 index 00000000..4c16869c --- /dev/null +++ b/man/make_shared_grids.Rd @@ -0,0 +1,11 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/targets_utils.R +\name{make_shared_grids} +\alias{make_shared_grids} +\title{Make common targets for forecasting experiments} +\usage{ +make_shared_grids() +} +\description{ +Make common targets for forecasting experiments +} diff --git a/man/single_id.Rd b/man/single_id.Rd index d78f7bab..c293414d 100644 --- a/man/single_id.Rd +++ b/man/single_id.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/small_utils.R +% Please edit documentation in R/utils.R \name{single_id} \alias{single_id} \title{generate an id from a simple list of parameters} diff --git a/run.R b/run.R index 11170d1c..996edc82 100644 --- a/run.R +++ b/run.R @@ -25,12 +25,13 @@ # # Save to disk # saveRDS(scorecards, "exploration-scorecards-2023-10-04.RDS") -readline_wrapper <- function(msg = "which project would you like to run? -1: covid_hosp_explore -2: flu_hosp_explore -3: covid_hosp_prod -4: flu_hosp_prod -input: ") { +print("Reading environment variables (TAR_PROJECT, EXTERNAL_SCORES_PATH, DEBUG_MODE, USE_SHINY)...") +tar_project <- Sys.getenv("TAR_PROJECT", "") +external_scores_path <- Sys.getenv("EXTERNAL_SCORES_PATH", "") +debug_mode <- Sys.getenv("DEBUG_MODE", "") +use_shiny <- Sys.getenv("USE_SHINY", "") + +readline_wrapper <- function(msg) { if (interactive()) { txt <- readline(msg) } else { @@ -39,52 +40,71 @@ input: ") { } return(txt) } -project_selection <- readline_wrapper() -external_scores_path <- readline_wrapper("path to RDS file containing external forecast scores, if desired:") +if (tar_project == "") { + project_selection <- readline_wrapper("Which project would you like to run? +1: covid_hosp_explore +2: flu_hosp_explore +3: covid_hosp_prod +4: flu_hosp_prod +Input: ") + tar_project <- switch(as.character(project_selection), + "1" = "covid_hosp_explore", + "2" = "flu_hosp_explore", + "3" = "covid_hosp_prod", + "4" = "flu_hosp_prod", + # else + stop("selection `", project_selection, "` is invalid") + ) +} else { + cat("Using project: ", tar_project, "\n") +} +Sys.setenv(TAR_PROJECT = tar_project) + + +if (external_scores_path == "") { + external_scores_path <- readline_wrapper("Path to RDS file containing external forecast scores, if desired:") +} else { + cat("Using external scores from ", external_scores_path, "\n") +} +Sys.setenv(EXTERNAL_SCORES_PATH = external_scores_path) + +if (debug_mode == "") { + debug_mode <- readline_wrapper("Would you like to run debug mode? (y/[N]): ") +} else { + cat("Debug mode: ", debug_mode, "\n") + if (as.logical(debug_mode)) { + debug_mode <- "y" + } +} + +if (use_shiny == "") { + use_shiny <- readline_wrapper("Would you like to run the shiny app? (y/[N]): ") +} else { + cat("Use shiny: ", use_shiny, "\n") + if (as.logical(use_shiny)) { + use_shiny <- "y" + } +} + suppressPackageStartupMessages({ library(targets) library(shiny) }) -TAR_PROJECT <- switch(as.character(project_selection), - "1" = "covid_hosp_explore", - "2" = "flu_hosp_explore", - "3" = "covid_hosp_prod", - "4" = "flu_hosp_prod", - # else - stop("selection `", project_selection, "` is invalid") -) -Sys.setenv(TAR_PROJECT = TAR_PROJECT) - # targets needs the output dir to already exist. store_dir <- tar_path_store() if (!dir.exists(store_dir)) dir.create(store_dir) -# Load external scores file, if provided -if (external_scores_path == "") { - LOAD_EXTERNAL_SCORES <- FALSE +tar_manifest() +if (debug_mode == "y") { + tar_make(callr_function = NULL) } else { - LOAD_EXTERNAL_SCORES <- TRUE + tar_make() } - -# Dynamically define the external files constants for access within the `targets` session. -# https://stackoverflow.com/questions/72096149/how-to-pass-values-into-targets-r-or-use-dynamic-variables -tar_helper( - here::here(file.path(store_dir, "dynamic_constants.R")), - { - LOAD_EXTERNAL_SCORES <- !!LOAD_EXTERNAL_SCORES - external_scores_path <- !!external_scores_path - } -) - -tar_manifest() -tar_make() # tar_make_clustermq(workers = 2) # nolint # tar_make_future(workers = 2) # nolint - -use_shiny <- readline_wrapper("Would you like to run the shiny app? (y/[N]): ") if (use_shiny == "y") { # Prevent functions defined in /R dir from being loaded unnecessarily options(shiny.autoload.r = FALSE) diff --git a/tests/testthat/test-forecasters-basics.R b/tests/testthat/test-forecasters-basics.R index d13ceb2d..1be626fe 100644 --- a/tests/testthat/test-forecasters-basics.R +++ b/tests/testthat/test-forecasters-basics.R @@ -5,7 +5,7 @@ forecasters <- list( c("flatline_fc", flatline_fc) ) for (forecaster in forecasters) { - test_that(forecaster[[1]], { + test_that(paste(forecaster[[1]], "gets the date and columns right"), { jhu <- epipredict::case_death_rate_subset %>% dplyr::filter(time_value >= as.Date("2021-12-01")) # the as_of for this is wildly far in the future @@ -19,14 +19,61 @@ for (forecaster in forecasters) { res$target_end_date == as.Date("2022-01-01") )) - # any forecaster specific tests - if (forecaster[[1]] == "scaled_pop") { + }) + + test_that(paste(forecaster[[1]], "deals with no as_of"), { + jhu <- epipredict::case_death_rate_subset %>% + dplyr::filter(time_value >= as.Date("2021-12-01")) + # what if we have no as_of date? assume they mean the last available data + attributes(jhu)$metadata$as_of <- NULL + expect_no_error(res <- forecaster[[2]](jhu, "case_rate", c("death_rate"), 2L)) + expect_equal(res$target_end_date %>% unique(), max(jhu$time_value) + 2) + }) + + test_that(paste(forecaster[[1]], "handles last second NA's"), { + # if the last entries are NA, we should still predict + # TODO: currently this checks that we DON'T predict + jhu <- epipredict::case_death_rate_subset %>% + dplyr::filter(time_value >= as.Date("2021-12-01")) + geo_values <- jhu$geo_value %>% unique() + one_day_nas <- tibble( + geo_value = geo_values, + time_value = as.Date("2022-01-01"), + case_rate = NA, + death_rate = runif(length(geo_values)) + ) + second_day_nas <- one_day_nas %>% + mutate(time_value = as.Date("2022-01-02")) + jhu_nad <- jhu %>% + as_tibble() %>% + bind_rows(one_day_nas, second_day_nas) %>% + epiprocess::as_epi_df() + attributes(jhu_nad)$metadata$as_of <- max(jhu_nad$time_value) + 3 + expect_no_error(nas_forecast <- forecaster[[2]](jhu_nad, "case_rate", c("death_rate"))) + # TODO: this shouldn't actually be null, it should be a bit further delayed + # predicting from 3 days later + expect_equal(nas_forecast$forecast_date %>% unique(), as.Date("2022-01-05")) + # predicting 1 day into the future + expect_equal(nas_forecast$target_end_date %>% unique(), as.Date("2022-01-06")) + # every state and quantile has a prediction + expect_equal(nrow(nas_forecast), length(covidhub_probs()) * length(jhu$geo_value %>% unique())) + }) + + ################################# + # any forecaster specific tests + if (forecaster[[1]] == "scaled_pop") { + test_that(paste(forecaster[[1]], "scaled and unscaled don't make the same predictions"), { + jhu <- epipredict::case_death_rate_subset %>% + dplyr::filter(time_value >= as.Date("2021-12-01")) + # the as_of for this is wildly far in the future + attributes(jhu)$metadata$as_of <- max(jhu$time_value) + 3 + res <- forecaster[[2]](jhu, "case_rate", c("death_rate"), -2L) # confirm scaling produces different results res_unscaled <- forecaster[[2]](jhu, "case_rate", c("death_rate"), -2L, - pop_scaling = FALSE + pop_scaling = FALSE, ) expect_false(res_unscaled %>% full_join(res, @@ -35,10 +82,19 @@ for (forecaster in forecasters) { ) %>% mutate(equal = value.unscaled == value.scaled) %>% summarize(all(equal)) %>% pull(`all(equal)`)) - } - # TODO confirming that it produces exactly the same result as arx_forecaster - # test case where extra_sources is "empty" - # test case where the epi_df is empty + }) + } + # TODO confirming that it produces exactly the same result as arx_forecaster + # test case where extra_sources is "empty" + # test case where the epi_df is empty + test_that(paste(forecaster[[1]], "empty epi_df predicts nothing"), { + jhu <- epipredict::case_death_rate_subset %>% + dplyr::filter(time_value >= as.Date("2021-12-01")) + # the as_of for this is wildly far in the future + attributes(jhu)$metadata$as_of <- max(jhu$time_value) + 3 + res <- forecaster[[2]](jhu, "case_rate", c("death_rate"), -2L) + # the as_of for this is wildly far in the future + attributes(jhu)$metadata$as_of <- max(jhu$time_value) + 3 null_jhu <- jhu %>% filter(time_value < as.Date("0009-01-01")) expect_no_error(null_res <- forecaster[[2]](null_jhu, "case_rate", c("death_rate"))) expect_identical(names(null_res), names(res)) diff --git a/tests/testthat/test-target-utils.R b/tests/testthat/test-target-utils.R index 6bf216d4..ea807bee 100644 --- a/tests/testthat/test-target-utils.R +++ b/tests/testthat/test-target-utils.R @@ -1,14 +1,14 @@ test_that("target param generation works", { ex_frame <- tribble( - ~forecaster, ~id, ~someNAs, ~someNULLs, - "scaled_pop", "linreg", TRUE, 35, - "scaled_pop", "linreg", NA, 35, - "scaled_pop", "linreg", FALSE, NULL, + ~forecaster, ~id, ~someNAs, ~someNULLs, ~someList, ~someYoDog, + "scaled_pop", "linreg", TRUE, 35, c(1, 3, 5), list(c(1, 2), c(3, 4)), + "scaled_pop", "linreg", NA, 35, c(1, 3, 5), list(c(1, 2), c(3, 4)), + "scaled_pop", "linreg", FALSE, NULL, c(1, 3, 5), list(c(1, 2), c(3, 4)) ) list_version <- list( - list(someNAs = TRUE, someNULLs = 35), - list(someNULLs = 35), - list(someNAs = FALSE) + list(someNAs = TRUE, someNULLs = 35, someList = c(1, 3, 5), someYoDog = list(c(1, 2), c(3, 4))), + list(someNULLs = 35, someList = c(1, 3, 5), someYoDog = list(c(1, 2), c(3, 4))), + list(someNAs = FALSE, someList = c(1, 3, 5), someYoDog = list(c(1, 2), c(3, 4))) ) expect_equal(lists_of_real_values(ex_frame), list_version) })