diff --git a/R/archive.R b/R/archive.R index e81315a7..79d08ddf 100644 --- a/R/archive.R +++ b/R/archive.R @@ -286,23 +286,24 @@ epi_archive = #' details. #' @importFrom data.table key #' @importFrom rlang !! !!! enquo enquos is_quosure sym syms - slide = function(f, ..., n = 7, group_by, ref_time_values, + slide = function(f, ..., max_version_gap, group_by, + ref_versions, time_step, new_col_name = "slide_value", as_list_col = FALSE, names_sep = "_", all_rows = FALSE) { # If missing, then set ref time values to be everything; else make # sure we intersect with observed time values - if (missing(ref_time_values)) { - ref_time_values = unique(self$DT$time_value) + if (missing(ref_versions)) { + ref_versions = unique(self$DT$version) } else { - ref_time_values = ref_time_values[ref_time_values %in% + ref_versions = ref_versions[ref_versions %in% unique(self$DT$time_value)] } # If a custom time step is specified, then redefine units - before_num = n-1 - if (!missing(time_step)) before_num = time_step(n-1) + before_num = max_version_gap-1 + if (!missing(time_step)) before_num = time_step(max_version_gap-1) # What to group by? If missing, set according to internal keys if (missing(group_by)) { @@ -324,7 +325,7 @@ epi_archive = # Computation for one group, one time value comp_one_grp = function(.data_group, f, ..., - time_value, + version, key_vars, new_col) { # Carry out the specified computation @@ -370,21 +371,22 @@ epi_archive = # Note that we've already recycled comp value to make size stable, # so tibble() will just recycle time value appropriately - return(tibble::tibble(time_value = time_value, + return(tibble::tibble(version = version, !!new_col := comp_value)) } # If f is not missing, then just go ahead, slide by group if (!missing(f)) { + if (rlang::is_formula(f)) f = rlang::as_function(f) - x = purrr::map_dfr(ref_time_values, function(t) { + x = purrr::map_dfr(ref_versions, function(t) { self$as_of(t, min_time_value = t - before_num) %>% tibble::as_tibble() %>% dplyr::group_by(!!!group_by) %>% dplyr::group_modify(comp_one_grp, f = f, ..., - time_value = t, + version = t, key_vars = key_vars, new_col = new_col, .keep = TRUE) %>% @@ -406,13 +408,13 @@ epi_archive = f = function(x, quo, ...) rlang::eval_tidy(quo, x) new_col = sym(names(rlang::quos_auto_name(quos))) - x = purrr::map_dfr(ref_time_values, function(t) { + x = purrr::map_dfr(ref_versions, function(t) { self$as_of(t, min_time_value = t - before_num) %>% tibble::as_tibble() %>% dplyr::group_by(!!!group_by) %>% dplyr::group_modify(comp_one_grp, f = f, quo = quo, - time_value = t, + version = t, key_vars = key_vars, new_col = new_col, .keep = TRUE) %>% diff --git a/R/methods-epi_archive.R b/R/methods-epi_archive.R index 4a1cd375..3a5af3e0 100644 --- a/R/methods-epi_archive.R +++ b/R/methods-epi_archive.R @@ -112,15 +112,17 @@ epix_merge = function(x, y, ..., locf = TRUE, nan = NA) { #' @param ... Additional arguments to pass to the function or formula specified #' via `f`. Alternatively, if `f` is missing, then the current argument is #' interpreted as an expression for tidy evaluation. -#' @param n Number of time steps to use in the running window. For example, if -#' `n = 7`, and one time step is one day, then to produce a value on January 7 +#' @param max_version_gap Number of time steps to use in the running window. +#' For example, if +#' `max_version_gap = 7`, and one time step is one day, then to produce a +#' value on January 7 #' we apply the given function or formula to data in between January 1 and #' 7. Default is 7. #' @param group_by The variable(s) to group by before slide computation. If #' missing, then the keys in the underlying data table, excluding `time_value` #' and `version`, will be used for grouping. To omit a grouping entirely, use #' `group_by = NULL`. -#' @param ref_time_values Time values for sliding computations, meaning, each +#' @param ref_versions Time values for sliding computations, meaning, each #' element of this vector serves as the reference time point for one sliding #' window. If missing, then this will be set to all unique time values in the #' underlying data table, by default. @@ -176,11 +178,11 @@ epix_merge = function(x, y, ..., locf = TRUE, nan = NA) { #' Finally, this is simply a wrapper around the `slide()` method of the #' `epi_archive` class, so if `x` is an `epi_archive` object, then: #' ``` -#' epix_slide(x, new_var = comp(old_var), n = 120) +#' epix_slide(x, new_var = comp(old_var), max_version_gap = 120) #' ``` #' is equivalent to: #' ``` -#' x$slide(x, new_var = comp(old_var), n = 120) +#' x$slide(new_var = comp(old_var), max_version_gap = 120) #' ``` #' #' @importFrom rlang enquo @@ -191,24 +193,25 @@ epix_merge = function(x, y, ..., locf = TRUE, nan = NA) { #' # 0 day which has no results, for 2020-06-01 #' # 1 day, for 2020-06-02 #' # 2 days, for the rest of the results -#' # never 3 days dur to data latency +#' # never 3 days due to data latency #' -#' time_values <- seq(as.Date("2020-06-01"), +#' versions <- seq(as.Date("2020-06-01"), #' as.Date("2020-06-15"), #' by = "1 day") #' epix_slide(x = archive_cases_dv_subset, #' f = ~ mean(.x$case_rate_7d_av), #' n = 3, #' group_by = geo_value, -#' ref_time_values = time_values, +#' ref_versions = versions, #' new_col_name = 'case_rate_3d_av') -epix_slide = function(x, f, ..., n = 7, group_by, ref_time_values, +epix_slide = function(x, f, ..., max_version_gap, group_by, ref_versions, time_step, new_col_name = "slide_value", as_list_col = FALSE, names_sep = "_", all_rows = FALSE) { if (!inherits(x, "epi_archive")) Abort("`x` must be of class `epi_archive`.") - return(x$slide(f, ..., n = n, + return(x$slide(f, ..., + max_version_gap = max_version_gap, group_by = enquo(group_by), - ref_time_values = ref_time_values, + ref_versions = ref_versions, time_step = time_step, new_col_name = new_col_name, as_list_col = as_list_col, diff --git a/man/epi_archive.Rd b/man/epi_archive.Rd index 84ac9406..8b0cfb2a 100644 --- a/man/epi_archive.Rd +++ b/man/epi_archive.Rd @@ -193,9 +193,9 @@ details. \if{html}{\out{
}}\preformatted{epi_archive$slide( f, ..., - n = 7, + max_version_gap, group_by, - ref_time_values, + ref_versions, time_step, new_col_name = "slide_value", as_list_col = FALSE, diff --git a/man/epix_slide.Rd b/man/epix_slide.Rd index 19da4f06..348c31a8 100644 --- a/man/epix_slide.Rd +++ b/man/epix_slide.Rd @@ -8,9 +8,9 @@ epix_slide( x, f, ..., - n = 7, + max_version_gap, group_by, - ref_time_values, + ref_versions, time_step, new_col_name = "slide_value", as_list_col = FALSE, @@ -34,8 +34,10 @@ sliding window of \code{n} time steps.} via \code{f}. Alternatively, if \code{f} is missing, then the current argument is interpreted as an expression for tidy evaluation.} -\item{n}{Number of time steps to use in the running window. For example, if -\code{n = 7}, and one time step is one day, then to produce a value on January 7 +\item{max_version_gap}{Number of time steps to use in the running window. +For example, if +\code{max_version_gap = 7}, and one time step is one day, then to produce a +value on January 7 we apply the given function or formula to data in between January 1 and 7. Default is 7.} @@ -44,7 +46,7 @@ missing, then the keys in the underlying data table, excluding \code{time_value} and \code{version}, will be used for grouping. To omit a grouping entirely, use \code{group_by = NULL}.} -\item{ref_time_values}{Time values for sliding computations, meaning, each +\item{ref_versions}{Time values for sliding computations, meaning, each element of this vector serves as the reference time point for one sliding window. If missing, then this will be set to all unique time values in the underlying data table, by default.} @@ -117,12 +119,12 @@ version-aware sliding is necessary (as it its purpose). Finally, this is simply a wrapper around the \code{slide()} method of the \code{epi_archive} class, so if \code{x} is an \code{epi_archive} object, then: -\if{html}{\out{
}}\preformatted{epix_slide(x, new_var = comp(old_var), n = 120) +\if{html}{\out{
}}\preformatted{epix_slide(x, new_var = comp(old_var), max_version_gap = 120) }\if{html}{\out{
}} is equivalent to: -\if{html}{\out{
}}\preformatted{x$slide(x, new_var = comp(old_var), n = 120) +\if{html}{\out{
}}\preformatted{x$slide(new_var = comp(old_var), max_version_gap = 120) }\if{html}{\out{
}} } \examples{ @@ -131,15 +133,15 @@ is equivalent to: # 0 day which has no results, for 2020-06-01 # 1 day, for 2020-06-02 # 2 days, for the rest of the results -# never 3 days dur to data latency +# never 3 days due to data latency -time_values <- seq(as.Date("2020-06-01"), +versions <- seq(as.Date("2020-06-01"), as.Date("2020-06-15"), by = "1 day") epix_slide(x = archive_cases_dv_subset, f = ~ mean(.x$case_rate_7d_av), n = 3, group_by = geo_value, - ref_time_values = time_values, + ref_versions = versions, new_col_name = 'case_rate_3d_av') } diff --git a/tests/testthat/test-epix_slide.R b/tests/testthat/test-epix_slide.R new file mode 100644 index 00000000..bef6c32d --- /dev/null +++ b/tests/testthat/test-epix_slide.R @@ -0,0 +1,38 @@ +library(dplyr) + +ea <- archive_cases_dv_subset$clone() + +test_that("epix_slide only works on an epi_archive",{ + expect_error(epix_slide(data.frame(x=1))) +}) + +test_that("epix_slide works as intended",{ + x2 <- ea$clone()$DT %>% + filter(geo_value == "ca", version <= as.Date("2020-06-09")) %>% + select(-percent_cli,-case_rate_7d_av) %>% + mutate(binary = 2^(row_number())) %>% + as_epi_archive() + + versions <- seq(as.Date("2020-06-01"), + as.Date("2020-06-09"), + by = "1 day") + + xx1 <- epix_slide(x = x2, + f = ~ sum(.x$binary), + max_version_gap = 5, + group_by = geo_value, + ref_versions = versions, + new_col_name = 'sum_binary') + + xx2 <- tibble(geo_value = rep("ca",7), + version = as.Date("2020-06-01") + 1:7, + sum_binary = c(2^1, + 2^6+2^1, + 2^11+2^6+2^1, + 2^16+2^11+2^6+2^1, + 2^19+2^16+2^12+2^7, + 2^21+2^19+2^16+2^13, + 2^22+2^21+2^19+2^17)) + + expect_identical(xx1,xx2) +}) diff --git a/vignettes/advanced.Rmd b/vignettes/advanced.Rmd index a76c5225..9074413f 100644 --- a/vignettes/advanced.Rmd +++ b/vignettes/advanced.Rmd @@ -15,7 +15,7 @@ ensure the result of a slide operation is *size stable*, meaning, it will return something whose length is the same as the number of appearances of reference time values for the slide computation in the given data frame/table (this defaults to all time values, but can be some given subset when `ref_time_values` -is specified). +or `ref_versions` is specified, respectively). The output of a slide computation should either be an atomic value/vector, or a data frame. This data frame can have multiple columns, multiple rows, or both. @@ -61,12 +61,12 @@ df %>% df %>% mutate(version = time_value) %>% as_epi_archive() %>% - epix_slide(x_2dav = mean(x), n = 2, ref_time_values = as.Date("2020-06-02")) + epix_slide(x_2dav = mean(x), max_version_gap = 2, ref_versions = as.Date("2020-06-02")) df %>% mutate(version = time_value) %>% as_epi_archive() %>% - epix_slide(~ mean(.x$x), n = 2, ref_time_values = as.Date("2020-06-02")) + epix_slide(~ mean(.x$x), max_version_gap = 2, ref_versions = as.Date("2020-06-02")) ``` When the slide computation returns an atomic vector (rather than a single value) @@ -152,8 +152,8 @@ df %>% mutate(version = time_value) %>% as_epi_archive() %>% epix_slide(a = data.frame(x_2dav = mean(x), x_2dma = mad(x)), - ref_time_values = as.Date("2020-06-02"), - n = 2, as_list_col = FALSE, names_sep = NULL) + ref_versions = as.Date("2020-06-02"), + max_version_gap = 2, as_list_col = FALSE, names_sep = NULL) ``` ## Multi-row outputs @@ -344,7 +344,7 @@ data. ```{r, message = FALSE, warning = FALSE, fig.width = 9, fig.height = 6} # Latest snapshot of data, and forecast dates x_latest <- epix_as_of(x, max_version = max(x$DT$version)) -fc_time_values <- seq(as.Date("2020-08-01"), +fc_versions <- seq(as.Date("2020-08-01"), as.Date("2021-12-01"), by = "1 month") @@ -354,15 +354,15 @@ k_week_ahead <- function(x, ahead = 7, as_of = TRUE) { x %>% epix_slide(fc = prob_arx(percent_cli, case_rate_7d_av, geo_value, time_value, args = prob_arx_args(ahead = ahead)), - n = 120, ref_time_values = fc_time_values) %>% - mutate(target_date = time_value + ahead, as_of = TRUE, + max_version_gap = 120, ref_versions = fc_versions) %>% + mutate(target_date = version + ahead, as_of = TRUE, geo_value = fc_geo_value) } else { x_latest %>% epi_slide(fc = prob_arx(percent_cli, case_rate_7d_av, geo_value, time_value, args = prob_arx_args(ahead = ahead)), - n = 120, ref_time_values = fc_time_values) %>% + n = 120, ref_time_values = fc_versions) %>% mutate(target_date = time_value + ahead, as_of = FALSE) } } diff --git a/vignettes/archive.Rmd b/vignettes/archive.Rmd index e729b32d..e82ef267 100644 --- a/vignettes/archive.Rmd +++ b/vignettes/archive.Rmd @@ -255,7 +255,7 @@ head(x$DT) ``` ```{r, echo=FALSE, message=FALSE, warning=FALSE} -x <- archive_cases_dv_subset +x <- as_epi_archive(archive_cases_dv_subset$DT) print(x) head(x$DT) ``` @@ -285,52 +285,88 @@ slide vignette to accomodate exogenous variables in the autoregressive model, which is often referred to as an ARX model. ```{r} -prob_arx <- function(x, y, lags = c(0, 7, 14), ahead = 7, min_train_window = 20, - lower_level = 0.05, upper_level = 0.95, symmetrize = TRUE, - intercept = FALSE, nonneg = TRUE) { +library(tidyr) +library(purrr) + +prob_arx_args <- function(lags = c(0, 7, 14), + ahead = 7, + min_train_window = 20, + lower_level = 0.05, + upper_level = 0.95, + symmetrize = TRUE, + intercept = FALSE, + nonneg = TRUE) { + return(list(lags = lags, + ahead = ahead, + min_train_window = min_train_window, + lower_level = lower_level, + upper_level = upper_level, + symmetrize = symmetrize, + intercept = intercept, + nonneg = nonneg)) +} + +prob_arx <- function(x, y, geo_value, time_value, args = prob_arx_args()) { # Return NA if insufficient training data - if (length(y) < min_train_window + max(lags) + ahead) { - return(data.frame(point = NA, lower = NA, upper = NA)) + if (length(y) < args$min_train_window + max(args$lags) + args$ahead) { + return(data.frame(geo_value = unique(geo_value), # Return geo value! + point = NA, lower = NA, upper = NA)) } - # Useful transformations + # Set up x, y, lags list if (!missing(x)) x <- data.frame(x, y) else x <- data.frame(y) - if (!is.list(lags)) lags <- list(lags) - lags = rep(lags, length.out = ncol(x)) + if (!is.list(args$lags)) args$lags <- list(args$lags) + args$lags = rep(args$lags, length.out = ncol(x)) # Build features and response for the AR model, and then fit it - dat <- do.call( - data.frame, - unlist( # Below we loop through and build the lagged features - purrr::map(1:ncol(x), function(i) { - purrr::map(lags[[i]], function(j) lag(x[,i], n = j)) - }), - recursive = FALSE)) - names(dat) = paste0("x", 1:ncol(dat)) - if (intercept) dat$x0 = rep(1, nrow(dat)) - dat$y <- lead(y, n = ahead) - obj <- lm(y ~ . + 0, data = dat) + dat <- + tibble(i = 1:ncol(x), lag = args$lags) %>% + unnest(lag) %>% + mutate(name = paste0("x", 1:nrow(.))) %>% + # One list element for each lagged feature + pmap(function(i, lag, name) { + tibble(geo_value = geo_value, + time_value = time_value + lag, # Shift back + !!name := x[,i]) + }) %>% + # One list element for the response vector + c(list( + tibble(geo_value = geo_value, + time_value = time_value - args$ahead, # Shift forward + y = y))) %>% + # Combine them together into one data frame + reduce(full_join, by = c("geo_value", "time_value")) %>% + arrange(time_value) + if (args$intercept) dat$x0 = rep(1, nrow(dat)) + obj <- lm(y ~ . + 0, data = select(dat, -geo_value, -time_value)) + + # Use LOCF to fill NAs in the latest feature values (do this by geo value) + setDT(dat) # Convert to a data.table object by reference + cols <- setdiff(names(dat), c("geo_value", "time_value")) + dat[, (cols) := nafill(.SD, type = "locf"), .SDcols = cols, by = "geo_value"] - # Use LOCF to fill NAs in the latest feature values, make a prediction - setDT(dat) - setnafill(dat, type = "locf") - point <- predict(obj, newdata = tail(dat, 1)) + # Make predictions + test_time_value = max(time_value) + point <- predict(obj, newdata = dat %>% + dplyr::group_by(geo_value) %>% + dplyr::filter(time_value == test_time_value)) - # Compute a band + # Compute bands r <- residuals(obj) - s <- ifelse(symmetrize, -1, NA) # Should the residuals be symmetrized? - q <- quantile(c(r, s * r), probs = c(lower_level, upper_level), na.rm = TRUE) + s <- ifelse(args$symmetrize, -1, NA) # Should the residuals be symmetrized? + q <- quantile(c(r, s * r), probs = c(args$lower, args$upper), na.rm = TRUE) lower <- point + q[1] upper <- point + q[2] # Clip at zero if we need to, then return - if (nonneg) { - point = max(point, 0) - lower = max(lower, 0) - upper = max(upper, 0) + if (args$nonneg) { + point = pmax(point, 0) + lower = pmax(lower, 0) + upper = pmax(upper, 0) } - return(data.frame(point = point, lower = lower, upper = upper)) + return(data.frame(geo_value = unique(geo_value), # Return geo value! + point = point, lower = lower, upper = upper)) } ``` @@ -338,12 +374,14 @@ Next we slide this forecaster over the working `epi_archive` object, in order to forecast COVID-19 case rates 7 days into the future. ```{r} -fc_time_values <- seq(as.Date("2020-08-01"), +fc_versions <- seq(as.Date("2020-08-01"), as.Date("2021-12-01"), by = "1 month") -z <- epix_slide(x, fc = prob_arx(x = percent_cli, y = case_rate_7d_av), n = 120, - ref_time_values = fc_time_values, group_by = geo_value) +z <- epix_slide(x, fc = prob_arx(percent_cli, case_rate_7d_av, geo_value, time_value, + args = prob_arx_args(ahead = 7)), + max_version_gap = 120, + ref_versions = fc_versions, group_by = geo_value) head(z, 10) ``` @@ -372,19 +410,22 @@ x_latest <- epix_as_of(x, max_version = max(x$DT$version)) k_week_ahead <- function(x, ahead = 7, as_of = TRUE) { if (as_of) { x %>% - epix_slide(fc = prob_arx(percent_cli, case_rate_7d_av, ahead = ahead), n = 120, - ref_time_values = fc_time_values, group_by = geo_value) %>% - mutate(target_date = time_value + ahead, as_of = TRUE) + epix_slide(fc = prob_arx(percent_cli, case_rate_7d_av, geo_value, time_value, + args = prob_arx_args(ahead = ahead)), + max_version_gap = 120, ref_versions = fc_versions) %>% + mutate(target_date = version + ahead, as_of = TRUE, + geo_value = fc_geo_value) } else { x_latest %>% - group_by(geo_value) %>% - epi_slide(fc = prob_arx(percent_cli, case_rate_7d_av, ahead = ahead), n = 120, - ref_time_values = fc_time_values) %>% + epi_slide(fc = prob_arx(percent_cli, case_rate_7d_av, geo_value, time_value, + args = prob_arx_args(ahead = ahead)), + n = 120, ref_time_values = fc_versions) %>% mutate(target_date = time_value + ahead, as_of = FALSE) } } + # Generate the forecasts, and bind them together fc <- bind_rows(k_week_ahead(x, ahead = 7, as_of = TRUE), k_week_ahead(x, ahead = 14, as_of = TRUE),