diff --git a/DESCRIPTION b/DESCRIPTION
index 18a3bdfb..6f5c3c81 100755
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -46,7 +46,9 @@ Suggests:
     knitr,
     outbreaks,
     rmarkdown,
-    testthat (>= 3.0.0)
+    testthat (>= 3.0.0),
+    waldo (>= 0.3.1),
+    withr
 VignetteBuilder: knitr
 Remotes:
diff --git a/NAMESPACE b/NAMESPACE
index c6bd7d31..a290ab27 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -11,6 +11,8 @@ S3method(filter,epi_df)
 S3method(group_by,epi_df)
 S3method(group_modify,epi_df)
 S3method(mutate,epi_df)
+S3method(next_after,Date)
+S3method(next_after,integer)
 S3method(print,epi_df)
 S3method(relocate,epi_df)
 S3method(rename,epi_df)
@@ -19,6 +21,7 @@ S3method(summary,epi_df)
 S3method(ungroup,epi_df)
 S3method(unnest,epi_df)
 export("%>%")
+export(archive_cases_dv_subset)
 export(arrange)
 export(as_epi_archive)
 export(as_epi_df)
@@ -38,19 +41,23 @@ export(group_modify)
 export(growth_rate)
 export(is_epi_archive)
 export(is_epi_df)
+export(max_version_with_row_in)
 export(mutate)
 export(new_epi_df)
+export(next_after)
 export(relocate)
 export(rename)
 export(slice)
 export(ungroup)
 export(unnest)
 importFrom(R6,R6Class)
+importFrom(data.table,":=")
+importFrom(data.table,address)
 importFrom(data.table,as.data.table)
 importFrom(data.table,between)
+importFrom(data.table,copy)
 importFrom(data.table,key)
-importFrom(data.table,merge.data.table)
-importFrom(data.table,nafill)
+importFrom(data.table,set)
 importFrom(data.table,setkeyv)
 importFrom(dplyr,arrange)
 importFrom(dplyr,filter)
@@ -69,9 +76,11 @@ importFrom(rlang,"!!!")
 importFrom(rlang,"!!")
 importFrom(rlang,.data)
 importFrom(rlang,.env)
+importFrom(rlang,arg_match)
 importFrom(rlang,enquo)
 importFrom(rlang,enquos)
 importFrom(rlang,is_quosure)
+importFrom(rlang,quo_is_missing)
 importFrom(rlang,sym)
 importFrom(rlang,syms)
 importFrom(stats,cor)
diff --git a/R/archive.R b/R/archive.R
index d1f2b90f..bf5a38a9 100644
--- a/R/archive.R
+++ b/R/archive.R
@@ -6,6 +6,100 @@
 # `data.table::` everywhere and not importing things.
 .datatable.aware = TRUE
 
+#' Validate a version bound arg
+#'
+#' Expected to be used on `clobberable_versions_start`, `versions_end`,
+#' and similar arguments. Some additional context-specific checks may be needed.
+#'
+#' @param version_bound the version bound to validate
+#' @param x a data frame containing a version column with which to check
+#'   compatibility
+#' @param na_ok Boolean; is `NA` an acceptable "bound"? (If so, `NA` will
+#'   have a special context-dependent meaning.)
+#' @param version_bound_arg optional string; what to call the version bound in
+#'   error messages
+#' @param x_arg optional string; what to call `x` in error messages
+#'
+#' @section Side effects: raises an error if version bound appears invalid
+#'
+#' @noRd
+validate_version_bound = function(version_bound, x, na_ok,
+                                  version_bound_arg = rlang::caller_arg(version_bound),
+                                  x_arg = rlang::caller_arg(x)) {
+  # We might want some (optional?) validation here to detect internal bugs.
+  if (length(version_bound) != 1L) {
+    # Check for length-1-ness fairly early so we don't have to worry as much
+    # about our `if`s receiving non-length-1 "Boolean"s.
+    Abort(sprintf("`version_bound` must have length 1, but instead was length %d",
+                  length(version_bound)),
+          class=sprintf("epiprocess__%s_is_not_length_1", version_bound_arg))
+  } else if (is.na(version_bound)) {
+    # Check for NA before class&type, as any-class&type NA should be fine for
+    # our purposes, and some version classes&types might not have their own NA
+    # value to pass in.
+    if (na_ok) {
+      # Looks like a valid version bound; exit without error.
+      return(invisible(NULL))
+    } else {
+      Abort(sprintf(
+        '`%s` must not satisfy `is.na` (NAs are not allowed for this kind of version bound)',
+        version_bound_arg
+      ), class=sprintf("epiprocess__%s_is_na", version_bound_arg))
+    }
+  } else if (!identical(class(version_bound), class(x[["version"]])) ||
+             !identical(typeof(version_bound), typeof(x[["version"]]))) {
+    Abort(sprintf(
+      '`class(%1$s)` must be identical to `class(%2$s)` and `typeof(%1$s)` must be identical to `typeof(%2$s)`',
+      version_bound_arg,
+      # '{x_arg}[["version"]]' except adding parentheses if needed:
+      rlang::expr_deparse(rlang::new_call(
+        quote(`[[`), rlang::pairlist2(rlang::parse_expr(x_arg), "version")
+      ))
+    ), class=sprintf("epiprocess__%s_has_invalid_class_or_typeof", version_bound_arg))
+  } else {
+    # Looks like a valid version bound; exit without error.
+    return(invisible(NULL))
+  }
+}
+
+#' Default arg helper: `max(x$version)`, with error if `x` has 0 rows
+#'
+#' Exported to make defaults more easily copyable.
+#'
+#' @param x `x` argument of [`as_epi_archive`]
+#'
+#' @return `max(x$version)` if it has any rows; raises error if it has 0 rows or
+#'   an `NA` version value
+#'
+#' @export
+max_version_with_row_in = function(x) {
+  if (nrow(x) == 0L) {
+    Abort(sprintf("`nrow(x)==0L`, representing a data set history with no row up through the latest observed version, but we don't have a sensible guess at what version that is, or whether any of the empty versions might be clobbered in the future; if we use `x` to form an `epi_archive`, then `clobberable_versions_start` and `versions_end` must be manually specified."),
+          class="epiprocess__max_version_cannot_be_used")
+  } else {
+    version_col = purrr::pluck(x, "version") # error not NULL if doesn't exist
+    if (anyNA(version_col)) {
+      Abort("version values cannot be NA",
+            class="epiprocess__version_values_must_not_be_na")
+    } else {
+      version_bound <- max(version_col)
+    }
+  }
+}
+
+#' Get the next possible value greater than `x` of the same type
+#'
+#' @param x the starting "value"(s)
+#' @return same class, typeof, and length as `x`
+#'
+#' @export
+next_after = function(x) UseMethod("next_after")
+
+#' @export
+next_after.integer = function(x) x + 1L
+
+#' @export
+next_after.Date = function(x) x + 1L
+
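A quick hedged sketch of how these helpers behave (toy values; note that `max_version_with_row_in` returns its result invisibly, since its last expression is an assignment, hence the `print()`):

```r
x <- data.frame(geo_value = "ca",
                time_value = as.Date("2020-06-01"),
                version = as.Date("2020-06-02"),
                value = 1)
print(max_version_with_row_in(x))  # 2020-06-02: latest version with a row in x
next_after(as.Date("2020-06-02"))  # 2020-06-03
next_after(5L)                     # 6L
```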
 #' @title `epi_archive` object
 #'
 #' @description An `epi_archive` is an R6 class which contains a data table
@@ -34,17 +128,31 @@
 #' key variables, and thus the key variables are critical for figuring out how
 #' to generate a snapshot of data from the archive, as of a given version.
 #'
-#' In general, last observation carried forward (LOCF) is used to data in
-#' between recorded versions. Currently, deletions must be represented as
-#' revising a row to a special state (e.g., making the entries `NA` or
-#' including a special column that flags the data as removed and performing
-#' some kind of post-processing), and the archive is unaware of what this
-#' state is.
+#' In general, the last version of each observation is carried forward (LOCF) to
+#' fill in data between recorded versions, and between the last recorded
+#' update and the `versions_end`. One consequence is that the `DT`
+#' doesn't have to contain a full snapshot of every version (although this
+#' generally works), but can instead contain only the rows that are new or
+#' changed from the previous version (see `compactify`, which does this
+#' automatically). Currently, deletions must be represented as revising a row
+#' to a special state (e.g., making the entries `NA` or including a special
+#' column that flags the data as removed and performing some kind of
+#' post-processing), and the archive is unaware of what this state is. Note
+#' that `NA`s *can* be introduced by `epi_archive` methods for other reasons,
+#' e.g., in [`epix_fill_through_version`] and [`epix_merge`], if requested, to
+#' represent potential update data that we do not yet have access to; or in
+#' [`epix_merge`] to represent the "value" of an observation before the
+#' version in which it was first released, or if no version of that
+#' observation appears in the archive data at all.
 #'
 #' **A word of caution:** R6 objects, unlike most other objects in R, have
 #' reference semantics. A primary consequence of this is that objects are not
 #' copied when modified. You can read more about this in Hadley Wickham's
-#' [Advanced R](https://adv-r.hadley.nz/r6.html#r6-semantics) book.
+#' [Advanced R](https://adv-r.hadley.nz/r6.html#r6-semantics) book. In order
+#' to construct a modified archive while keeping the original intact, first
+#' make a clone using the `$clone` method, then overwrite the clone's `DT`
+#' field with `data.table::copy(clone$DT)`, and finally perform the
+#' modifications on the clone.
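A minimal sketch of the cloning pattern just described, assuming `archive` is an existing `epi_archive`:

```r
clone <- archive$clone()                # new R6 handle; fields still shared
clone$DT <- data.table::copy(clone$DT)  # deep-copy the data table
# ... modify `clone`'s DT or other fields freely; `archive` remains intact.
```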
 #'
 #' @section Metadata:
 #' The following pieces of metadata are included as fields in an `epi_archive`
@@ -66,7 +174,7 @@
 #' `epi_df` format, which represents the most up-to-date values of the signal
 #' variables, as of the specified version. This is accomplished by calling the
 #' `as_of()` method for an `epi_archive` object `x`. More details on this
-#' method are documented in the wrapper function `epix_as_of()`.
+#' method are documented in the wrapper function [`epix_as_of()`].
 #'
 #' @section Sliding Computations:
 #' We can run a sliding computation over an `epi_archive` object, much like
@@ -76,8 +184,8 @@
 #' difference: it is version-aware. That is, for an `epi_archive` object, the
 #' sliding computation at any given reference time point t is performed on
 #' **data that would have been available as of t**. More details on `slide()`
-#' are documented in the wrapper function `epix_slide()`.
-#'
+#' are documented in the wrapper function [`epix_slide()`].
+#'
 #' @importFrom R6 R6Class
 #' @export
 #' @examples
@@ -102,6 +210,8 @@ epi_archive =
     geo_type = NULL,
     time_type = NULL,
     additional_metadata = NULL,
+    clobberable_versions_start = NULL,
+    versions_end = NULL,
 #' @description Creates a new `epi_archive` object.
 #' @param x A data frame, data table, or tibble, with columns `geo_value`,
 #'   `time_value`, `version`, and then any additional number of columns.
@@ -117,6 +227,23 @@ epi_archive =
 #' @param additional_metadata List of additional metadata to attach to the
 #'   `epi_archive` object. The metadata will have `geo_type` and `time_type`
 #'   fields; named entries from the passed list will be included as well.
+#' @param compactify Optional; Boolean or `NULL`: should we remove rows that are
+#'   considered redundant for the purposes of `epi_archive`'s built-in methods
+#'   such as `as_of`? As these methods use the last version of each observation
+#'   carried forward (LOCF) to interpolate between the version data provided,
+#'   rows that don't change these LOCF results can potentially be omitted to
+#'   save space while maintaining the same behavior (with the help of the
+#'   `clobberable_versions_start` and `versions_end` fields in some
+#'   edge cases). `TRUE` will remove these rows, `FALSE` will not, and missing
+#'   or `NULL` will remove these rows and issue a warning. Generally, this can
+#'   be set to `TRUE`, but if you directly inspect or edit the fields of the
+#'   `epi_archive` such as its `DT`, you will have to determine whether
+#'   `compactify=TRUE` will produce the desired results. If compactification
+#'   here is removing a large proportion of the rows, this may indicate a
+#'   potential for space, time, or bandwidth savings upstream in the data
+#'   pipeline, e.g., when fetching, storing, or preparing the input data `x`
+#' @param clobberable_versions_start Optional; as in [`as_epi_archive`]
+#' @param versions_end Optional; as in [`as_epi_archive`]
 #' @return An `epi_archive` object.
 #' @importFrom data.table as.data.table key setkeyv
 #'
@@ -124,7 +251,8 @@
 #' Refer to the documentation for [as_epi_archive()] for more information
 #' and examples of parameter names.
     initialize = function(x, geo_type, time_type, other_keys,
-                          additional_metadata) {
+                          additional_metadata, compactify,
+                          clobberable_versions_start, versions_end) {
       # Check that we have a data frame
       if (!is.data.frame(x)) {
         Abort("`x` must be a data frame.")
@@ -140,6 +268,10 @@ epi_archive =
       if (!("version" %in% names(x))) {
         Abort("`x` must contain a `version` column.")
       }
+      if (anyNA(x$version)) {
+        Abort("`x$version` must not contain `NA`s",
+              class = "epiprocess__version_values_must_not_be_na")
+      }
 
       # If geo type is missing, then try to guess it
       if (missing(geo_type)) {
@@ -164,19 +296,106 @@ epi_archive =
                          c("geo_type", "time_type"))) {
         Warn("`additional_metadata` names overlap with existing metadata fields \"geo_type\", \"time_type\".")
       }
-
+
+      # Conduct checks and apply defaults for `compactify`
+      if (missing(compactify)) {
+        compactify = NULL
+      } else if (!rlang::is_bool(compactify) &&
+                 !rlang::is_null(compactify)) {
+        Abort("`compactify` must be Boolean or `NULL`.")
+      }
+
+      # Conduct checks and apply defaults for
+      # `clobberable_versions_start`, `versions_end`:
+      if (missing(clobberable_versions_start)) {
+        clobberable_versions_start <- max_version_with_row_in(x)
+      }
+      if (missing(versions_end)) {
+        versions_end <- max_version_with_row_in(x)
+      }
+      validate_version_bound(clobberable_versions_start, x, na_ok=TRUE)
+      validate_version_bound(versions_end, x, na_ok=FALSE)
+      if (nrow(x) > 0L && versions_end < max(x[["version"]])) {
+        Abort(sprintf("`versions_end` was %s, but `x` contained
+                       updates for a later version or versions, up through %s",
+                      versions_end, max(x[["version"]])),
+              class="epiprocess__versions_end_earlier_than_updates")
+      }
+      if (!is.na(clobberable_versions_start) && clobberable_versions_start > versions_end) {
+        Abort(sprintf("`versions_end` was %s, but a `clobberable_versions_start`
+                       of %s indicated that there were later observed versions",
+                      versions_end, clobberable_versions_start),
+              class="epiprocess__versions_end_earlier_than_clobberable_versions_start")
+      }
+
+      # --- End of validation and replacing missing args with defaults ---
+
       # Create the data table; if x was an un-keyed data.table itself,
       # then the call to as.data.table() will fail to set keys, so we
       # need to check this, then do it manually if needed
       key_vars = c("geo_value", "time_value", other_keys, "version")
       DT = as.data.table(x, key = key_vars)
       if (!identical(key_vars, key(DT))) setkeyv(DT, cols = key_vars)
-
+
+      # Checks to see if a value in a vector is LOCF
+      is_locf <- function(vec) {
+        dplyr::if_else(!is.na(vec) & !is.na(dplyr::lag(vec)),
+                       vec == dplyr::lag(vec),
+                       is.na(vec) & is.na(dplyr::lag(vec)))
+      }
+
+      # A LOCF-redundant row is one where all values except for the version
+      # match their respective lag values
+
+      # Removes the LOCF-redundant rows from a data frame
+      rm_locf <- function(df) {
+        dplyr::filter(df, if_any(c(everything(), -version), ~ !is_locf(.)))
+      }
+
+      # Keeps only the LOCF-redundant rows, e.g., to be printed in the warning below
+      keep_locf <- function(df) {
+        dplyr::filter(df, if_all(c(everything(), -version), ~ is_locf(.)))
+      }
+
+      # Runs compactify on data frame
+      if (is.null(compactify) || compactify == TRUE) {
+        elim = keep_locf(DT)
+        DT = rm_locf(DT)
+      } else {
+        # Create empty data frame for nrow(elim) to be 0
+        elim = tibble::tibble()
+      }
+
+      # Warns about redundant rows
+      if (is.null(compactify) && nrow(elim) > 0) {
+        warning_intro <- break_str(paste(
+          'Found rows that appear redundant based on',
+          'last (version of each) observation carried forward;',
+          'these rows have been removed to "compactify" and save space:'
+        ))
+
+        warning_data <- paste(collapse="\n", capture.output(print(elim, topn=3L, nrows=7L)))
+
+        warning_outro <- break_str(paste(
+          "Built-in `epi_archive` functionality should be unaffected,",
+          "but results may change if you work directly with its fields (such as `DT`).",
+          "See `?as_epi_archive` for details.",
+          "To silence this warning but keep compactification,",
+          "you can pass `compactify=TRUE` when constructing the archive."
+        ))
+
+        warning_message <- paste(sep="\n", warning_intro, warning_data, warning_outro)
+
+        rlang::warn(warning_message, class="epiprocess__compactify_default_removed_rows")
+      }
+
       # Instantiate all self variables
       self$DT = DT
       self$geo_type = geo_type
       self$time_type = time_type
       self$additional_metadata = additional_metadata
+      self$clobberable_versions_start = clobberable_versions_start
+      self$versions_end = versions_end
     },
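To make the compactification behavior concrete, here is a hedged toy example (hypothetical values): the middle row repeats the previous version's values, so it is LOCF-redundant and is dropped when `compactify` is `TRUE` (or `NULL`, with a warning):

```r
updates <- tibble::tribble(
  ~geo_value, ~time_value,           ~version,              ~cases,
  "ca",       as.Date("2020-06-01"), as.Date("2020-06-02"), 100,
  "ca",       as.Date("2020-06-01"), as.Date("2020-06-03"), 100,  # redundant
  "ca",       as.Date("2020-06-01"), as.Date("2020-06-04"), 150
)
as_epi_archive(updates, compactify = TRUE)$DT  # 2 rows: versions 06-02, 06-04
```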
     print = function() {
       cat("An `epi_archive` object, with metadata:\n")
@@ -194,14 +413,20 @@ epi_archive =
         min_time = Min(self$DT$time_value)
         max_time = Max(self$DT$time_value)
       }
-      cat(sprintf("* %-14s = %s\n", "min time value",
-                  min_time))
-      cat(sprintf("* %-14s = %s\n", "max time value",
-                  max_time))
-      cat(sprintf("* %-14s = %s\n", "min version",
+      cat(sprintf("* %-14s = %s\n", "min time value", min_time))
+      cat(sprintf("* %-14s = %s\n", "max time value", max_time))
+      cat(sprintf("* %-14s = %s\n", "first version with update",
                   min(self$DT$version)))
-      cat(sprintf("* %-14s = %s\n", "max version",
+      cat(sprintf("* %-14s = %s\n", "last version with update",
                   max(self$DT$version)))
+      if (is.na(self$clobberable_versions_start)) {
+        cat("No clobberable versions\n")
+      } else {
+        cat(sprintf("* %-14s = %s\n", "clobberable versions start",
+                    self$clobberable_versions_start))
+      }
+      cat(sprintf("* %-14s = %s\n", "versions end",
+                  self$versions_end))
       cat("----------\n")
       cat(sprintf("Data archive (stored in DT field): %i x %i\n",
                   nrow(self$DT), ncol(self$DT)))
@@ -213,31 +438,36 @@ epi_archive =
       cat("----------\n")
       cat(sprintf("Public methods: %s\n",
                   paste(names(epi_archive$public_methods),
-                        collapse = ", ")))
+                        collapse = ", ")), "\n")
+    },
     #####
 #' @description Generates a snapshot in `epi_df` format as of a given version.
-#' See the documentation for the wrapper function `epix_as_of()` for details.
+#' See the documentation for the wrapper function [`epix_as_of()`] for details.
 #' @importFrom data.table between key
     as_of = function(max_version, min_time_value = -Inf) {
       # Self max version and other keys
-      self_max = max(self$DT$version)
       other_keys = setdiff(key(self$DT),
                            c("geo_value", "time_value", "version"))
       if (length(other_keys) == 0) other_keys = NULL
 
       # Check a few things on max_version
-      if (!identical(class(max_version), class(self$DT$version))) {
-        Abort("`max_version` and `DT$version` must have same class.")
+      if (!identical(class(max_version), class(self$DT$version)) ||
+          !identical(typeof(max_version), typeof(self$DT$version))) {
+        Abort("`max_version` and `DT$version` must have same `class` and `typeof`.")
       }
       if (length(max_version) != 1) {
         Abort("`max_version` cannot be a vector.")
       }
-      if (max_version > self_max) {
-        Abort("`max_version` must be at most `max(DT$max_version)`.")
+      if (is.na(max_version)) {
+        Abort("`max_version` must not be NA.")
+      }
+      if (max_version > self$versions_end) {
+        Abort("`max_version` must be at most `self$versions_end`.")
       }
-      if (max_version == self_max) {
-        Warn("Getting data as of the latest version possible. For a variety of reasons, it is possible that we only have a preliminary picture of this version (e.g., the upstream source has updated it but we have not seen it due to latency in synchronization). Thus, the snapshot that we produce here might not be reproducible at a later time (e.g., when the archive has caught up in terms of synchronization).")
+      if (!is.na(self$clobberable_versions_start) && max_version >= self$clobberable_versions_start) {
+        Warn('Getting data as of some "clobberable" version that might be hotfixed, synced, or otherwise replaced later with different data using the same version tag. Thus, the snapshot that we produce here might not be reproducible later. See `?epi_archive` for more info and `?epix_as_of` on how to muffle.',
+             class="epiprocess__snapshot_as_of_clobberable_version")
       }
 
       # Filter by version and return
@@ -257,41 +487,103 @@ epi_archive =
       )
     },
     #####
-#' @description Merges another `data.table` with the current one, and allows for
-#' a post-filling of `NA` values by last observation carried forward (LOCF).
-#' See the documentation for the wrapper function `epix_merge()` for details.
-#' @importFrom data.table key merge.data.table nafill
-    merge = function(y, ..., locf = TRUE, nan = NA) {
-      # Check we have a `data.table` object
-      if (!(inherits(y, "data.table") || inherits(y, "epi_archive"))) {
-        Abort("`y` must be of class `data.table` or `epi_archive`.")
+#' @description Fill in unobserved history using requested scheme by mutating
+#' `self` and potentially reseating its fields. See
+#' [`epix_fill_through_version`] for a full description of the non-R6-method
+#' version, which doesn't mutate the input archive but might alias its fields.
+#'
+#' @param fill_versions_end as in [`epix_fill_through_version`]
+#' @param how as in [`epix_fill_through_version`]
+#'
+#' @importFrom data.table key setkeyv := address copy
+#' @importFrom rlang arg_match
+    fill_through_version = function(fill_versions_end,
+                                    how=c("na", "locf")) {
+      validate_version_bound(fill_versions_end, self$DT, na_ok=FALSE)
+      how <- arg_match(how)
+      if (self$versions_end < fill_versions_end) {
+        new_DT = switch(
+          how,
+          "na" = {
+            # old DT + a version consisting of all NA observations
+            # immediately after the last currently/actually-observed
+            # version. Note that this NA-observation version must only be
+            # added if `self` is outdated.
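+            # For intuition (hypothetical toy values): with key
+            # (geo_value, time_value, version), `versions_end` 2020-06-08,
+            # and `fill_versions_end` 2020-06-10, the code below appends one
+            # row per recorded non-version key combination (each
+            # (geo_value, time_value) pair here) with version 2020-06-09 and
+            # all measurement columns NA; `as_of` then carries those NAs
+            # forward through 2020-06-10.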
+            nonversion_key_cols = setdiff(key(self$DT), "version")
+            nonkey_cols = setdiff(names(self$DT), key(self$DT))
+            next_version_tag = next_after(self$versions_end)
+            if (next_version_tag > fill_versions_end) {
+              Abort(sprintf(paste(
+                "Apparent problem with `next_after` implementation:",
+                "archive contained observations through version %s",
+                "and the next possible version was supposed to be %s,",
+                "but this appeared to jump from a version < %3$s",
+                "to one > %3$s, implying at least one version in between."
+              ), self$versions_end, next_version_tag, fill_versions_end))
+            }
+            nonversion_key_vals_ever_recorded = unique(self$DT, by=nonversion_key_cols)
+            # In edge cases, the `unique` result can alias the original
+            # DT; detect and copy if necessary:
+            if (identical(address(self$DT), address(nonversion_key_vals_ever_recorded))) {
+              nonversion_key_vals_ever_recorded <- copy(nonversion_key_vals_ever_recorded)
+            }
+            next_version_DT = nonversion_key_vals_ever_recorded[
+              , version := next_version_tag][
+                # this makes the class of these columns logical (`NA` is a
+                # logical NA; we're relying on the rbind below to convert to
+                # the proper class&typeof)
+                , (nonkey_cols) := NA]
+            # full result DT:
+            setkeyv(rbind(self$DT, next_version_DT), key(self$DT))[]
+          },
+          "locf" = {
+            # just the old DT; LOCF is built into other methods:
+            self$DT
+          }
+        )
+        new_versions_end = fill_versions_end
+        # Update `self` all at once with simple, error-free operations +
+        # return below:
+        self$DT <- new_DT
+        self$versions_end <- new_versions_end
+      } else {
+        # Already sufficiently up to date; nothing to do.
       }
+      return (invisible(self))
+    },
+    #####
+#' @description Merges another `epi_archive` with the current one, mutating the
+#' current one by reseating its `DT` and several other fields, but avoiding
+#' mutation of the old `DT`; returns the current archive
+#' [invisibly][base::invisible]. See [`epix_merge`] for a full description
+#' of the non-R6-method version, which does not mutate either archive, and
+#' does not alias either archive's `DT`.
+#' @param y as in [`epix_merge`]
+#' @param sync as in [`epix_merge`]
+#' @param compactify as in [`epix_merge`]
+    merge = function(y, sync = c("forbid","na","locf","truncate"), compactify = TRUE) {
+      result = epix_merge(self, y,
+                          sync = sync,
+                          compactify = compactify)
 
-      # Use the data.table merge function, carrying through ... args
-      if (inherits(y, "data.table")) self$DT = merge(self$DT, y, ...)
-      else self$DT = merge(self$DT, y$DT, ...)
+      if (length(epi_archive$private_fields) != 0L) {
+        Abort("expected no private fields in epi_archive",
+              internal=TRUE)
+      }
 
-      # Now use the data.table locf function, if we're asked to
-      if (locf) {
-        key_vars = key(self$DT)
-        cols = setdiff(names(self$DT), key_vars)
-        by = setdiff(key_vars, "version")
+      # Mutate fields all at once, trying to avoid any potential errors:
+      for (field_name in names(epi_archive$public_fields)) {
+        self[[field_name]] <- result[[field_name]]
+      }
 
-        # Important: use nafill and not setnafill because the latter
-        # returns the entire data frame by reference, and the former can
-        # be set to act on particular columns by reference using :=
-        self$DT[,
-                (cols) := nafill(.SD, type = "locf", nan = nan),
-                .SDcols = cols,
-                by = by]
-      }
-    },
+      return (invisible(self))
+    },
See the documentation for the wrapper function [`epix_slide()`] for #' details. #' @importFrom data.table key -#' @importFrom rlang !! !!! enquo enquos is_quosure sym syms +#' @importFrom rlang !! !!! enquo quo_is_missing enquos is_quosure sym syms slide = function(f, ..., n, group_by, ref_time_values, time_step, new_col_name = "slide_value", as_list_col = FALSE, names_sep = "_", @@ -310,19 +602,16 @@ epi_archive = before_num = n-1 if (!missing(time_step)) before_num = time_step(n-1) - # What to group by? If missing, set according to internal keys - if (missing(group_by)) { - group_by = setdiff(key(self$DT), c("time_value", "version")) + # What to group by? If missing, set according to internal keys; + # otherwise, tidyselect. + if (quo_is_missing(enquo(group_by))) { + group_by <- syms(setdiff(key(self$DT), c("time_value", "version"))) + } else { + group_by <- syms(names(eval_select(enquo(group_by), self$DT))) } - # Symbolize column name, defuse grouping variables. We have to do - # the middle step here which is a bit complicated (unfortunately) - # since the function epix_slide() could have called the current one, - # and in doing so, it may have already needed to defuse the grouping - # variables + # Symbolize column name new_col = sym(new_col_name) - if (!is_quosure(group_by)) group_by = enquo(group_by) - group_by = syms(names(eval_select(group_by, self$DT))) # Key variable names, apart from time value and version key_vars = setdiff(key(self$DT), c("time_value", "version")) @@ -463,6 +752,50 @@ epi_archive = #' @param additional_metadata List of additional metadata to attach to the #' `epi_archive` object. The metadata will have `geo_type` and `time_type` #' fields; named entries from the passed list or will be included as well. +#' @param compactify Optional; Boolean or `NULL`: should we remove rows that are +#' considered redundant for the purposes of `epi_archive`'s built-in methods +#' such as `as_of`? As these methods use the last version of each observation +#' carried forward (LOCF) to interpolate between the version data provided, +#' rows that don't change these LOCF results can potentially be omitted to +#' save space. `TRUE` will remove these rows, `FALSE` will not, and missing or +#' `NULL` will remove these rows and issue a warning. Generally, this can be +#' set to `TRUE`, but if you directly inspect or edit the fields of the +#' `epi_archive` such as its `DT`, you will have to determine whether +#' `compactify=TRUE` will produce the desired results. If compactification +#' here is removing a large proportion of the rows, this may indicate a +#' potential for space, time, or bandwidth savings upstream the data pipeline, +#' e.g., when fetching, storing, or preparing the input data `x` +#' @param clobberable_versions_start Optional; `length`-1; either a value of the +#' same `class` and `typeof` as `x$version`, or an `NA` of any `class` and +#' `typeof`: specifically, either (a) the earliest version that could be +#' subject to "clobbering" (being overwritten with different update data, but +#' using the same version tag as the old update data), or (b) `NA`, to +#' indicate that no versions are clobberable. There are a variety of reasons +#' why versions could be clobberable, such as upstream hotfixes to the latest +#' version, or delays in data synchronization that were mistaken for versions +#' with no updates; potential causes vary between different data pipelines. 
+#'   The default value is `max_version_with_row_in(x)`; this default assumes
+#'   that (i) if a row in `x` (even one that `compactify` would consider
+#'   redundant) is present with version `ver`, then all previous versions must
+#'   be finalized and non-clobberable, although `ver` (and onward) might still
+#'   be modified, (ii) even if we have "observed" empty updates for some
+#'   versions beyond `max(x$version)` (as indicated by `versions_end`;
+#'   see below), we can't assume `max(x$version)` has been finalized, because we
+#'   might see a nonfinalized version + empty subsequent versions due to
+#'   upstream database replication delays in combination with the upstream
+#'   replicas using last-version-carried-forward to extrapolate that there were
+#'   no updates, (iii) "redundant" update rows that would be removed by
+#'   `compactify` are not redundant, and actually come from an explicit version
+#'   release that indicates that preceding versions are finalized. If `nrow(x)
+#'   == 0`, then this argument is mandatory.
+#' @param versions_end Optional; length-1, same `class` and `typeof` as
+#'   `x$version`: what is the last version we have observed? The default is
+#'   `max_version_with_row_in(x)`, but values greater than this could also be
+#'   valid, and would indicate that we observed additional versions of the data
+#'   beyond `max(x$version)`, but they all contained empty updates. (The default
+#'   value of `clobberable_versions_start` does not fully trust these empty
+#'   updates, and assumes that any version `>= max(x$version)` could be
+#'   clobbered.) If `nrow(x) == 0`, then this argument is mandatory.
 #' @return An `epi_archive` object.
 #'
 #' @details This is simply a wrapper around the `new()` method of the `epi_archive`
@@ -495,22 +828,26 @@ epi_archive =
 #' df <- data.frame (geo_value = c(replicate(2, "ca"), replicate(2, "fl")),
 #'                   county = c(1, 3, 2, 5),
 #'                   time_value = c("2020-06-01",
-#'                                  "2020-06-02",
-#'                                  "2020-06-01",
-#'                                  "2020-06-02"),
+#'                                   "2020-06-02",
+#'                                   "2020-06-01",
+#'                                   "2020-06-02"),
 #'                   version = c("2020-06-02",
-#'                               "2020-06-03",
-#'                               "2020-06-02",
-#'                               "2020-06-03"),
+#'                                "2020-06-03",
+#'                                "2020-06-02",
+#'                                "2020-06-03"),
 #'                   cases = c(1, 2, 3, 4),
 #'                   cases_rate = c(0.01, 0.02, 0.01, 0.05))
 #'
 #' x <- df %>% as_epi_archive(geo_type = "state",
-#'                            time_type = "day",
-#'                            other_keys = "county")
+#'                             time_type = "day",
+#'                             other_keys = "county")
 as_epi_archive = function(x, geo_type, time_type, other_keys,
-                          additional_metadata = list()) {
-  epi_archive$new(x, geo_type, time_type, other_keys, additional_metadata)
+                          additional_metadata = list(),
+                          compactify = NULL,
+                          clobberable_versions_start = max_version_with_row_in(x),
+                          versions_end = max_version_with_row_in(x)) {
+  epi_archive$new(x, geo_type, time_type, other_keys, additional_metadata,
+                  compactify, clobberable_versions_start, versions_end)
 }
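For example (hypothetical `update_df` with a Date `version` column), these defaults can be overridden when more is known about the data pipeline:

```r
arch <- as_epi_archive(
  update_df,
  # versions from June 10 onward might still be hotfixed upstream:
  clobberable_versions_start = as.Date("2020-06-10"),
  # we observed (possibly empty) updates through June 12:
  versions_end = as.Date("2020-06-12")
)
```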
 
 #' Test for `epi_archive` format
diff --git a/R/data.R b/R/data.R
index f281803e..f642ac47 100644
--- a/R/data.R
+++ b/R/data.R
@@ -67,8 +67,129 @@
 #' * \href{https://cmu-delphi.github.io/delphi-epidata/api/covidcast-signals/doctor-visits.html}{From the COVIDcast Doctor Visits API}: The signal `percent_cli` is taken directly from the API without changes.
 #' * \href{https://cmu-delphi.github.io/delphi-epidata/api/covidcast-signals/jhu-csse.html}{From the COVIDcast Epidata API}: `case_rate_7d_av` is taken directly from the JHU CSSE \href{https://github.com/CSSEGISandData/COVID-19}{COVID-19 GitHub repository} without changes. The 7-day average signals are computed by Delphi by calculating moving averages of the preceding 7 days, so the signal for June 7 is the average of the underlying data for June 1 through 7, inclusive.
 #' * Furthermore, the data is a subset of the full dataset, the signal names slightly altered, and formatted into a tibble.
+#'
+#' @export
 "archive_cases_dv_subset"
 
+#' Detect whether `pkgload` is unregistering a package (with some unlikely false positives)
+#'
+#' More precisely, detects the presence of a call to an `unregister` or
+#' `unregister_namespace` function from any package in the indicated part of the
+#' function call stack.
+#'
+#' @param parent_n optional, single non-`NA` non-negative integer; how many
+#'   "parent"/"ancestor" calls should we skip inspecting? Default of `0L` will
+#'   check everything up to, but not including the call to this function. If
+#'   building wrappers or utilities around this function it may be useful to use
+#'   this default to ignore those wrappers, especially if they might trigger
+#'   false positives now or in some future version of this function with a looser
+#'   function name test.
+#'
+#' @return Boolean
+#'
+#' @noRd
+some_package_is_being_unregistered = function(parent_n = 0L) {
+  calls = sys.calls()
+  # `calls` will include the call to this function; strip out this call plus
+  # `parent_n` additional requested calls to make it like we're reasoning about
+  # the desired call. This could prevent potential false positives from
+  # triggering if, in a later version, we decide to loosen the `call_name`
+  # checks below to something that would be `TRUE` for the name of this function
+  # or one of the undesired call ancestors.
+  calls_to_inspect = utils::head(calls, n = -(parent_n + 1L))
+  # Note that `utils::head(sys.calls(), n=-1L)` isn't equivalent, due to lazy
+  # argument evaluation. Note that copy-pasting the body of this function
+  # without this `utils::head` operation isn't always equivalent to calling it;
+  # e.g., within the `value` argument of a package-level `delayedAssign`,
+  # `sys.calls()` will return `NULL` in some or all cases, including when its
+  # evaluation has been triggered via `unregister`.
+  simple_call_names = purrr::map_chr(calls_to_inspect, function(call) {
+    maybe_simple_call_name = rlang::call_name(call)
+    if (is.null(maybe_simple_call_name)) NA_character_ else maybe_simple_call_name
+  })
+  # `pkgload::unregister` is an (the?) exported function that forces
+  # package-level promises, while `pkgload:::unregister_namespace` is the
+  # internal function that does this package-level promise forcing. Check for
+  # both just in case there's another exported function that calls
+  # `unregister_namespace` or other `pkgload` versions don't use the
+  # `unregister_namespace` internal. (Note that
+  # `NA_character_ %in% c("unregister", "unregister_namespace")` is `FALSE`
+  # rather than `NA`, giving the desired semantics and avoiding potential
+  # `NA`s in the argument to `any`.)
+  any(simple_call_names %in% c("unregister", "unregister_namespace"))
+}
+
+#' [`base::delayedAssign`] with [`pkgload::unregister`] awareness, injection support
+#'
+#' Provides better feedback on errors during promise evaluation while a package
+#' is being unregistered, to help package developers escape from a situation
+#' where a buggy promise prevents package reloading. Also provides `rlang`
+#' injection support (like [`rlang::env_bind_lazy`]). The call stack will look
+#' different than when using `delayedAssign` directly.
+#'
+#' @noRd
+delayed_assign_with_unregister_awareness = function(x, value,
+                                                    eval.env = rlang::caller_env(),
+                                                    assign.env = rlang::caller_env()) {
+  value_quosure = rlang::as_quosure(rlang::enexpr(value), eval.env)
+  this_env = environment()
+  delayedAssign(x, eval.env = this_env, assign.env = assign.env, value = {
+    if (some_package_is_being_unregistered()) {
+      withCallingHandlers(
+        # `rlang::eval_tidy(value_quosure)` is shorter and would sort of work,
+        # but doesn't give the same `ls`, `rm`, and top-level `<-` behavior as
+        # we'd have with `delayedAssign`; it doesn't seem to actually evaluate
+        # quosure's expr in the quosure's env. Using `rlang::eval_bare` instead
+        # seems to do the trick. (We also could have just used a `value_expr`
+        # and `eval.env` together rather than introducing `value_quosure` at
+        # all.)
+        rlang::eval_bare(rlang::quo_get_expr(value_quosure), rlang::quo_get_env(value_quosure)),
+        error = function(err) {
+          Abort(paste("An error was raised while attempting to evaluate a promise",
+                      "(prepared with `delayed_assign_with_unregister_awareness`)",
+                      "while an `unregister` or `unregister_namespace` call",
+                      "was being evaluated.",
+                      "This can happen, for example, when `devtools::load_all`",
+                      "reloads a package that contains a buggy promise,",
+                      "because reloading can cause old package-level promises to",
+                      "be forced via `pkgload::unregister` and",
+                      "`pkgload:::unregister_namespace`, due to",
+                      "https://github.com/r-lib/pkgload/pull/157.",
+                      "If this is the current situation, you might be able to",
+                      "successfully reload the package again after",
+                      "`unloadNamespace`-ing it (but this situation will",
+                      "keep re-occurring every other `devtools::load_all`",
+                      "and every `devtools::document` until the bug or situation",
+                      "generating the promise's error has been resolved)."
+                ),
+                class = "epiprocess__promise_evaluation_error_during_unregister",
+                parent = err)
+        })
+    } else {
+      rlang::eval_bare(rlang::quo_get_expr(value_quosure), rlang::quo_get_env(value_quosure))
+    }
+  })
+}
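A small usage sketch (the binding name and computation here are hypothetical; the package's real call for `archive_cases_dv_subset` appears below):

```r
delayed_assign_with_unregister_awareness(
  "my_lazy_object",
  build_my_lazy_object()  # hypothetical; forced on first access to the binding
)
```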
+
+# Like normal data objects, set `archive_cases_dv_subset` up as a promise, so it
+# doesn't take unnecessary space before it's evaluated. This also avoids a need
+# for @include tags. However, this pattern will use unnecessary space after this
+# promise is evaluated, because `as_epi_archive` clones `archive_cases_dv_subset_dt`
+# and `archive_cases_dv_subset_dt` will stick around along with `archive_cases_dv_subset`
+# after they have been evaluated. We may want to add an option to avoid cloning
+# in `as_epi_archive` and make use of it here. But we may also want to change
+# this into an active binding that clones every time, unless we can hide the
+# `DT` field from the user (make it non-`public` in general) or make it
+# read-only (in this specific case), so that the user cannot modify the `DT`
+# here and potentially mess up examples that they refer to later on.
+#
+# During development, note that reloading the package and re-evaluating this
+# promise should prepare the archive from the DT using any changes that have
+# been made to `as_epi_archive`; however, if earlier, any field of
+# `archive_cases_dv_subset` was modified using `<-`, a global environment
+# binding may have been created with the same name as the package promise, and
+# this binding will stick around even when the package is reloaded, and will
+# need to be `rm`-d to easily access the refreshed package promise.
+delayed_assign_with_unregister_awareness("archive_cases_dv_subset", as_epi_archive(archive_cases_dv_subset_dt, compactify=FALSE))
 
 #' Subset of JHU daily cases from California and Florida
 #'
@@ -120,4 +241,4 @@
 #' Modifications:
 #' * \href{https://cmu-delphi.github.io/delphi-epidata/api/covidcast-signals/jhu-csse.html}{From the COVIDcast Epidata API}: These signals are taken directly from the JHU CSSE \href{https://github.com/CSSEGISandData/COVID-19}{COVID-19 GitHub repository} without changes. The 7-day average signals are computed by Delphi by calculating moving averages of the preceding 7 days, so the signal for June 7 is the average of the underlying data for June 1 through 7, inclusive.
 #' * Furthermore, the data has been limited to a very small number of rows, the signal names slightly altered, and formatted into a tibble.
-"jhu_csse_county_level_subset"
\ No newline at end of file
+"jhu_csse_county_level_subset"
diff --git a/R/methods-epi_archive.R b/R/methods-epi_archive.R
index 9a03f5b9..3387c935 100644
--- a/R/methods-epi_archive.R
+++ b/R/methods-epi_archive.R
@@ -32,62 +32,308 @@
 #' epix_as_of(x = archive_cases_dv_subset,
 #'            max_version = max(archive_cases_dv_subset$DT$version))
 #'
-#' # no warning shown
-#' epix_as_of(archive_cases_dv_subset, max_version = as.Date("2020-06-10"))
+#' @export
+#' @examples
+#'
+#' range(archive_cases_dv_subset$DT$version) # 2020-06-02 -- 2021-12-01
+#'
+#' epix_as_of(x = archive_cases_dv_subset,
+#'            max_version = as.Date("2020-06-12"))
+#'
+#' # When fetching a snapshot as of the latest version with update data in the
+#' # archive, a warning is issued by default, as this update data might not yet
+#' # be finalized (for example, if data versions are labeled with dates, these
+#' # versions might be overwritten throughout the corresponding days with
+#' # additional data or "hotfixes" of erroneous data; when we build an archive
+#' # based on database queries, the latest available update might still be
+#' # subject to change, but previous versions should be finalized). We can
+#' # muffle such warnings with the following pattern:
+#' withCallingHandlers({
+#'   epix_as_of(x = archive_cases_dv_subset,
+#'              max_version = max(archive_cases_dv_subset$DT$version))
+#' }, epiprocess__snapshot_as_of_clobberable_version = function(wrn) invokeRestart("muffleWarning"))
+#' # Since R 4.0, there is a `globalCallingHandlers` function that can be used
+#' # to globally toggle these warnings.
 epix_as_of = function(x, max_version, min_time_value = -Inf) {
   if (!inherits(x, "epi_archive")) Abort("`x` must be of class `epi_archive`.")
   return(x$as_of(max_version, min_time_value))
 }
 
+#' `epi_archive` with unobserved history filled in (won't mutate, might alias)
+#'
+#' Sometimes, due to upstream data pipeline issues, we have to work with a
+#' version history that isn't completely up to date, but with functions that
+#' expect archives that are completely up to date, or equally as up-to-date as
+#' another archive. This function provides one way to approach such mismatches:
+#' pretend that we've "observed" additional versions, filling in these versions
+#' with NAs or extrapolated values.
+#'
+#' `epix_fill_through_version` will not mutate its `x` argument, but its result
+#' might alias fields of `x` (e.g., mutating the result's `DT` might mutate
+#' `x$DT`). The R6 method variant, `x$fill_through_version`, will mutate `x` to
+#' give the result, but might reseat its fields (e.g., references to the old
+#' `x$DT` might not be updated by this function or subsequent operations on
+#' `x`), and returns the updated `x` [invisibly][base::invisible].
+#'
+#' @param x An `epi_archive`
+#' @param fill_versions_end Length-1, same class&type as `x$version`: the
+#'   version through which to fill in missing version history; this will be the
+#'   result's `$versions_end` unless it already had a later
+#'   `$versions_end`.
+#' @param how Optional; `"na"` or `"locf"`: `"na"` will fill in any missing
+#'   required version history with `NA`s, by inserting (if necessary) an update
+#'   immediately after the current `$versions_end` that revises all
+#'   existing measurements to be `NA` (this is only supported for `version`
+#'   classes with a `next_after` implementation); `"locf"` will fill in missing
+#'   version history with the last version of each observation carried forward
+#'   (LOCF), by leaving the update `$DT` alone (other `epi_archive` methods are
+#'   based on LOCF). Default is `"na"`.
+#' @return An `epi_archive`
+epix_fill_through_version = function(x, fill_versions_end,
+                                     how=c("na", "locf")) {
+  if (!inherits(x, "epi_archive")) Abort("`x` must be of class `epi_archive`.")
+  # Enclosing parentheses drop the invisibility flag. See description above of
+  # potential mutation and aliasing behavior.
+  ( x$clone()$fill_through_version(fill_versions_end, how=how) )
+}
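A hedged usage sketch (toy dates; assumes `x` is an `epi_archive` with Date versions whose `$versions_end` is 2020-06-08):

```r
x_filled <- epix_fill_through_version(x, as.Date("2020-06-10"), how = "na")
x_filled$versions_end  # 2020-06-10
# Snapshots as of 2020-06-09 or later now show all-NA measurement columns,
# since an all-NA update was inserted at version 2020-06-09.
```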
+
 #' Merge two `epi_archive` objects
 #'
-#' Merges the underlying data tables in two `epi_archive` objects, allows for
-#' post-filling of `NA` values by last observation carried forward (LOCF), and
-#' **overwrites** the first data table with the merged one. See the [archive
-#' vignette](https://cmu-delphi.github.io/epiprocess/articles/archive.html) for
-#' examples.
+#' Merges two `epi_archive`s that share a common `geo_value`, `time_value`, and
+#' set of key columns. When they also share a common `versions_end`,
+#' using `$as_of` on the result should be the same as using `$as_of` on `x` and
+#' `y` individually, then performing a full join of the `DT`s on the non-version
+#' key columns (potentially consolidating multiple warnings about clobberable
+#' versions). If the `versions_end` values differ, the
+#' `sync` parameter controls what is done.
 #'
-#' @param x,y Two `epi_archive` objects to join together, or more specifically,
-#'   whose underlying data tables are to be joined together. The data table in
-#'   `x` will be overwritten with the joined data table. For convenience, we
-#'   also allow `y` to be passed in directly as a `data.table` (need not be an
-#'   `epi_archive` object).
-#' @param ... Named arguments to pass to `data.table::merge.data.table()`, which
-#'   is used for the join (with all default settings as in this function). For
-#'   example, passing `all = TRUE` will perform a full join.
-#' @param locf Should LOCF be used after joining on all non-key columns? This
-#'   will take the latest version of each signal value and propogate it forward
-#'   to fill in gaps that appear after merging. Default is `TRUE`.
-#' @param nan Should `NaN` values be treated as `NA` values in the post-filling
-#'   step? Default is `NA`, which means that they are treated as `NA` values; if
-#  `NaN`, then they are treated as distinct.
-#' @return Nothing; the data table in `x` is overwritten with the merged one.
-#'
-#' @details This is simply a wrapper around the `merge()` method of the
-#' `epi_archive` class, so if `x` and `y` are an `epi_archive` objects, then:
-#' ```
-#' epix_merge(x, y)
-#' ```
-#' is equivalent to:
-#' ```
-#' x$merge(y)
-#' ```
+#' This function, [`epix_merge`], does not mutate its inputs and will not alias
+#' either archive's `DT`, but may alias other fields; `x$merge` will overwrite
+#' `x` with the result of the merge, reseating its `DT` and several other fields
+#' (making them point to different objects), but avoiding mutation of the
+#' contents of the old `DT` (only relevant if you have another reference to the
+#' old `DT` in another object).
+#'
+#' @param x,y Two `epi_archive` objects to join together.
+#' @param sync Optional; `"forbid"`, `"na"`, `"locf"`, or `"truncate"`; in the
+#'   case that `x$versions_end` doesn't match `y$versions_end`, what do we do?:
+#'   `"forbid"`: emit an error; `"na"`: use `max(x$versions_end, y$versions_end)`
+#'   as the result's `versions_end`, but ensure that, if we request a snapshot
+#'   as of a version after `min(x$versions_end, y$versions_end)`, the
+#'   observation columns from the less up-to-date archive will be all NAs (i.e.,
+#'   imagine there was an update immediately after its `versions_end` which
+#'   revised all observations to be `NA`); `"locf"`: use `max(x$versions_end,
+#'   y$versions_end)` as the result's `versions_end`, allowing the last version
+#'   of each observation to be carried forward to extrapolate unavailable
+#'   versions for the less up-to-date input archive (i.e., imagining that the
+#'   less up-to-date archive's data set remained unchanged between its actual
+#'   `versions_end` and the other archive's `versions_end`); or `"truncate"`:
+#'   use `min(x$versions_end, y$versions_end)` as the result's `versions_end`,
+#'   and discard any update rows for later versions.
+#' @param compactify Optional; `TRUE`, `FALSE`, or `NULL`; should the result be
+#'   compactified? See [`as_epi_archive`] for an explanation of what this means.
+#'   Default here is `TRUE`.
+#' @return the resulting `epi_archive`
+#'
+#' @details In all cases, `additional_metadata` will be an empty list, and
+#'   `clobberable_versions_start` will be set to the earliest version that could
+#'   be clobbered in either input archive.
 #'
-#' @export
 #' @examples
 #' # create two example epi_archive datasets
 #' x <- archive_cases_dv_subset$DT %>%
 #'   dplyr::select(geo_value,time_value,version,case_rate_7d_av) %>%
-#'   as_epi_archive()
+#'   as_epi_archive(compactify=TRUE)
 #' y <- archive_cases_dv_subset$DT %>%
 #'   dplyr::select(geo_value,time_value,version,percent_cli) %>%
-#'   as_epi_archive()
+#'   as_epi_archive(compactify=TRUE)
+#' # merge results stored in a third object:
+#' xy = epix_merge(x, y)
+#' # vs. mutating x to hold the merge result:
+#' x$merge(y)
 #'
-#' # a full join stored in x
-#' epix_merge(x, y, all = TRUE)
-epix_merge = function(x, y, ..., locf = TRUE, nan = NA) {
-  if (!inherits(x, "epi_archive")) Abort("`x` must be of class `epi_archive`.")
-  return(x$merge(y, ..., locf = locf, nan = nan))
+#' @importFrom data.table key set
+#' @export
+epix_merge = function(x, y,
+                      sync = c("forbid","na","locf","truncate"),
+                      compactify = TRUE) {
+  if (!inherits(x, "epi_archive")) {
+    Abort("`x` must be of class `epi_archive`.")
+  }
+
+  if (!inherits(y, "epi_archive")) {
+    Abort("`y` must be of class `epi_archive`.")
+  }
+
+  sync <- rlang::arg_match(sync)
+
+  if (!identical(x$geo_type, y$geo_type)) {
+    Abort("`x` and `y` must have the same `$geo_type`")
+  }
+
+  if (!identical(x$time_type, y$time_type)) {
+    Abort("`x` and `y` must have the same `$time_type`")
+  }
+
+  if (length(x$additional_metadata) != 0L) {
+    Warn("x$additional_metadata won't appear in merge result",
+         class = "epiprocess__epix_merge_ignores_additional_metadata")
+  }
+  if (length(y$additional_metadata) != 0L) {
+    Warn("y$additional_metadata won't appear in merge result",
+         class = "epiprocess__epix_merge_ignores_additional_metadata")
+  }
+  result_additional_metadata = list()
+
+  result_clobberable_versions_start =
+    if (all(is.na(c(x$clobberable_versions_start, y$clobberable_versions_start)))) {
+      NA # (any type of NA is fine here)
+    } else {
+      Min(c(x$clobberable_versions_start, y$clobberable_versions_start))
+    }
+
+  # The actual merge below may not succeed 100% of the time, so do this
+  # preprocessing using non-mutating (but potentially aliasing) functions. This
+  # approach potentially uses more memory, but won't leave behind a
+  # partially-mutated `x` on failure.
+  if (sync == "forbid") {
+    if (!identical(x$versions_end, y$versions_end)) {
+      Abort(paste(
+        "`x` and `y` were not equally up to date version-wise:",
+        "`x$versions_end` was not identical to `y$versions_end`;",
+        "either ensure that `x` and `y` are equally up to date before merging,",
+        "or specify how to deal with this using `sync`"
+      ), class="epiprocess__epix_merge_unresolved_sync")
+    } else {
+      new_versions_end = x$versions_end
+      x_DT = x$DT
+      y_DT = y$DT
+    }
+  } else if (sync %in% c("na", "locf")) {
+    new_versions_end = max(x$versions_end, y$versions_end)
+    x_DT = epix_fill_through_version(x, new_versions_end, sync)$DT
+    y_DT = epix_fill_through_version(y, new_versions_end, sync)$DT
+  } else if (sync == "truncate") {
+    new_versions_end = min(x$versions_end, y$versions_end)
+    x_DT = x$DT[x[["DT"]][["version"]] <= new_versions_end, names(x$DT), with=FALSE]
+    y_DT = y$DT[y[["DT"]][["version"]] <= new_versions_end, names(y$DT), with=FALSE]
+  } else Abort("unimplemented")
+
+  if (!identical(key(x$DT), key(x_DT)) || !identical(key(y$DT), key(y_DT))) {
+    Abort("preprocessing of data tables in merge changed the key unexpectedly",
+          internal=TRUE)
+  }
+  ## key(x_DT) should be the same as key(x$DT) and key(y_DT) should be the same
+  ## as key(y$DT). If we want to break this function into parts it makes sense
+  ## to use {x,y}_DT below, but this makes the error checks and messages look a
+  ## little weird and rely on the key-matching assumption above.
+  if (!identical(sort(key(x_DT)), sort(key(y_DT)))) {
+    Abort("
+      The archives must have the same set of key column names; if the
+      key columns represent the same things, just with different
+      names, please retry after manually renaming to match; if they
+      represent different things (e.g., x has an age breakdown
+      but y does not), please retry after processing them to share
+      the same key (e.g., by summarizing x to remove the age breakdown,
+      or by applying a static age breakdown to y).
+    ", class="epiprocess__epix_merge_x_y_must_have_same_key_set")
+  }
+  # `by` cols = result (and each input's) `key` cols; these determine the
+  # row set, which is computed using a full join via `merge`
+  #
+  # non-`by` cols = "value"-ish cols, and are looked up with last
+  # version carried forward via rolling joins
+  by = key(x_DT) # = some perm of key(y_DT)
+  if (!all(c("geo_value","time_value","version") %in% key(x_DT))) {
+    Abort('Invalid `by`; `by` is currently set to the common `key` of
+           the two archives, and is expected to contain
+           "geo_value", "time_value", and "version".',
+          class="epiprocess__epi_archive_must_have_required_key_cols")
+  }
+  if (length(by) < 1L || utils::tail(by, 1L) != "version") {
+    Abort('Invalid `by`; `by` is currently set to the common `key` of
+           the two archives, and is expected to have a "version" as
+           the last key col.',
+          class="epiprocess__epi_archive_must_have_version_at_end_of_key")
+  }
+  x_nonby_colnames = setdiff(names(x_DT), by)
+  y_nonby_colnames = setdiff(names(y_DT), by)
+  if (length(intersect(x_nonby_colnames, y_nonby_colnames)) != 0L) {
+    Abort("
+      `x` and `y` DTs have overlapping non-by column names;
+      this is currently not supported; please manually fix up first:
+      any overlapping columns that are key-like should be
+      incorporated into the key, and other columns should be renamed.
+    ", class="epiprocess__epix_merge_x_y_must_not_have_overlapping_nonby_colnames")
+  }
+  x_by_vals = x_DT[, by, with=FALSE]
+  if (anyDuplicated(x_by_vals) != 0L) {
+    Abort("
+      The `by` columns must uniquely determine rows of `x$DT`;
+      the `by` is currently set to the common `key` of the two
+      archives, so this can be resolved by adding key-like columns
+      to `x`'s key (to get a unique key).
+    ", class="epiprocess__epix_merge_by_cols_must_act_as_unique_key")
+  }
+  y_by_vals = y_DT[, by, with=FALSE]
+  if (anyDuplicated(y_by_vals) != 0L) {
+    Abort("
+      The `by` columns must uniquely determine rows of `y$DT`;
+      the `by` is currently set to the common `key` of the two
+      archives, so this can be resolved by adding key-like columns
+      to `y`'s key (to get a unique key).
+    ", class="epiprocess__epix_merge_by_cols_must_act_as_unique_key")
+  }
+  result_DT = merge(x_by_vals, y_by_vals, by=by,
+                    # We must have `all=TRUE` or we may skip updates
+                    # from x and/or y and corrupt the history
+                    all=TRUE,
+                    # We don't want Cartesian products, but the
+                    # by-is-unique-key check above already ensures
+                    # this. (Note that `allow.cartesian=FALSE` doesn't
+                    # actually catch all Cartesian products anyway.)
+                    # Disable superfluous check:
+                    allow.cartesian=TRUE)
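+  # Toy intuition (hypothetical values): if `x` recorded updates at versions 1
+  # and 3 for some (geo_value, time_value) and `y` recorded one at version 2,
+  # the full join above yields result rows for versions 1, 2, and 3; the
+  # rolling joins below then fill x's columns at version 2 with x's version-1
+  # values (LOCF), and fill y's columns with NA at version 1 (no earlier y
+  # version to roll from) and with y's version-2 values at version 3.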
+  set(result_DT,, x_nonby_colnames,
+      x_DT[result_DT[, by, with=FALSE], x_nonby_colnames, with=FALSE,
+           # It's good practice to specify `on`, and we must
+           # explicitly specify `on` if there's a potential key vs.
+           # by order mismatch (not possible currently for x
+           # with by = key(x$DT), but possible for y):
+           on = by,
+           # last version carried forward:
+           roll=TRUE,
+           # requesting non-version key that doesn't exist in the other archive,
+           # or before its first version, should result in NA
+           nomatch=NA,
+           # see note on `allow.cartesian` above; currently have a
+           # similar story here.
+           allow.cartesian=TRUE])
+  set(result_DT,, y_nonby_colnames,
+      y_DT[result_DT[, by, with=FALSE], y_nonby_colnames, with=FALSE,
+           on = by,
+           roll=TRUE,
+           nomatch=NA,
+           allow.cartesian=TRUE])
+  # The key could be unset in case of a key vs. by order mismatch as
+  # noted above. Ensure that we keep it:
+  setkeyv(result_DT, by)
+
+  return (as_epi_archive(
+    result_DT[], # clear data.table internal invisibility flag if set
+    geo_type = x$geo_type,
+    time_type = x$time_type,
+    other_keys = setdiff(key(result_DT), c("geo_value","time_value","version")),
+    additional_metadata = result_additional_metadata,
+    # It'd probably be better to pre-compactify before the merge, and might be
+    # guaranteed not to be necessary to compactify the merge result if the
+    # inputs are already compactified, but at time of writing we don't have
+    # compactify in its own method or field, and it seems like it should be
+    # pretty fast anyway.
+    compactify = compactify,
+    clobberable_versions_start = result_clobberable_versions_start,
+    versions_end = new_versions_end
+  ))
 }
 
 #' Slide a function over variables in an `epi_archive` object
@@ -207,7 +453,7 @@ epix_slide = function(x, f, ..., n, group_by, ref_time_values,
                       as_list_col = FALSE, names_sep = "_",
                       all_rows = FALSE) {
   if (!inherits(x, "epi_archive")) Abort("`x` must be of class `epi_archive`.")
   return(x$slide(f, ..., n = n,
-                 group_by = enquo(group_by),
+                 group_by = {{group_by}},
                  ref_time_values = ref_time_values,
                  time_step = time_step,
                  new_col_name = new_col_name,
diff --git a/R/sysdata.rda b/R/sysdata.rda
new file mode 100644
index 00000000..d100711d
Binary files /dev/null and b/R/sysdata.rda differ
diff --git a/README.md b/README.md
index cdace7fc..3241cb0f 100644
--- a/README.md
+++ b/README.md
@@ -53,9 +53,15 @@
 class. For example:
 
 - `epix_as_of()`, for generating a snapshot in `epi_df` from the data archive,
   which represents the most up-to-date values of the signal variables, as of
   the specified version;
+
+- `epix_fill_through_version()`, for filling in some fake version data following
+  simple rules, for use when downstream methods expect an archive that is more
+  up-to-date (e.g., if it is a forecasting deadline date and one of our data
+  sources cannot be accessed to provide the latest versions of its data)
 
 - `epix_merge()`, for merging two data archives with each other, with support
-  for filling in missing values via last observation carried forward (LOCF);
+  for various approaches to handling when one of the archives is more up-to-date
+  version-wise than the other;
 
 - `epix_slide()`, for sliding a custom computation to a data archive over local
   windows in time, much like `epi_slide` for an `epi_df` object, but with one
diff --git a/data-raw/archive_cases_dv_subset.R b/data-raw/archive_cases_dv_subset.R
index b1c1071c..36750b02 100644
--- a/data-raw/archive_cases_dv_subset.R
+++ b/data-raw/archive_cases_dv_subset.R
@@ -3,32 +3,41 @@ library(epiprocess)
 library(data.table)
 library(dplyr)
 
-archive_cases_dv_subset <- covidcast(
+dv_subset <- covidcast(
   data_source = "doctor-visits",
   signals = "smoothed_adj_cli",
   time_type = "day",
   geo_type = "state",
-  time_value = epirange(20200601, 20211201),
+  time_values = epirange(20200601, 20211201),
   geo_values = "ca,fl,ny,tx",
   issues = epirange(20200601, 20211201)
-) %>%
+) %>%
   fetch_tbl() %>%
   select(geo_value, time_value, version = issue, percent_cli = value) %>%
-  as_epi_archive()
+  # We're using compactify=FALSE here and below to avoid some testthat test
+  # failures on tests that were based on a non-compactified version.
+  as_epi_archive(compactify=FALSE)
 
 case_rate_subset <- covidcast(
   data_source = "jhu-csse",
   signals = "confirmed_7dav_incidence_prop",
   time_type = "day",
   geo_type = "state",
-  time_value = epirange(20200601, 20211201),
+  time_values = epirange(20200601, 20211201),
   geo_values = "ca,fl,ny,tx",
   issues = epirange(20200601, 20211201)
 ) %>%
   fetch_tbl() %>%
   select(geo_value, time_value, version = issue, case_rate_7d_av = value) %>%
-  as_epi_archive()
+  as_epi_archive(compactify=FALSE)
 
-epix_merge(archive_cases_dv_subset, case_rate_subset, all = TRUE)
+archive_cases_dv_subset = epix_merge(dv_subset, case_rate_subset,
+                                     sync="locf",
+                                     compactify=FALSE)
 
-usethis::use_data(archive_cases_dv_subset, overwrite = TRUE)
+# If we directly store an epi_archive R6 object as data, it will store its class
+# implementation there as well. To prevent mismatches between these stored
+# implementations and the latest class definition, don't store them as R6
+# objects; store the DT and construct the R6 object on request.
+archive_cases_dv_subset_dt = archive_cases_dv_subset$DT +usethis::use_data(archive_cases_dv_subset_dt, overwrite = TRUE, internal = TRUE) diff --git a/data/archive_cases_dv_subset.rda b/data/archive_cases_dv_subset.rda deleted file mode 100644 index 8dd7c126..00000000 Binary files a/data/archive_cases_dv_subset.rda and /dev/null differ diff --git a/man/as_epi_archive.Rd b/man/as_epi_archive.Rd index 4fb7c1eb..a98798cc 100644 --- a/man/as_epi_archive.Rd +++ b/man/as_epi_archive.Rd @@ -9,7 +9,10 @@ as_epi_archive( geo_type, time_type, other_keys, - additional_metadata = list() + additional_metadata = list(), + compactify = NULL, + clobberable_versions_start = max_version_with_row_in(x), + versions_end = max_version_with_row_in(x) ) } \arguments{ @@ -31,6 +34,52 @@ apart from "geo_value", "time_value", and "version".} \item{additional_metadata}{List of additional metadata to attach to the \code{epi_archive} object. The metadata will have \code{geo_type} and \code{time_type} fields; named entries from the passed list or will be included as well.} + +\item{compactify}{Optional; Boolean or \code{NULL}: should we remove rows that are +considered redundant for the purposes of \code{epi_archive}'s built-in methods +such as \code{as_of}? As these methods use the last version of each observation +carried forward (LOCF) to interpolate between the version data provided, +rows that don't change these LOCF results can potentially be omitted to +save space. \code{TRUE} will remove these rows, \code{FALSE} will not, and missing or +\code{NULL} will remove these rows and issue a warning. Generally, this can be +set to \code{TRUE}, but if you directly inspect or edit the fields of the +\code{epi_archive} such as its \code{DT}, you will have to determine whether +\code{compactify=TRUE} will produce the desired results. If compactification +here is removing a large proportion of the rows, this may indicate a +potential for space, time, or bandwidth savings upstream the data pipeline, +e.g., when fetching, storing, or preparing the input data \code{x}} + +\item{clobberable_versions_start}{Optional; \code{length}-1; either a value of the +same \code{class} and \code{typeof} as \code{x$version}, or an \code{NA} of any \code{class} and +\code{typeof}: specifically, either (a) the earliest version that could be +subject to "clobbering" (being overwritten with different update data, but +using the same version tag as the old update data), or (b) \code{NA}, to +indicate that no versions are clobberable. There are a variety of reasons +why versions could be clobberable, such as upstream hotfixes to the latest +version, or delays in data synchronization that were mistaken for versions +with no updates; potential causes vary between different data pipelines. 
+The default value is \code{max_version_with_row_in(x)}; this default assumes +that (i) if a row in \code{x} (even one that \code{compactify} would consider +redundant) is present with version \code{ver}, then all previous versions must +be finalized and non-clobberable, although \code{ver} (and onward) might still +be modified, (ii) even if we have "observed" empty updates for some +versions beyond \code{max(x$version)} (as indicated by \code{versions_end}; +see below), we can't assume \code{max(x$version)} has been finalized, because we +might see a nonfinalized version + empty subsequent versions due to +upstream database replication delays in combination with the upstream +replicas using last-version-carried-forward to extrapolate that there were +no updates, (iii) "redundant" update rows that would be removed by +\code{compactify} are not redundant, and actually come from an explicit version +release that indicates that preceding versions are finalized. If \code{nrow(x) == 0}, then this argument is mandatory.} + +\item{versions_end}{Optional; length-1, same \code{class} and \code{typeof} as +\code{x$version}: what is the last version we have observed? The default is +\code{max_version_with_row_in(x)}, but values greater than this could also be +valid, and would indicate that we observed additional versions of the data +beyond \code{max(x$version)}, but they all contained empty updates. (The default +value of \code{clobberable_versions_start} does not fully trust these empty +updates, and assumes that any version \verb{>= max(x$version)} could be +clobbered.) If \code{nrow(x) == 0}, then this argument is mandatory.} } \value{ An \code{epi_archive} object. @@ -71,17 +120,17 @@ toy_epi_archive df <- data.frame (geo_value = c(replicate(2, "ca"), replicate(2, "fl")), county = c(1, 3, 2, 5), time_value = c("2020-06-01", - "2020-06-02", - "2020-06-01", - "2020-06-02"), + "2020-06-02", + "2020-06-01", + "2020-06-02"), version = c("2020-06-02", - "2020-06-03", - "2020-06-02", - "2020-06-03"), + "2020-06-03", + "2020-06-02", + "2020-06-03"), cases = c(1, 2, 3, 4), cases_rate = c(0.01, 0.02, 0.01, 0.05)) x <- df \%>\% as_epi_archive(geo_type = "state", - time_type = "day", - other_keys = "county") + time_type = "day", + other_keys = "county") } diff --git a/man/epi_archive.Rd b/man/epi_archive.Rd index 93a33dd5..026f27e1 100644 --- a/man/epi_archive.Rd +++ b/man/epi_archive.Rd @@ -32,17 +32,31 @@ Note that there can only be a single row per unique combination of key variables, and thus the key variables are critical for figuring out how to generate a snapshot of data from the archive, as of a given version. -In general, last observation carried forward (LOCF) is used to data in -between recorded versions. Currently, deletions must be represented as -revising a row to a special state (e.g., making the entries \code{NA} or -including a special column that flags the data as removed and performing -some kind of post-processing), and the archive is unaware of what this -state is. +In general, the last version of each observation is carried forward (LOCF) to +fill in data between recorded versions, and between the last recorded +update and the \code{versions_end}. One consequence is that the \code{DT} +doesn't have to contain a full snapshot of every version (although this +generally works), but can instead contain only the rows that are new or +changed from the previous version (see \code{compactify}, which does this +automatically). 
Currently, deletions must be represented as revising a row +to a special state (e.g., making the entries \code{NA} or including a special +column that flags the data as removed and performing some kind of +post-processing), and the archive is unaware of what this state is. Note +that \code{NA}s \emph{can} be introduced by \code{epi_archive} methods for other reasons, +e.g., in \code{\link{epix_fill_through_version}} and \code{\link{epix_merge}}, if requested, to +represent potential update data that we do not yet have access to; or in +\code{\link{epix_merge}} to represent the "value" of an observation before the +version in which it was first released, or if no version of that +observation appears in the archive data at all. \strong{A word of caution:} R6 objects, unlike most other objects in R, have reference semantics. A primary consequence of this is that objects are not copied when modified. You can read more about this in Hadley Wickham's -\href{https://adv-r.hadley.nz/r6.html#r6-semantics}{Advanced R} book. +\href{https://adv-r.hadley.nz/r6.html#r6-semantics}{Advanced R} book. In order +to construct a modified archive while keeping the original intact, first +make a clone using the \verb{$clone} method, then overwrite the clone's \code{DT} +field with \code{data.table::copy(clone$DT)}, and finally perform the +modifications on the clone. } \section{Metadata}{ @@ -68,7 +82,7 @@ An \code{epi_archive} object can be used to generate a snapshot of the data in \code{epi_df} format, which represents the most up-to-date values of the signal variables, as of the specified version. This is accomplished by calling the \code{as_of()} method for an \code{epi_archive} object \code{x}. More details on this -method are documented in the wrapper function \code{epix_as_of()}. +method are documented in the wrapper function \code{\link[=epix_as_of]{epix_as_of()}}. } \section{Sliding Computations}{ @@ -80,7 +94,7 @@ the way \code{epi_slide()} works for an \code{epi_df} object, but with one key difference: it is version-aware. That is, for an \code{epi_archive} object, the sliding computation at any given reference time point t is performed on \strong{data that would have been available as of t}. More details on \code{slide()} -are documented in the wrapper function \code{epix_slide()}. +are documented in the wrapper function \code{\link[=epix_slide]{epix_slide()}}. } \examples{ @@ -103,6 +117,7 @@ toy_epi_archive \item \href{#method-epi_archive-new}{\code{epi_archive$new()}} \item \href{#method-epi_archive-print}{\code{epi_archive$print()}} \item \href{#method-epi_archive-as_of}{\code{epi_archive$as_of()}} +\item \href{#method-epi_archive-fill_through_version}{\code{epi_archive$fill_through_version()}} \item \href{#method-epi_archive-merge}{\code{epi_archive$merge()}} \item \href{#method-epi_archive-slide}{\code{epi_archive$slide()}} \item \href{#method-epi_archive-clone}{\code{epi_archive$clone()}} @@ -114,7 +129,16 @@ toy_epi_archive \subsection{Method \code{new()}}{ Creates a new \code{epi_archive} object. \subsection{Usage}{ -\if{html}{\out{
}}\preformatted{epi_archive$new(x, geo_type, time_type, other_keys, additional_metadata)}\if{html}{\out{
}} +\if{html}{\out{
}}\preformatted{epi_archive$new( + x, + geo_type, + time_type, + other_keys, + additional_metadata, + compactify, + clobberable_versions_start, + versions_end +)}\if{html}{\out{
}} } \subsection{Arguments}{ @@ -138,6 +162,26 @@ apart from "geo_value", "time_value", and "version".} \item{\code{additional_metadata}}{List of additional metadata to attach to the \code{epi_archive} object. The metadata will have \code{geo_type} and \code{time_type} fields; named entries from the passed list or will be included as well.} + +\item{\code{compactify}}{Optional; Boolean or \code{NULL}: should we remove rows that are +considered redundant for the purposes of \code{epi_archive}'s built-in methods +such as \code{as_of}? As these methods use the last version of each observation +carried forward (LOCF) to interpolate between the version data provided, +rows that don't change these LOCF results can potentially be omitted to +save space while maintaining the same behavior (with the help of the +\code{clobberable_versions_start} and \code{versions_end} fields in some +edge cases). \code{TRUE} will remove these rows, \code{FALSE} will not, and missing +or \code{NULL} will remove these rows and issue a warning. Generally, this can +be set to \code{TRUE}, but if you directly inspect or edit the fields of the +\code{epi_archive} such as its \code{DT}, you will have to determine whether +\code{compactify=TRUE} will produce the desired results. If compactification +here is removing a large proportion of the rows, this may indicate a +potential for space, time, or bandwidth savings upstream the data pipeline, +e.g., when fetching, storing, or preparing the input data \code{x}} + +\item{\code{clobberable_versions_start}}{Optional; as in \code{\link{as_epi_archive}}} + +\item{\code{versions_end}}{Optional; as in \code{\link{as_epi_archive}}} } \if{html}{\out{}} } @@ -164,30 +208,70 @@ An \code{epi_archive} object. \if{latex}{\out{\hypertarget{method-epi_archive-as_of}{}}} \subsection{Method \code{as_of()}}{ Generates a snapshot in \code{epi_df} format as of a given version. -See the documentation for the wrapper function \code{epix_as_of()} for details. +See the documentation for the wrapper function \code{\link[=epix_as_of]{epix_as_of()}} for details. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{epi_archive$as_of(max_version, min_time_value = -Inf)}\if{html}{\out{
}} } +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-epi_archive-fill_through_version}{}}} +\subsection{Method \code{fill_through_version()}}{ +Fill in unobserved history using requested scheme by mutating +\code{self} and potentially reseating its fields. See +\code{\link{epix_fill_through_version}} for a full description of the non-R6-method +version, which doesn't mutate the input archive but might alias its fields. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{epi_archive$fill_through_version(fill_versions_end, how = c("na", "locf"))}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{fill_versions_end}}{as in \code{\link{epix_fill_through_version}}} + +\item{\code{how}}{as in \code{\link{epix_fill_through_version}}} +} +\if{html}{\out{
}} +} } \if{html}{\out{
}} \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-epi_archive-merge}{}}} \subsection{Method \code{merge()}}{ -Merges another \code{data.table} with the current one, and allows for -a post-filling of \code{NA} values by last observation carried forward (LOCF). -See the documentation for the wrapper function \code{epix_merge()} for details. +Merges another \code{epi_archive} with the current one, mutating the +current one by reseating its \code{DT} and several other fields, but avoiding +mutation of the old \code{DT}; returns the current archive +\link[base:invisible]{invisibly}. See \code{\link{epix_merge}} for a full description +of the non-R6-method version, which does not mutate either archive, and +does not alias either archive's \code{DT}. \subsection{Usage}{ -\if{html}{\out{
}}\preformatted{epi_archive$merge(y, ..., locf = TRUE, nan = NA)}\if{html}{\out{
}} +\if{html}{\out{
}}\preformatted{epi_archive$merge( + y, + sync = c("forbid", "na", "locf", "truncate"), + compactify = TRUE +)}\if{html}{\out{
}} } +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{y}}{as in \code{\link{epix_merge}}} + +\item{\code{sync}}{as in \code{\link{epix_merge}}} + +\item{\code{compactify}}{as in \code{\link{epix_merge}}} +} +\if{html}{\out{
}} +} } \if{html}{\out{
}} \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-epi_archive-slide}{}}} \subsection{Method \code{slide()}}{ Slides a given function over variables in an \code{epi_archive} -object. See the documentation for the wrapper function \code{epix_as_of()} for +object. See the documentation for the wrapper function \code{\link[=epix_slide]{epix_slide()}} for details. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{epi_archive$slide(
diff --git a/man/epix_as_of.Rd b/man/epix_as_of.Rd
index b5d5969c..4053cd28 100644
--- a/man/epix_as_of.Rd
+++ b/man/epix_as_of.Rd
@@ -44,6 +44,24 @@ is equivalent to:
 epix_as_of(x = archive_cases_dv_subset,
            max_version = max(archive_cases_dv_subset$DT$version))
 
-# no warning shown
-epix_as_of(archive_cases_dv_subset, max_version = as.Date("2020-06-10"))
+
+range(archive_cases_dv_subset$DT$version) # 2020-06-02 -- 2021-12-01
+
+epix_as_of(x = archive_cases_dv_subset,
+           max_version = as.Date("2020-06-12"))
+
+# When fetching a snapshot as of the latest version with update data in the
+# archive, a warning is issued by default, as this update data might not yet
+# be finalized (for example, if data versions are labeled with dates, these
+# versions might be overwritten throughout the corresponding days with
+# additional data or "hotfixes" of erroneous data; when we build an archive
+# based on database queries, the latest available update might still be
+# subject to change, but previous versions should be finalized). We can
+# muffle such warnings with the following pattern:
+withCallingHandlers({
+  epix_as_of(x = archive_cases_dv_subset,
+             max_version = max(archive_cases_dv_subset$DT$version))
+}, epiprocess__snapshot_as_of_clobberable_version = function(wrn) invokeRestart("muffleWarning"))
+# Since R 4.0, there is a `globalCallingHandlers` function that can be used
+# to globally toggle these warnings.
 }
diff --git a/man/epix_fill_through_version.Rd b/man/epix_fill_through_version.Rd
new file mode 100644
index 00000000..a1f27592
--- /dev/null
+++ b/man/epix_fill_through_version.Rd
@@ -0,0 +1,44 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/methods-epi_archive.R
+\name{epix_fill_through_version}
+\alias{epix_fill_through_version}
+\title{\code{epi_archive} with unobserved history filled in (won't mutate, might alias)}
+\usage{
+epix_fill_through_version(x, fill_versions_end, how = c("na", "locf"))
+}
+\arguments{
+\item{x}{An \code{epi_archive}}
+
+\item{fill_versions_end}{Length-1, same class&type as \verb{x$version}: the
+version through which to fill in missing version history; this will be the
+result's \verb{$versions_end} unless it already had a later
+\verb{$versions_end}.}
+
+\item{how}{Optional; \code{"na"} or \code{"locf"}: \code{"na"} will fill in any missing
+required version history with \code{NA}s, by inserting (if necessary) an update
+immediately after the current \verb{$versions_end} that revises all
+existing measurements to be \code{NA} (this is only supported for \code{version}
+classes with a \code{next_after} implementation); \code{"locf"} will fill in missing
+version history with the last version of each observation carried forward
+(LOCF), by leaving the update \verb{$DT} alone (other \code{epi_archive} methods are
+based on LOCF). Default is \code{"na"}.}
+}
+\value{
+An \code{epi_archive}
+}
+\description{
+Sometimes, due to upstream data pipeline issues, we have to work with a
+version history that isn't completely up to date, but with functions that
+expect archives that are completely up to date, or equally as up-to-date as
+another archive. This function provides one way to approach such mismatches:
+pretend that we've "observed" additional versions, filling in these versions
+with NAs or extrapolated values.
+}
+\details{
+\code{epix_fill_through_version} will not mutate its \code{x} argument, but its result
+might alias fields of \code{x} (e.g., mutating the result's \code{DT} might mutate
+\code{x$DT}). The R6 method variant, \code{x$fill_through_version}, will mutate \code{x} to
+give the result, but might reseat its fields (e.g., references to the old
+\code{x$DT} might not be updated by this function or subsequent operations on
+\code{x}), and returns the updated \code{x} \link[base:invisible]{invisibly}.
+}
diff --git a/man/epix_merge.Rd b/man/epix_merge.Rd
index 3d1b2e1c..09f67fa2 100644
--- a/man/epix_merge.Rd
+++ b/man/epix_merge.Rd
@@ -4,56 +4,70 @@
 \alias{epix_merge}
 \title{Merge two \code{epi_archive} objects}
 \usage{
-epix_merge(x, y, ..., locf = TRUE, nan = NA)
+epix_merge(
+  x,
+  y,
+  sync = c("forbid", "na", "locf", "truncate"),
+  compactify = TRUE
+)
 }
 \arguments{
-\item{x, y}{Two \code{epi_archive} objects to join together, or more specifically,
-whose underlying data tables are to be joined together. The data table in
-\code{x} will be overwritten with the joined data table. For convenience, we
-also allow \code{y} to be passed in directly as a \code{data.table} (need not be an
-\code{epi_archive} object).}
+\item{x, y}{Two \code{epi_archive} objects to join together.}
 
-\item{...}{Named arguments to pass to \code{data.table::merge.data.table()}, which
-is used for the join (with all default settings as in this function). For
-example, passing \code{all = TRUE} will perform a full join.}
+\item{sync}{Optional; \code{"forbid"}, \code{"na"}, \code{"locf"}, or \code{"truncate"}; in the
+case that \code{x$versions_end} doesn't match \code{y$versions_end}, what do we do?:
+\code{"forbid"}: emit an error; \code{"na"}: use \code{max(x$versions_end, y$versions_end)}
+as the result's \code{versions_end}, but ensure that, if we request a snapshot
+as of a version after \code{min(x$versions_end, y$versions_end)}, the
+observation columns from the less up-to-date archive will be all NAs (i.e.,
+imagine there was an update immediately after its \code{versions_end} which
+revised all observations to be \code{NA}); \code{"locf"}: use \code{max(x$versions_end,
+y$versions_end)} as the result's \code{versions_end}, allowing the last version
+of each observation to be carried forward to extrapolate unavailable
+versions for the less up-to-date input archive (i.e., imagining that the
+less up-to-date archive's data set remained unchanged between its actual
+\code{versions_end} and the other archive's \code{versions_end}); or \code{"truncate"}:
+use \code{min(x$versions_end, y$versions_end)} as the result's \code{versions_end},
+and discard any update rows for later versions.}
 
-\item{locf}{Should LOCF be used after joining on all non-key columns? This
-will take the latest version of each signal value and propogate it forward
-to fill in gaps that appear after merging. Default is \code{TRUE}.}
-
-\item{nan}{Should \code{NaN} values be treated as \code{NA} values in the post-filling
-step? Default is \code{NA}, which means that they are treated as \code{NA} values; if}
+\item{compactify}{Optional; \code{TRUE}, \code{FALSE}, or \code{NULL}; should the result be
+compactified? See \code{\link{as_epi_archive}} for an explanation of what this means.
+Default here is \code{TRUE}.}
 }
 \value{
-Nothing; the data table in \code{x} is overwritten with the merged one.
+the resulting \code{epi_archive} } \description{ -Merges the underlying data tables in two \code{epi_archive} objects, allows for -post-filling of \code{NA} values by last observation carried forward (LOCF), and -\strong{overwrites} the first data table with the merged one. See the \href{https://cmu-delphi.github.io/epiprocess/articles/archive.html}{archive vignette} for -examples. +Merges two \code{epi_archive}s that share a common \code{geo_value}, \code{time_value}, and +set of key columns. When they also share a common \code{versions_end}, +using \verb{$as_of} on the result should be the same as using \verb{$as_of} on \code{x} and +\code{y} individually, then performing a full join of the \code{DT}s on the non-version +key columns (potentially consolidating multiple warnings about clobberable +versions). If the \code{versions_end} values differ, the +\code{sync} parameter controls what is done. } \details{ -This is simply a wrapper around the \code{merge()} method of the -\code{epi_archive} class, so if \code{x} and \code{y} are an \code{epi_archive} objects, then: - -\if{html}{\out{
}}\preformatted{epix_merge(x, y) -}\if{html}{\out{
}} - -is equivalent to: +This function, \code{\link{epix_merge}}, does not mutate its inputs and will not alias +either archive's \code{DT}, but may alias other fields; \code{x$merge} will overwrite +\code{x} with the result of the merge, reseating its \code{DT} and several other fields +(making them point to different objects), but avoiding mutation of the +contents of the old \code{DT} (only relevant if you have another reference to the +old \code{DT} in another object). -\if{html}{\out{
}}\preformatted{x$merge(y) -}\if{html}{\out{
}} +In all cases, \code{additional_metadata} will be an empty list, and +\code{clobberable_versions_start} will be set to the earliest version that could +be clobbered in either input archive. } \examples{ # create two example epi_archive datasets x <- archive_cases_dv_subset$DT \%>\% dplyr::select(geo_value,time_value,version,case_rate_7d_av) \%>\% - as_epi_archive() + as_epi_archive(compactify=TRUE) y <- archive_cases_dv_subset$DT \%>\% dplyr::select(geo_value,time_value,version,percent_cli) \%>\% - as_epi_archive() + as_epi_archive(compactify=TRUE) +# merge results stored in a third object: +xy = epix_merge(x, y) +# vs. mutating x to hold the merge result: +x$merge(y) -# a full join stored in x -epix_merge(x, y, all = TRUE) } diff --git a/man/max_version_with_row_in.Rd b/man/max_version_with_row_in.Rd new file mode 100644 index 00000000..0b2c6deb --- /dev/null +++ b/man/max_version_with_row_in.Rd @@ -0,0 +1,18 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/archive.R +\name{max_version_with_row_in} +\alias{max_version_with_row_in} +\title{Default arg helper: \code{max(x$version)}, with error if \code{x} has 0 rows} +\usage{ +max_version_with_row_in(x) +} +\arguments{ +\item{x}{\code{x} argument of \code{\link{as_epi_archive}}} +} +\value{ +\code{max(x$version)} if it has any rows; raises error if it has 0 rows or +an \code{NA} version value +} +\description{ +Exported to make defaults more easily copyable. +} diff --git a/man/next_after.Rd b/man/next_after.Rd new file mode 100644 index 00000000..5170e8d9 --- /dev/null +++ b/man/next_after.Rd @@ -0,0 +1,17 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/archive.R +\name{next_after} +\alias{next_after} +\title{Get the next possible value greater than \code{x} of the same type} +\usage{ +next_after(x) +} +\arguments{ +\item{x}{the starting "value"(s)} +} +\value{ +same class, typeof, and length as \code{x} +} +\description{ +Get the next possible value greater than \code{x} of the same type +} diff --git a/tests/testthat/test-archive-version-bounds.R b/tests/testthat/test-archive-version-bounds.R new file mode 100644 index 00000000..d4c94e09 --- /dev/null +++ b/tests/testthat/test-archive-version-bounds.R @@ -0,0 +1,109 @@ +test_that("`validate_version_bound` allows/catches `NA` as requested", { + my_version_bound = NA + validate_version_bound(my_version_bound, na_ok=TRUE) + expect_error(validate_version_bound(my_version_bound, na_ok=FALSE), + class="epiprocess__my_version_bound_is_na") + # Note that if the error class name changes, this test may produce some + # confusing output along the following lines: + # + # > Error in `$<-.data.frame`(`*tmp*`, "call_text", value = c("testthat::expect_error(...)", : + # > replacement has 5 rows, data has 3 +}) + +test_that("`validate_version_bound` catches bounds that are the wrong length", { + my_version_bound1a = NULL + expect_error(validate_version_bound(my_version_bound1a, na_ok=TRUE), + class="epiprocess__my_version_bound1a_is_not_length_1") + my_version_bound1b = integer(0L) + expect_error(validate_version_bound(my_version_bound1b, na_ok=TRUE), + class="epiprocess__my_version_bound1b_is_not_length_1") + my_version_bound2 = c(2, 10) + expect_error(validate_version_bound(my_version_bound2, na_ok=TRUE), + class="epiprocess__my_version_bound2_is_not_length_1") +}) + +test_that("`validate_version_bound` validate and class checks together allow and catch as intended", { + my_int = 5L + my_dbl = 5 + my_list = list(5L) + my_date = 
as.Date("2000-01-01") + my_datetime = vctrs::vec_cast(my_date, as.POSIXct(as.Date("1900-01-01"))) + # When first drafted, this validate function was a (validate+)cast function, + # which used vctrs::vec_cast inside. However, the initial implementation + # didn't actually allow casting to occur, and it was easier to change to the + # current stringent validation than to think about what exactly casts to + # allow. Some of the tests here were motivated based on that setup and have + # been kept around. For example, we wouldn't want to allow casts between dates + # and POSIXct's, because there are tz gotchas; these first couple of checks + # detect that we have a validate-compatible date and datetime to make sure we + # can properly help ward off the gotchas if switching to a cast rather than a + # validate. + expect_identical(vctrs::vec_cast(my_datetime, my_date), my_date) + expect_identical(vctrs::vec_cast(my_date, my_datetime), my_datetime) + # + x_int = tibble::tibble(version = my_int) + x_dbl = tibble::tibble(version = my_dbl) + x_list = tibble::tibble(version = my_list) + x_date = tibble::tibble(version = my_date) + x_datetime = tibble::tibble(version = my_datetime) + # Custom classes matter (test vectors and non-vctrs-specialized lists separately): + my_version_bound1 = `class<-`(24, "c1") + expect_error(validate_version_bound(my_version_bound1, x_int, na_ok=FALSE), + class="epiprocess__my_version_bound1_has_invalid_class_or_typeof") + my_version_bound2 = `class<-`(list(12), c("c2a","c2b","c2c")) + expect_error(validate_version_bound(my_version_bound2, x_list, na_ok=FALSE), + class="epiprocess__my_version_bound2_has_invalid_class_or_typeof") + # Want no error matching date to date or datetime to datetime, but no interop due to tz issues: + validate_version_bound(my_date, x_date, version_bound_arg="vb") + validate_version_bound(my_datetime, x_datetime, version_bound_arg="vb") + expect_error(validate_version_bound(my_datetime, x_date, na_ok=TRUE, version_bound_arg="vb"), + class="epiprocess__vb_has_invalid_class_or_typeof") + expect_error(validate_version_bound(my_date, x_datetime, na_ok=TRUE, version_bound_arg="vb"), + class="epiprocess__vb_has_invalid_class_or_typeof") + # Bad: + expect_error(validate_version_bound(3.5, x_int, TRUE, "vb")) + expect_error(validate_version_bound(.Machine$integer.max, x_dbl, TRUE, "vb")) + expect_error(validate_version_bound(`class<-`(list(2), "clazz"), + tibble::tibble(version=`class<-`(5L, "clazz")), TRUE, "vb")) + # Maybe questionable: + expect_error(validate_version_bound(3, x_int, TRUE, "vb")) + expect_error(validate_version_bound(3L, x_dbl, TRUE, "vb")) + # Good: + validate_version_bound(my_int, x_int, TRUE, "vb") + validate_version_bound(my_dbl, x_dbl, TRUE, "vb") + validate_version_bound(my_list, x_list, TRUE, "vb") + validate_version_bound(my_date, x_date, TRUE, "vb") + validate_version_bound(my_datetime, x_datetime, TRUE, "vb") +}) + +test_that("archive version bounds args work as intended", { + measurement_date = as.Date("2000-01-01") + update_tbl = tibble::tibble( + geo_value = "g1", + time_value = measurement_date, + version = measurement_date + 1:5, + value = 1:5 + ) + expect_error(as_epi_archive(update_tbl, + clobberable_versions_start = 1241, + versions_end = measurement_date), + class="epiprocess__clobberable_versions_start_has_invalid_class_or_typeof") + expect_error(as_epi_archive(update_tbl[integer(0L),]), + class="epiprocess__max_version_cannot_be_used") + expect_error(as_epi_archive(update_tbl, + clobberable_versions_start = NA, + 
versions_end = measurement_date), + class="epiprocess__versions_end_earlier_than_updates") + expect_error(as_epi_archive(update_tbl, + clobberable_versions_start=measurement_date+6L, + versions_end = measurement_date+5L), + class="epiprocess__versions_end_earlier_than_clobberable_versions_start") + expect_error(as_epi_archive(update_tbl, versions_end = NA), + regexp="versions_end.*must not satisfy.*is.na") + ea_default = as_epi_archive(update_tbl) + ea_default$as_of(measurement_date+4L) + expect_warning(ea_default$as_of(measurement_date+5L), + class = "epiprocess__snapshot_as_of_clobberable_version") + expect_error(ea_default$as_of(measurement_date+6L), + regexp = "max_version.*at most.*versions_end") +}) diff --git a/tests/testthat/test-compactify.R b/tests/testthat/test-compactify.R new file mode 100644 index 00000000..962747d9 --- /dev/null +++ b/tests/testthat/test-compactify.R @@ -0,0 +1,114 @@ +library(epiprocess) +library(data.table) +library(dplyr) + +dt <- archive_cases_dv_subset$DT +dt <- filter(dt,geo_value == "ca") %>% + filter(version <= "2020-06-15") %>% + select(-case_rate_7d_av) + +test_that("Input for compactify must be NULL or a boolean", { + expect_error(as_epi_archive(dt,compactify="no")) +}) + +dt$percent_cli <- c(1:80) +dt$case_rate <- c(1:80) + +row_replace <- function(dt,row,x,y) { + # (This way of "replacing" elements appears to use copy-on-write even though + # we are working with a data.table.) + dt[row,4] <- x + dt[row,5] <- y + dt +} + +# Note that compactify is working on version-wise LOCF (last version of each +# observation carried forward) + +# Rows 1 should not be eliminated even if NA +dt <- row_replace(dt,1,NA,NA) # Not LOCF + +# NOTE! We are assuming that there are no NA's in geo_value, time_value, +# and version. 
Even though compactify may erroneously remove the first row +# if it has all NA's, we are not testing this behaviour for now as this dataset +# has problems beyond the scope of this test + +# Rows 11 and 12 correspond to different time_values +dt <- row_replace(dt,12,11,11) # Not LOCF + +# Rows 20 and 21 only differ in version +dt <- row_replace(dt,21,20,20) # LOCF + +# Rows 21 and 22 only differ in version +dt <- row_replace(dt,22,20,20) # LOCF + +# Row 39 comprises the first NA's +dt <- row_replace(dt,39,NA,NA) # Not LOCF + +# Row 40 has two NA's, just like its lag, row 39 +dt <- row_replace(dt,40,NA,NA) # LOCF + +# Row 62's values already exist in row 15, but row 15 is not a preceding row +dt <- row_replace(dt,62,15,15) # Not LOCF + +# Row 73 only has one value carried over +dt <- row_replace(dt,74,73,74) # Not LOCF + +dt_true <- as_tibble(as_epi_archive(dt,compactify=TRUE)$DT) +dt_false <- as_tibble(as_epi_archive(dt,compactify=FALSE)$DT) +dt_null <- suppressWarnings(as_tibble(as_epi_archive(dt,compactify=NULL)$DT)) + +test_that("Warning for LOCF with compactify as NULL", { + expect_warning(as_epi_archive(dt,compactify=NULL)) +}) + +test_that("No warning when there is no LOCF", { + expect_warning(as_epi_archive(dt[1:5],compactify=NULL),NA) +}) + +test_that("LOCF values are ignored with compactify=FALSE", { + expect_identical(nrow(dt),nrow(dt_false)) +}) + +test_that("LOCF values are taken out with compactify=TRUE", { + dt_test <- as_tibble(as_epi_archive(dt[-c(21,22,40),],compactify=FALSE)$DT) + + expect_identical(dt_true,dt_null) + expect_identical(dt_null,dt_test) +}) + +test_that("as_of produces the same results with compactify=TRUE as with compactify=FALSE", { + ea_true <- as_epi_archive(dt,compactify=TRUE) + ea_false <- as_epi_archive(dt,compactify=FALSE) + + # Row 22, an LOCF row corresponding to the latest version, is omitted in + # ea_true + latest_version = max(ea_false$DT$version) + expect_warning({ + as_of_true <- ea_true$as_of(latest_version) + }, class = "epiprocess__snapshot_as_of_clobberable_version") + expect_warning({ + as_of_false <- ea_false$as_of(latest_version) + }, class = "epiprocess__snapshot_as_of_clobberable_version") + + expect_identical(as_of_true,as_of_false) +}) + +test_that("compactify does not alter the default clobberable and observed version bounds", { + x = tibble::tibble( + geo_value = "geo1", + time_value = as.Date("2000-01-01"), + version = as.Date("2000-01-01") + 1:5, + value = 42L + ) + ea_true <- as_epi_archive(x, compactify=TRUE) + ea_false <- as_epi_archive(x, compactify=FALSE) + # We say that we base the bounds on the user's `x` arg. We might mess up or + # change our minds and base things on the `DT` field (or a temporary `DT` + # variable, post-compactify) instead. 
Check that this test would trigger
+  # in that case:
+  expect_true(max(ea_true$DT$version) != max(ea_false$DT$version))
+  # The actual test:
+  expect_identical(ea_true$clobberable_versions_start, ea_false$clobberable_versions_start)
+  expect_identical(ea_true$versions_end, ea_false$versions_end)
+})
diff --git a/tests/testthat/test-data.R b/tests/testthat/test-data.R
new file mode 100644
index 00000000..f3d4c9d7
--- /dev/null
+++ b/tests/testthat/test-data.R
@@ -0,0 +1,68 @@
+test_that("`archive_cases_dv_subset` is formed successfully", {
+  expect_true(is_epi_archive(archive_cases_dv_subset))
+})
+
+test_that("`delayed_assign_with_unregister_awareness` works as expected on good promises", {
+  # Since we're testing environment stuff, use some "my_" prefixes to try to
+  # prevent naming coincidences from changing behavior.
+  my_eval_env = rlang::new_environment(list(x=40L, n_evals=0L), parent=rlang::base_env())
+  my_assign_env = rlang::new_environment()
+  delayed_assign_with_unregister_awareness("good1", {
+    n_evals <- n_evals + 1L
+    x + 2L
+  }, my_eval_env, my_assign_env)
+  force(my_assign_env[["good1"]])
+  force(my_assign_env[["good1"]])
+  force(my_assign_env[["good1"]])
+  expect_identical(my_assign_env[["good1"]], 42L)
+  expect_identical(my_eval_env[["n_evals"]], 1L)
+})
+
+test_that("original `delayedAssign` works as expected on good promises", {
+  my_eval_env = rlang::new_environment(list(x=40L, n_evals=0L), parent=rlang::base_env())
+  my_assign_env = rlang::new_environment()
+  delayedAssign("good1", {
+    n_evals <- n_evals + 1L
+    x + 2L
+  }, my_eval_env, my_assign_env)
+  force(my_assign_env[["good1"]])
+  force(my_assign_env[["good1"]])
+  force(my_assign_env[["good1"]])
+  expect_identical(my_assign_env[["good1"]], 42L)
+  expect_identical(my_eval_env[["n_evals"]], 1L)
+})
+
+test_that("`delayed_assign_with_unregister_awareness` doesn't wrap a buggy promise if not unregistering", {
+  delayed_assign_with_unregister_awareness("x", Abort("msg", class="original_error_class"))
+  expect_error(force(x), class="original_error_class")
+})
+
+test_that("`delayed_assign_with_unregister_awareness` wraps a buggy promise when unregistering", {
+  delayed_assign_with_unregister_awareness("x", Abort("msg", class="original_error_class"))
+  # Take advantage of a false positive / hedge against package renaming: make
+  # our own `unregister` function to trigger the special error message.
+  unregister = function(y) y
+  expect_error(unregister(force(x)), class="epiprocess__promise_evaluation_error_during_unregister")
+})
+
+test_that("`delayed_assign_with_unregister_awareness` injection support works", {
+  my_exprs = rlang::exprs(a = b + c, d = e)
+  delayed_assign_with_unregister_awareness(
+    "good2", list(!!!my_exprs),
+    eval.env=rlang::new_environment(list(b=2L, c=3L, e=4L), rlang::base_env())
+  )
+  force(good2)
+  expect_identical(good2, list(a=5L, d=4L))
+})
+
+test_that("`some_package_is_being_unregistered` doesn't fail in response to non-simple calls", {
+  # Prerequisite for current implementation to work (testing here to help debug
+  # in case some R version doesn't obey):
+  expect_false(NA_character_ %in% letters)
+  f = function() function() some_package_is_being_unregistered()
+  my_expr = rlang::expr(f()())
+  # Prerequisite for this test to actually be testing on non-simple calls:
+  expect_false(rlang::is_call_simple(my_expr))
+  # Actual test (`FALSE` is correct; `NA` or error is not):
+  expect_false(rlang::eval_bare(my_expr))
+})
diff --git a/tests/testthat/test-epix_fill_through_version.R b/tests/testthat/test-epix_fill_through_version.R
new file mode 100644
index 00000000..03e9c504
--- /dev/null
+++ b/tests/testthat/test-epix_fill_through_version.R
@@ -0,0 +1,106 @@
+
+test_that("epix_fill_through_version mirrors input when it is sufficiently up to date", {
+  ea_orig = as_epi_archive(data.table::data.table(geo_value = "g1", time_value = as.Date("2020-01-01"),
+                                                  version = 1:5, value = 1:5))
+  some_earlier_observed_version = 2L
+  ea_trivial_fill_na1 = epix_fill_through_version(ea_orig, some_earlier_observed_version, "na")
+  ea_trivial_fill_na2 = epix_fill_through_version(ea_orig, ea_orig$versions_end, "na")
+  ea_trivial_fill_locf = epix_fill_through_version(ea_orig, some_earlier_observed_version, "locf")
+  # Below, we want R6 objects to be compared based on contents rather than
+  # addresses. We appear to get this with `expect_identical` in `testthat`
+  # edition 3, which is based on `waldo::compare` rather than `base::identical`;
+  # `waldo::compare` in waldo >=0.3.1 appears (as of 0.4.0) to compare R6
+  # objects by contents rather than address (in a way that is tested but maybe
+  # not guaranteed via user docs). Use `local_edition` to ensure we use edition
+  # 3 here.
+  local_edition(3)
+  expect_identical(ea_orig, ea_trivial_fill_na1)
+  expect_identical(ea_orig, ea_trivial_fill_na2)
+  expect_identical(ea_orig, ea_trivial_fill_locf)
+})
+
+test_that("epix_fill_through_version can extend observed versions, gives expected `as_of`s", {
+  ea_orig = as_epi_archive(data.table::data.table(
+    geo_value = "g1",
+    time_value = as.Date("2020-01-01") + c(rep(0L,5L), 1L),
+    version = c(1:5, 2L),
+    value = 1:6))
+  first_unobserved_version = 6L
+  later_unobserved_version = 10L
+  ea_fill_na = epix_fill_through_version(ea_orig, later_unobserved_version, "na")
+  ea_fill_locf = epix_fill_through_version(ea_orig, later_unobserved_version, "locf")
+
+  # We use edition 3 features here, passing `ignore_attr` to `waldo::compare`.
+ # Ensure we are using edition 3: + local_edition(3) + withCallingHandlers({ + expect_identical(ea_fill_na$versions_end, later_unobserved_version) + expect_identical(tibble::as_tibble(ea_fill_na$as_of(first_unobserved_version)), + tibble::tibble(geo_value="g1", time_value=as.Date("2020-01-01")+0:1, value=rep(NA_integer_, 2L)), + ignore_attr = TRUE) + expect_identical(ea_fill_locf$versions_end, later_unobserved_version) + expect_identical(ea_fill_locf$as_of(first_unobserved_version), + ea_fill_locf$as_of(ea_orig$versions_end) %>% + {attr(., "metadata")$as_of <- first_unobserved_version; .}) + }, epiprocess__snapshot_as_of_clobberable_version = function(wrn) invokeRestart("muffleWarning")) +}) + +test_that("epix_fill_through_version does not mutate x", { + for (ea_orig in list( + # vanilla case + as_epi_archive(data.table::data.table(geo_value = "g1", time_value = as.Date("2020-01-01"), + version = 1:5, value = 1:5)), + # data.table unique yielding original DT by reference special case (maybe + # having only 1 row is the trigger? having no revisions of initial values + # doesn't seem sufficient to trigger) + as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, value=10L)) + )) { + # We want to perform a strict comparison of the contents of `ea_orig` before + # and `ea_orig` after. `clone` + `expect_identical` based on waldo would + # sort of work, but we might want something stricter. `as.list` + + # `identical` plus a check of the DT seems to do the trick. + ea_orig_before_as_list = as.list(ea_orig) + ea_orig_DT_before_copy = data.table::copy(ea_orig$DT) + some_unobserved_version = 8L + # + ea_fill_na = epix_fill_through_version(ea_orig, some_unobserved_version, "na") + ea_orig_after_as_list = as.list(ea_orig) + # use identical, not expect_identical, for the R6-as-list test; latter isn't as strict + expect_true(identical(ea_orig_before_as_list, ea_orig_after_as_list)) + expect_identical(ea_orig_DT_before_copy, ea_orig$DT) + # + ea_fill_locf = epix_fill_through_version(ea_orig, some_unobserved_version, "locf") + ea_orig_after_as_list = as.list(ea_orig) + expect_true(identical(ea_orig_before_as_list, ea_orig_after_as_list)) + expect_identical(ea_orig_DT_before_copy, ea_orig$DT) + } +}) + +test_that("x$fill_through_version mutates x (if needed)", { + ea = as_epi_archive(data.table::data.table(geo_value = "g1", time_value = as.Date("2020-01-01"), + version = 1:5, value = 1:5)) + # We want the contents to change in a substantial way that makes waldo compare + # different (if the contents need to change). 
+  ea_before_copies_as_list = lapply(ea, data.table::copy)
+  some_unobserved_version = 8L
+  ea$fill_through_version(some_unobserved_version, "na")
+  ea_after_copies_as_list = lapply(ea, data.table::copy)
+  expect_failure(expect_identical(ea_before_copies_as_list, ea_after_copies_as_list))
+})
+
+test_that("{epix_,$}fill_through_version return with expected visibility", {
+  ea = as_epi_archive(data.table::data.table(geo_value = "g1", time_value = as.Date("2020-01-01"),
+                                             version = 1:5, value = 1:5))
+  expect_true(withVisible(epix_fill_through_version(ea, 10L, "na"))[["visible"]])
+  expect_false(withVisible(ea$fill_through_version(15L, "na"))[["visible"]])
+})
+
+test_that("epix_fill_through_version returns same key & doesn't mutate old DT or its key", {
+  ea = as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, value=10L))
+  old_DT = ea$DT
+  old_DT_copy = data.table::copy(old_DT)
+  old_key = data.table::key(ea$DT)
+  expect_identical(data.table::key(epix_fill_through_version(ea, 5L, "na")$DT), old_key)
+  expect_identical(data.table::key(epix_fill_through_version(ea, 5L, "locf")$DT), old_key)
+  expect_identical(data.table::key(ea$DT), old_key)
+  # Also check that the old DT's contents were left alone (this is what
+  # `old_DT_copy` was captured for):
+  expect_identical(old_DT, old_DT_copy)
+})
diff --git a/tests/testthat/test-epix_merge.R b/tests/testthat/test-epix_merge.R
new file mode 100644
index 00000000..51f2c3c6
--- /dev/null
+++ b/tests/testthat/test-epix_merge.R
@@ -0,0 +1,193 @@
+
+test_that("epix_merge forbids on invalid `y`",{
+  ea = archive_cases_dv_subset$clone()
+  expect_error(epix_merge(ea, data.frame(x=1)))
+})
+
+test_that("epix_merge merges and carries forward updates properly", {
+  x = as_epi_archive(
+    data.table::as.data.table(
+      tibble::tribble(~geo_value, ~time_value, ~version, ~x_value,
+                      # same version set for x and y
+                      "g1", 1L, 1:3, paste0("XA", 1:3),
+                      # versions of x surround those of y + this measurement has
+                      # max update version beyond some others
+                      "g1", 2L, 1:5, paste0("XB", 1:5),
+                      # mirror case
+                      "g1", 3L, 2L, paste0("XC", 2L),
+                      # x has 1 version, y has 0
+                      "g1", 4L, 1L, paste0("XD", 1L),
+                      # non-NA values that should be carried forward
+                      # (version-wise LOCF) in other versions, plus NAs that
+                      # should (similarly) be carried forward as NA (latter
+                      # wouldn't work with an ordinary merge + post-processing
+                      # with `data.table::nafill`)
+                      "g1", 6L, c(1L,3L,5L), paste0("XE", c(1L, NA, 5L))
+      ) %>%
+        tidyr::unchop(c(version, x_value)) %>%
+        dplyr::mutate(dplyr::across(c(x_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x)))
+    )
+  )
+  y = as_epi_archive(
+    data.table::as.data.table(
+      tibble::tribble(~geo_value, ~time_value, ~version, ~y_value,
+                      "g1", 1L, 1:3, paste0("YA", 1:3),
+                      "g1", 2L, 2L, paste0("YB", 2L),
+                      "g1", 3L, 1:5, paste0("YC", 1:5),
+                      "g1", 5L, 1L, paste0("YD", 1L),
+                      "g1", 6L, 1:5, paste0("YE", 1:5),
+      ) %>%
+        tidyr::unchop(c(version, y_value)) %>%
+        dplyr::mutate(dplyr::across(c(y_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x)))
+    )
+  )
+  xy = epix_merge(x, y)
+  xy_expected = as_epi_archive(
+    data.table::as.data.table(
+      tibble::tribble(~geo_value, ~time_value, ~version, ~x_value, ~y_value,
+                      "g1", 1L, 1:3, paste0("XA", 1:3), paste0("YA", 1:3),
+                      "g1", 2L, 1:5, paste0("XB", 1:5), paste0("YB", c(NA,2L,2L,2L,2L)),
+                      "g1", 3L, 1:5, paste0("XC", c(NA,2L,2L,2L,2L)), paste0("YC", 1:5),
+                      "g1", 4L, 1L, paste0("XD", 1L), paste0("YD", NA),
+                      "g1", 5L, 1L, paste0("XD", NA), paste0("YD", 1L),
+                      "g1", 6L, 1:5, paste0("XE", c(1L,1L,NA,NA,5L)), paste0("YE", 1:5),
+      ) %>%
+        tidyr::unchop(c(version, x_value, y_value)) %>%
+        dplyr::mutate(dplyr::across(c(x_value, y_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x)))
+    )
+  )
+  # We rely on testthat edition 3 expect_identical using waldo, not identical. See
+  # test-epix_fill_through_version.R comments for details.
+  local_edition(3)
+  expect_identical(xy, xy_expected)
+})
+
+test_that('epix_merge forbids and warns on metadata and naming issues', {
+  expect_error(
+    epix_merge(
+      as_epi_archive(tibble::tibble(geo_value="tx", time_value=1L, version=1L, x_value=1L)),
+      as_epi_archive(tibble::tibble(geo_value="us", time_value=1L, version=5L, y_value=2L))
+    ),
+    regexp = "must have the same.*geo_type"
+  )
+  expect_error(
+    epix_merge(
+      as_epi_archive(tibble::tibble(geo_value="pa", time_value=1L, version=1L, x_value=1L)),
+      as_epi_archive(tibble::tibble(geo_value="pa", time_value=as.Date("2020-01-01"), version=5L, y_value=2L))
+    ),
+    regexp = "must have the same.*time_type"
+  )
+  expect_error(
+    epix_merge(
+      as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, value=1L)),
+      as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, value=2L))
+    ),
+    regexp = "overlapping.*names"
+  )
+  expect_warning(
+    epix_merge(
+      as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, x_value=1L),
+                     additional_metadata=list("updates_fetched"=lubridate::ymd_hms("2022-05-01 16:00:00", tz="UTC"))),
+      as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, y_value=2L))
+    ),
+    regexp = "x\\$additional_metadata",
+    class = "epiprocess__epix_merge_ignores_additional_metadata"
+  )
+  expect_warning(
+    epix_merge(
+      as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, x_value=1L)),
+      as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, y_value=2L),
+                     additional_metadata=list("updates_fetched"=lubridate::ymd_hms("2022-05-01 16:00:00", tz="UTC")))
+    ),
+    regexp = "y\\$additional_metadata",
+    class = "epiprocess__epix_merge_ignores_additional_metadata"
+  )
+})
+
+# use `local` to prevent accidentally using the x, y, xy bindings here
+# elsewhere, while allowing reuse across a couple tests
+local({
+  x = as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, x_value=1L),
+                     clobberable_versions_start=1L, versions_end = 10L)
+  y = as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, y_value=2L),
+                     clobberable_versions_start=3L, versions_end = 10L)
+  xy = epix_merge(x,y)
+  test_that('epix_merge considers partially-clobberable row to be clobberable', {
+    expect_identical(xy$clobberable_versions_start, 1L)
+  })
+  test_that('epix_merge result uses versions_end metadata not max version val', {
+    expect_identical(xy$versions_end, 10L)
+  })
+})
+
+local({
+  x = as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, x_value=10L))
+  y = as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=5L, y_value=20L))
+  test_that('epix_merge forbids on sync default or "forbid"', {
+    expect_error(epix_merge(x,y),
+                 class="epiprocess__epix_merge_unresolved_sync")
+    expect_error(epix_merge(x,y, sync = "forbid"),
+                 class="epiprocess__epix_merge_unresolved_sync")
+  })
+  test_that('epix_merge sync="na" works', {
+    expect_equal(
+      epix_merge(x,y, sync = "na"),
+      as_epi_archive(tibble::tribble(
+        ~geo_value, ~time_value, ~version, ~x_value, ~y_value,
+        1L,         1L,          1L,       10L,         NA_integer_, # x updated, y not observed yet
+        1L,         1L,          2L,       NA_integer_, NA_integer_, # NA-ing out x, y not observed yet
+        1L,         1L,          5L,       NA_integer_, 20L,
# x still NA, y updated + ), clobberable_versions_start=1L) + ) + }) + test_that('epix_merge sync="locf" works', { + expect_equal( + epix_merge(x,y, sync = "locf"), + as_epi_archive(tibble::tribble( + ~geo_value, ~time_value, ~version, ~x_value, ~y_value, + 1L, 1L, 1L, 10L, NA_integer_, # x updated, y not observed yet + 1L, 1L, 5L, 10L, 20L, # x LOCF'd, y updated + ), clobberable_versions_start=1L) + ) + }) + x_no_conflict = as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, x_value=10L)) + y_no_conflict = as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=1L, y_value=20L)) + xy_no_conflict_expected = as_epi_archive(tibble::tribble( + ~geo_value, ~time_value, ~version, ~x_value, ~y_value, + 1L, 1L, 1L, 10L, 20L, # x updated, y not observed yet + )) + test_that('epix_merge sync="forbid" on no-conflict works', { + expect_equal( + epix_merge(x_no_conflict, y_no_conflict, sync = "forbid"), + xy_no_conflict_expected + ) + }) + test_that('epix_merge sync="na" on no-conflict works', { + # This test is the main reason for these no-conflict tests. We want to make + # sure that we don't add an unnecessary NA-ing-out version beyond a common + # versions_end. + expect_equal( + epix_merge(x_no_conflict, y_no_conflict, sync = "na"), + xy_no_conflict_expected + ) + }) + test_that('epix_merge sync="locf" on no-conflict works', { + expect_equal( + epix_merge(x_no_conflict, y_no_conflict, sync = "locf"), + xy_no_conflict_expected + ) + }) +}) + + +test_that('epix_merge sync="na" balks if do not know next_after', { + expect_error( + epix_merge( + as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=as.POSIXct(as.Date("2020-01-01")), x_value=10L)), + as_epi_archive(tibble::tibble(geo_value=1L, time_value=1L, version=as.POSIXct(as.Date("2020-01-02")), y_value=20L)), + sync = "na" + ), + regexp = "no applicable method.*next_after" + ) +}) diff --git a/tests/testthat/test-methods-epi_archive.R b/tests/testthat/test-methods-epi_archive.R index ee00b8fc..d0434f59 100644 --- a/tests/testthat/test-methods-epi_archive.R +++ b/tests/testthat/test-methods-epi_archive.R @@ -22,9 +22,10 @@ test_that("Warning against max_version being same as edf's max version",{ expect_warning(ea$as_of(max_version = min(ea$DT$version)),NA) }) -test_that("as_of properly grabs the data",{ +test_that("as_of properly grabs the data and doesn't mutate key",{ + d <- as.Date("2020-06-01") - + ea2 = tibble::tribble( ~geo_value, ~time_value, ~version, ~cases, "ca", "2020-06-01", "2020-06-01", 1, @@ -40,47 +41,118 @@ test_that("as_of properly grabs the data",{ ) %>% dplyr::mutate(dplyr::across(c(time_value, version), as.Date)) %>% as_epi_archive() - - df_as_of <- ea2 %>% - epix_as_of(max_version = as.Date("2020-06-03")) %>% - as_tibble() - - df_expected <- tibble( + + old_key = data.table::key(ea2$DT) + + edf_as_of <- ea2 %>% + epix_as_of(max_version = as.Date("2020-06-03")) + + edf_expected <- as_epi_df(tibble( geo_value = "ca", time_value = d + 0:2, cases = c(2,1,1) - ) - - expect_identical(df_as_of[[1]],df_expected[[1]]) - expect_identical(df_as_of[[2]],df_expected[[2]]) - expect_identical(df_as_of[[3]],df_expected[[3]]) -}) + ), as_of = as.Date("2020-06-03")) -# epix_merge tests -test_that("epix_merge requires second argument to be a data.table or - epi_archive",{ - expect_error(epix_merge(ea,data.frame(x=1))) + expect_equal(edf_as_of, edf_expected, ignore_attr=c(".internal.selfref", "sorted")) + expect_equal(data.table::key(ea2$DT), old_key) }) -test_that("data.table merging is utilized if 
second argument is a data.table",{ - dt1 <- select(ea$DT , -case_rate_7d_av) - ea1 <- as_epi_archive(dt1) - dt2 <- select(ea$DT , -percent_cli) - +test_that("quosure passing issue in epix_slide is resolved + other potential issues", { + # (First part adapted from @examples) + time_values <- seq(as.Date("2020-06-01"), + as.Date("2020-06-02"), + by = "1 day") + # We only have one non-version, non-time key in the example archive. Add + # another so that we don't accidentally pass tests due to accidentally + # matching the default grouping. + ea = as_epi_archive(archive_cases_dv_subset$DT %>% + dplyr::mutate(modulus = seq_len(nrow(.)) %% 5L), + other_keys = "modulus", + compactify = TRUE) + reference_by_modulus = epix_slide(x = ea, + f = ~ mean(.x$case_rate_7d_av), + n = 3, + group_by = modulus, + ref_time_values = time_values, + new_col_name = 'case_rate_3d_av') + reference_by_both = epix_slide(x = ea, + f = ~ mean(.x$case_rate_7d_av), + n = 3, + group_by = c(geo_value, modulus), + ref_time_values = time_values, + new_col_name = 'case_rate_3d_av') + # test the passing-something-that-must-be-enquosed behavior: expect_identical( - epix_merge(ea1,dt2), - merge(dt1,dt2,all=TRUE) + ea$slide( + f = ~ mean(.x$case_rate_7d_av), + n = 3, + group_by = modulus, + ref_time_values = time_values, + new_col_name = 'case_rate_3d_av' + ), + reference_by_modulus + ) + # test the passing-string-literal behavior: + expect_identical( + epix_slide(x = ea, + f = ~ mean(.x$case_rate_7d_av), + n = 3, + group_by = "modulus", + ref_time_values = time_values, + new_col_name = 'case_rate_3d_av'), + reference_by_modulus + ) + expect_identical( + ea$slide( + f = ~ mean(.x$case_rate_7d_av), + n = 3, + group_by = "modulus", + ref_time_values = time_values, + new_col_name = 'case_rate_3d_av' + ), + reference_by_modulus + ) + # Might also want to test the passing-string-var-without-all_of behavior, but + # make sure to set, trigger, then reset (or restore to old value) the + # tidyselect once-per-session message about the ambiguity + # + # test the passing-all-of-string-var behavior: + my_group_by = "modulus" + expect_identical( + epix_slide(x = ea, + f = ~ mean(.x$case_rate_7d_av), + n = 3, + group_by = tidyselect::all_of(my_group_by), + ref_time_values = time_values, + new_col_name = 'case_rate_3d_av'), + reference_by_modulus + ) + expect_identical( + ea$slide( + f = ~ mean(.x$case_rate_7d_av), + n = 3, + group_by = tidyselect::all_of(my_group_by), + ref_time_values = time_values, + new_col_name = 'case_rate_3d_av' + ), + reference_by_modulus + ) + # test the default behavior (default in this case should just be "geo_value"): + expect_identical( + epix_slide(x = ea, + f = ~ mean(.x$case_rate_7d_av), + n = 3, + ref_time_values = time_values, + new_col_name = 'case_rate_3d_av'), + reference_by_both ) -}) - -test_that("data.table merging works as intended",{ - ea <- archive_cases_dv_subset$clone() - dt1 <- select(ea$DT , -case_rate_7d_av) - ea1 <- as_epi_archive(dt1) - dt2 <- select(ea$DT , -percent_cli) - expect_identical( - as_epi_archive(ea$DT), - as_epi_archive(merge(dt1,dt2,all=TRUE)) + ea$slide( + f = ~ mean(.x$case_rate_7d_av), + n = 3, + ref_time_values = time_values, + new_col_name = 'case_rate_3d_av' + ), + reference_by_both ) }) diff --git a/vignettes/advanced.Rmd b/vignettes/advanced.Rmd index a76c5225..04c73af3 100644 --- a/vignettes/advanced.Rmd +++ b/vignettes/advanced.Rmd @@ -224,13 +224,13 @@ x <- y1 %>% ) %>% as_epi_archive() -epix_merge(x, y2 %>% +# mutating merge operation: +x$merge(y2 %>% select(geo_value, 
time_value, version = issue, case_rate_7d_av = value ) %>% - as_epi_archive(), -all = TRUE + as_epi_archive() ) ``` diff --git a/vignettes/aggregation.Rmd b/vignettes/aggregation.Rmd index f1d32616..617f0983 100644 --- a/vignettes/aggregation.Rmd +++ b/vignettes/aggregation.Rmd @@ -153,8 +153,8 @@ explicit value. The default is `NA`, but in the current case, where missingness is not at random but rather represents a small value that was censored (only a hypothetical with COVID-19 reports, but certainly a real phenomenon that occurs in other signals), it is better to replace it by zero, which is what we do here. -(Other approaches, such as LOCF: last-observation-carried-forward, could be -accomplished by first filling with `NA` values and then following up with a +(Other approaches, such as LOCF: last observation carried forward in time, could +be accomplished by first filling with `NA` values and then following up with a second call to `tidyr::fill()`.) ```{r} diff --git a/vignettes/archive.Rmd b/vignettes/archive.Rmd index e729b32d..1b8d0622 100644 --- a/vignettes/archive.Rmd +++ b/vignettes/archive.Rmd @@ -63,15 +63,17 @@ has (at least) the following columns: `time_value` is January 14, 2022, then this row contains the measurements of the data for January 14, 2022 that were available one day later. -As we can see from the above, the data frame returned by -`delphi.epidata::covidcast()` has the columns required for the `epi_archive` -format, with `issue` playing the role of `version`. We can now use -`as_epi_archive()` to bring it into `epi_archive` format. +As we can see from the above, the data frame returned by +`delphi.epidata::covidcast()` has the columns required for the `epi_archive` +format, with `issue` playing the role of `version`. We can now use +`as_epi_archive()` to bring it into `epi_archive` format. For removal of +redundant version updates in `as_epi_archive` using compactify, please refer +to the compactify vignette. ```{r, eval=FALSE} x <- dv %>% select(geo_value, time_value, version = issue, percent_cli = value) %>% - as_epi_archive() + as_epi_archive(compactify=TRUE) class(x) print(x) @@ -80,7 +82,7 @@ print(x) ```{r, echo=FALSE, message=FALSE, warning=FALSE} x <- archive_cases_dv_subset$DT %>% select(geo_value, time_value, version , percent_cli) %>% - as_epi_archive() + as_epi_archive(compactify=TRUE) class(x) print(x) @@ -106,10 +108,10 @@ snapshot of data from the archive, as of a given version (also described below). key(x$DT) ``` -In general, last observation carried forward (LOCF) is used to data in between -recorded versions. **A word of caution:** R6 objects, unlike most other objects -in R, have reference semantics. An important consequence of this is that objects -are not copied when modified. +In general, the last version of each observation is carried forward (LOCF) to +fill in data between recorded versions. **A word of caution:** R6 objects, +unlike most other objects in R, have reference semantics. An important +consequence of this is that objects are not copied when modified. ```{r} original_value <- x$DT$percent_cli[1] @@ -123,7 +125,7 @@ x$DT$percent_cli[1] <- original_value To make a copy, we can use the `clone()` method for an R6 class, as in `y <- x$clone()`. You can read more about reference semantics in Hadley Wickham's [Advanced R](https://adv-r.hadley.nz/r6.html#r6-semantics) book. 
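+To construct a modified archive while keeping the original intact, one pattern
+(a sketch mirroring the guidance in the `epi_archive` class documentation; note
+that `clone()` alone does not copy the underlying `DT`) is:
+
+```{r, eval=FALSE}
+y <- x$clone()
+y$DT <- data.table::copy(y$DT)
+# Modifications to `y` and `y$DT` now leave `x` untouched.
+```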
-
+
 ## Some details on metadata
 
 The following pieces of metadata are included as fields in an `epi_archive`
@@ -139,6 +141,15 @@ as in `x$geo_type` or `x$time_type`, etc. Just like `as_epi_df()`, the function
 object is instantiated, if they are not explicitly specified in the function
 call (as it did in the case above).
 
+Note that `compactify` is **not** stored as metadata: it is an argument used
+when creating the archive, and it is not retained in the resulting object:
+
+```{r,message=FALSE}
+# `archive_cases_dv_subset$DT` is the same versioned data table used above
+as_epi_archive(archive_cases_dv_subset$DT,compactify=TRUE)$geo_type # "state"
+as_epi_archive(archive_cases_dv_subset$DT,compactify=TRUE)$compactify # NULL
+```
+
 ## Producing snapshots in `epi_df` form
 
 A key method of an `epi_archive` class is `as_of()`, which generates a snapshot
@@ -162,7 +173,7 @@ date was June 1, 2021. From this we can infer that the doctor's visits signal
 was 2 days latent on June 1. Also, we can see that the metadata in the `epi_df`
 object has the version date recorded in the `as_of` field.
 
-Using the maximum of the `version` column in the underlying data table in an
+By default, using the maximum of the `version` column in the underlying data table in an
 `epi_archive` object itself generates a snapshot of the latest values of signal
 variables in the entire archive. The `epix_as_of()` function issues a warning in
 this case, since updates to the current version may still come in at a later
@@ -215,25 +226,28 @@ quite as dramatically. Modeling the revision process, which is often called
 
 ## Merging `epi_archive` objects
 
-Now we demonstrate how to merge the underlying data tables in two `epi_archive`
-objects together. The `epi_archive` class provides a method `merge()` precisely
-for this purpose. The wrapper function is called `epix_merge()`; as before, this
-is just offered as a matter of convenience/familiarity for some users. Below we
-merge the working `epi_archive` of versioned percentage CLI from outpatient
-visits to another one of versioned COVID-19 case reporting data, which we fetch
-the from the [COVIDcast
+Now we demonstrate how to merge two `epi_archive` objects together, e.g., so
+that grabbing data from multiple sources as of a particular version can be
+performed with a single `as_of` call. The `epi_archive` class provides a method
+`merge()` precisely for this purpose. The wrapper function is called
+`epix_merge()`; this wrapper avoids mutating its inputs, while `x$merge` will
+mutate `x`. Below we merge the working `epi_archive` of versioned percentage CLI
+from outpatient visits to another one of versioned COVID-19 case reporting data,
+which we fetch from the [COVIDcast
 API](https://cmu-delphi.github.io/delphi-epidata/api/covidcast.html/), on the
 rate scale (counts per 100,000 people in the population).
 
-When merging archives, we typically want to perform a *full join*, otherwise we
-will be throwing out versioned data from one table or the other. This is
-accomplished by setting `all = TRUE` in the call to `epix_merge()`. Furthermore,
-this function provides an option for filling `NA` values via LOCF by setting
-`locf = TRUE`. In general, unless two data tables have the exact same pattern of
-updates, we will get `NA` values in the signals after performing a full join.
-Because the original data archives are stored in LOCF (last observation carried
-forward) format in the first place, it generally makes sense to perform `NA`
-filling after merging using LOCF. Therefore `locf = TRUE` is the default.
+When merging archives, unless the archives have identical data release patterns,
+`NA`s can be introduced in the non-key variables for a few reasons:
+- to represent the "value" of an observation before its initial release (when we
+  need to pair it with additional observations from the other archive that have
+  been released)
+- to represent the "value" of an observation that has no recorded versions at
+  all (in the same sort of situation)
+- if requested via `sync="na"`, to represent potential update data that we do
+  not yet have access to (e.g., due to encountering issues while attempting to
+  download the currently available version data for one of the archives, but not
+  the other).
 
 ```{r, message = FALSE, warning = FALSE, eval=FALSE}
 y <- covidcast(
@@ -241,15 +255,15 @@ y <- covidcast(
   signals = "confirmed_7dav_incidence_prop",
   time_type = "day",
   geo_type = "state",
-  time_value = epirange(20200601, 20211201),
+  time_values = epirange(20200601, 20211201),
   geo_values = "ca,fl,ny,tx",
   issues = epirange(20200601, 20211201)
 ) %>%
   fetch_tbl() %>%
   select(geo_value, time_value, version = issue, case_rate_7d_av = value) %>%
-  as_epi_archive()
+  as_epi_archive(compactify=TRUE)
 
-epix_merge(x, y, all = TRUE)
+x$merge(y)
 print(x)
 head(x$DT)
 ```
@@ -260,9 +274,10 @@ print(x)
 head(x$DT)
 ```
 
-Importantly, as we can see, the way `epix_merge()` works is that it
-**overwrites** the data table in the first `epi_archive` object `x` by the
-merged data table.
+Importantly, see that `x$merge` mutated `x` to hold the result of the merge. We
+could also have used `xy <- epix_merge(x, y)` to avoid mutating `x`. See the
+documentation of either interface for a more detailed description of the
+mutation, pointer aliasing, and pointer reseating behavior that is possible.
 
 ## Sliding version-aware computations
 
@@ -408,7 +423,7 @@ ggplot(fc, aes(x = target_date, group = time_value, fill = as_of)) +
   theme(legend.position = "none")
 ```
 
-Each row displays the forecasts for a different location (CA and FL), and each
+Each row displays the forecasts for a different location (CA, FL, NY, and TX), and each
 column corresponds to whether properly-versioned data is used (`FALSE` means
 no, and `TRUE` means yes). We can see that the properly-versioned forecaster is,
 at some points in time, more problematic; for example, it massively overpredicts
diff --git a/vignettes/compactify.Rmd b/vignettes/compactify.Rmd
new file mode 100644
index 00000000..034235b3
--- /dev/null
+++ b/vignettes/compactify.Rmd
@@ -0,0 +1,119 @@
+---
+title: Compactify to remove redundant archive data
+output: rmarkdown::html_vignette
+vignette: >
+  %\VignetteIndexEntry{Compactify to remove redundant archive data}
+  %\VignetteEngine{knitr::rmarkdown}
+  %\VignetteEncoding{UTF-8}
+---
+
+## Removing redundant update data to save space
+
+We do not need to store version update rows that look like the last version of
+the corresponding observations carried forward (LOCF) for use with
+`epiprocess`'s `epi_archive`-related functions, as they all apply LOCF to fill
+in data between explicit updates. By default, we even detect and remove these
+LOCF-redundant rows to save space; this should not impact results as long as you
+do not directly work with the archive's `DT` field in a way that expects these
+rows to remain.
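Before walking through the `compactify` options below, a small sketch with made-up data may help show what "LOCF-redundant" means: the middle update below re-reports the previous value for the same observation, so dropping it cannot change any `as_of()` snapshot.

```r
library(epiprocess)

# Hypothetical toy update history for one observation ("ca", 2020-01-01):
# the 2020-01-02 update repeats the previous value, so it is LOCF-redundant.
toy <- data.frame(
  geo_value  = "ca",
  time_value = as.Date("2020-01-01"),
  version    = as.Date(c("2020-01-01", "2020-01-02", "2020-01-03")),
  value      = c(1, 1, 2)
)

as_epi_archive(toy, compactify = TRUE)$DT   # 2 rows: the redundant row is dropped
as_epi_archive(toy, compactify = FALSE)$DT  # 3 rows: the redundant row is kept
```

Only rows whose non-key columns all match the preceding version of the same observation are redundant; the 2020-01-03 update changes `value`, so it must be kept.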
+
+There are three different ways to set `compactify`:
+
+* No argument (the default): if there are LOCF-redundant rows, removes them and
+  issues a warning with some information about what rows were removed
+* `TRUE`: removes any LOCF-redundant rows without any warning or other feedback
+* `FALSE`: keeps any LOCF-redundant rows without any warning or other feedback
+
+To illustrate, we create two versions of the same archive, one which omits the
+LOCF-redundant rows and one which keeps them. Notice how the head of the first
+dataset differs from that of the second starting at the third row.
+
+```{r}
+library(epiprocess)
+library(dplyr)
+
+dt <- archive_cases_dv_subset$DT
+
+locf_omitted <- as_epi_archive(dt)
+locf_included <- as_epi_archive(dt, compactify = FALSE)
+
+head(locf_omitted$DT)
+head(locf_included$DT)
+```
+
+LOCF-redundant values can degrade the performance of operations on the dataset.
+Because the column `case_rate_7d_av` has many more LOCF-redundant values than
+`percent_cli`, we omit the `percent_cli` column when comparing performance.
+
+```{r}
+dt2 <- select(dt, -percent_cli)
+
+locf_included_2 <- as_epi_archive(dt2, compactify = FALSE)
+locf_omitted_2 <- as_epi_archive(dt2, compactify = TRUE)
+```
+
+In this example, a huge proportion of the original version update data were
+LOCF-redundant, and compactifying saves a large amount of space. The proportion
+of data that is LOCF-redundant can vary widely between data sets, so we won't
+always be this lucky.
+
+```{r}
+nrow(locf_included_2$DT)
+nrow(locf_omitted_2$DT)
+```
+
+As we would expect, performing 1000 iterations of `dplyr::filter` is faster when
+the LOCF-redundant rows are omitted.
+
+```{r}
+# Performance of filtering
+iterate_filter <- function(my_ea) {
+  for (i in 1:1000) {
+    filter(my_ea$DT, version >= as.Date("2020-01-01") + i)
+  }
+}
+
+# time an expression; lazy evaluation means `fx` is only run inside system.time()
+elapsed_time <- function(fx) c(system.time(fx))[[3]]
+
+speed_test <- function(f, name) {
+  data.frame(
+    operation = name,
+    locf = elapsed_time(f(locf_included_2)),
+    no_locf = elapsed_time(f(locf_omitted_2))
+  )
+}
+
+speeds <- speed_test(iterate_filter, "filter_1000x")
+```
+
+We would also like to measure the speed of `epi_archive` methods.
+
+```{r}
+# Performance of as_of iterated 1000 times
+iterate_as_of <- function(my_ea) {
+  for (i in 1:1000) {
+    my_ea$as_of(min(my_ea$DT$time_value) + i - 1000)
+  }
+}
+
+speeds <- rbind(speeds, speed_test(iterate_as_of, "as_of_1000x"))
+
+# Performance of slide
+slide_median <- function(my_ea) {
+  my_ea$slide(median = median(case_rate_7d_av), n = 7)
+}
+
+speeds <- rbind(speeds, speed_test(slide_median, "slide_median"))
+```
+
+Here is a detailed performance comparison:
+
+```{r}
+speeds_tidy <- tidyr::gather(speeds, key = "is_locf", value = "time_in_s", locf, no_locf)
+
+library(ggplot2)
+
+ggplot(speeds_tidy) +
+  geom_bar(aes(x = is_locf, y = time_in_s, fill = operation), stat = "identity")
+```
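Since the motivation for compactifying is space as well as speed, it can also be worth checking the in-memory footprint directly. A quick sketch using base R's `object.size()` (exact numbers will vary by data set and machine):

```r
# In-memory size of the underlying data table, with and without LOCF-redundant rows:
format(object.size(locf_included_2$DT), units = "MB")
format(object.size(locf_omitted_2$DT), units = "MB")
```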