diff --git a/DESCRIPTION b/DESCRIPTION index 18a3bdfb..6f5c3c81 100755 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -46,7 +46,9 @@ Suggests: knitr, outbreaks, rmarkdown, - testthat (>= 3.0.0) + testthat (>= 3.0.0), + waldo (>= 0.3.1), + withr VignetteBuilder: knitr Remotes: diff --git a/NAMESPACE b/NAMESPACE index c6bd7d31..a290ab27 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -11,6 +11,8 @@ S3method(filter,epi_df) S3method(group_by,epi_df) S3method(group_modify,epi_df) S3method(mutate,epi_df) +S3method(next_after,Date) +S3method(next_after,integer) S3method(print,epi_df) S3method(relocate,epi_df) S3method(rename,epi_df) @@ -19,6 +21,7 @@ S3method(summary,epi_df) S3method(ungroup,epi_df) S3method(unnest,epi_df) export("%>%") +export(archive_cases_dv_subset) export(arrange) export(as_epi_archive) export(as_epi_df) @@ -38,19 +41,23 @@ export(group_modify) export(growth_rate) export(is_epi_archive) export(is_epi_df) +export(max_version_with_row_in) export(mutate) export(new_epi_df) +export(next_after) export(relocate) export(rename) export(slice) export(ungroup) export(unnest) importFrom(R6,R6Class) +importFrom(data.table,":=") +importFrom(data.table,address) importFrom(data.table,as.data.table) importFrom(data.table,between) +importFrom(data.table,copy) importFrom(data.table,key) -importFrom(data.table,merge.data.table) -importFrom(data.table,nafill) +importFrom(data.table,set) importFrom(data.table,setkeyv) importFrom(dplyr,arrange) importFrom(dplyr,filter) @@ -69,9 +76,11 @@ importFrom(rlang,"!!!") importFrom(rlang,"!!") importFrom(rlang,.data) importFrom(rlang,.env) +importFrom(rlang,arg_match) importFrom(rlang,enquo) importFrom(rlang,enquos) importFrom(rlang,is_quosure) +importFrom(rlang,quo_is_missing) importFrom(rlang,sym) importFrom(rlang,syms) importFrom(stats,cor) diff --git a/R/archive.R b/R/archive.R index d1f2b90f..bf5a38a9 100644 --- a/R/archive.R +++ b/R/archive.R @@ -6,6 +6,100 @@ # `data.table::` everywhere and not importing things. .datatable.aware = TRUE +#' Validate a version bound arg +#' +#' Expected to be used on `clobberable_versions_start`, `versions_end`, +#' and similar arguments. Some additional context-specific checks may be needed. +#' +#' @param version_bound the version bound to validate +#' @param x a data frame containing a version column with which to check +#' compatibility +#' @param na_ok Boolean; is `NULL` an acceptable "bound"? (If so, `NULL` will +#' have a special context-dependent meaning.) +#' @param version_bound_arg optional string; what to call the version bound in +#' error messages +#' +#' @section Side effects: raises an error if version bound appears invalid +#' +#' @noRd +validate_version_bound = function(version_bound, x, na_ok, + version_bound_arg = rlang::caller_arg(version_bound), + x_arg = rlang::caller_arg(version_bound)) { + # We might want some (optional?) validation here to detect internal bugs. + if (length(version_bound) != 1L) { + # Check for length-1-ness fairly early so we don't have to worry as much + # about our `if`s receiving non-length-1 "Boolean"s. + Abort(sprintf("`version_bound` must have length 1, but instead was length %d", + length(version_bound)), + class=sprintf("epiprocess__%s_is_not_length_1", version_bound_arg)) + } else if (is.na(version_bound)) { + # Check for NA before class&type, as any-class&type NA should be fine for + # our purposes, and some version classes&types might not have their own NA + # value to pass in. + if (na_ok) { + # Looks like a valid version bound; exit without error. + return(invisible(NULL)) + } else { + Abort(sprintf( + '`%s` must not satisfy `is.na` (NAs are not allowed for this kind of version bound)', + version_bound_arg + ), class=sprintf("epiprocess__%s_is_na", version_bound_arg)) + } + } else if (!identical(class(version_bound), class(x[["version"]])) || + !identical(typeof(version_bound), typeof(x[["version"]]))) { + Abort(sprintf( + '`class(%1$s)` must be identical to `class(%2$s)` and `typeof(%1$s)` must be identical to `typeof(%2$s)`', + version_bound_arg, + # '{x_arg}[["version"]]' except adding parentheses if needed: + rlang::expr_deparse(rlang::new_call( + quote(`[[`), rlang::pairlist2(rlang::parse_expr(x_arg), "version") + )) + ), class=sprintf("epiprocess__%s_has_invalid_class_or_typeof", version_bound_arg)) + } else { + # Looks like a valid version bound; exit without error. + return(invisible(NULL)) + } +} + +#' Default arg helper: `max(x$version)`, with error if `x` has 0 rows +#' +#' Exported to make defaults more easily copyable. +#' +#' @param x `x` argument of [`as_epi_archive`] +#' +#' @return `max(x$version)` if it has any rows; raises error if it has 0 rows or +#' an `NA` version value +#' +#' @export +max_version_with_row_in = function(x) { + if (nrow(x) == 0L) { + Abort(sprintf("`nrow(x)==0L`, representing a data set history with no row up through the latest observed version, but we don't have a sensible guess at what version that is, or whether any of the empty versions might be clobbered in the future; if we use `x` to form an `epi_archive`, then `clobberable_versions_start` and `versions_end` must be manually specified."), + class="epiprocess__max_version_cannot_be_used") + } else { + version_col = purrr::pluck(x, "version") # error not NULL if doesn't exist + if (anyNA(version_col)) { + Abort("version values cannot be NA", + class="epiprocess__version_values_must_not_be_na") + } else { + version_bound <- max(version_col) + } + } +} + +#' Get the next possible value greater than `x` of the same type +#' +#' @param x the starting "value"(s) +#' @return same class, typeof, and length as `x` +#' +#' @export +next_after = function(x) UseMethod("next_after") + +#' @export +next_after.integer = function(x) x + 1L + +#' @export +next_after.Date = function(x) x + 1L + #' @title `epi_archive` object #' #' @description An `epi_archive` is an R6 class which contains a data table @@ -34,17 +128,31 @@ #' key variables, and thus the key variables are critical for figuring out how #' to generate a snapshot of data from the archive, as of a given version. #' -#' In general, last observation carried forward (LOCF) is used to data in -#' between recorded versions. Currently, deletions must be represented as -#' revising a row to a special state (e.g., making the entries `NA` or -#' including a special column that flags the data as removed and performing -#' some kind of post-processing), and the archive is unaware of what this -#' state is. +#' In general, the last version of each observation is carried forward (LOCF) to +#' fill in data between recorded versions, and between the last recorded +#' update and the `versions_end`. One consequence is that the `DT` +#' doesn't have to contain a full snapshot of every version (although this +#' generally works), but can instead contain only the rows that are new or +#' changed from the previous version (see `compactify`, which does this +#' automatically). Currently, deletions must be represented as revising a row +#' to a special state (e.g., making the entries `NA` or including a special +#' column that flags the data as removed and performing some kind of +#' post-processing), and the archive is unaware of what this state is. Note +#' that `NA`s *can* be introduced by `epi_archive` methods for other reasons, +#' e.g., in [`epix_fill_through_version`] and [`epix_merge`], if requested, to +#' represent potential update data that we do not yet have access to; or in +#' [`epix_merge`] to represent the "value" of an observation before the +#' version in which it was first released, or if no version of that +#' observation appears in the archive data at all. #' #' **A word of caution:** R6 objects, unlike most other objects in R, have #' reference semantics. A primary consequence of this is that objects are not #' copied when modified. You can read more about this in Hadley Wickham's -#' [Advanced R](https://adv-r.hadley.nz/r6.html#r6-semantics) book. +#' [Advanced R](https://adv-r.hadley.nz/r6.html#r6-semantics) book. In order +#' to construct a modified archive while keeping the original intact, first +#' make a clone using the `$clone` method, then overwrite the clone's `DT` +#' field with `data.table::copy(clone$DT)`, and finally perform the +#' modifications on the clone. #' #' @section Metadata: #' The following pieces of metadata are included as fields in an `epi_archive` @@ -66,7 +174,7 @@ #' `epi_df` format, which represents the most up-to-date values of the signal #' variables, as of the specified version. This is accomplished by calling the #' `as_of()` method for an `epi_archive` object `x`. More details on this -#' method are documented in the wrapper function `epix_as_of()`. +#' method are documented in the wrapper function [`epix_as_of()`]. #' #' @section Sliding Computations: #' We can run a sliding computation over an `epi_archive` object, much like @@ -76,8 +184,8 @@ #' difference: it is version-aware. That is, for an `epi_archive` object, the #' sliding computation at any given reference time point t is performed on #' **data that would have been available as of t**. More details on `slide()` -#' are documented in the wrapper function `epix_slide()`. -#' +#' are documented in the wrapper function [`epix_slide()`]. +#' #' @importFrom R6 R6Class #' @export #' @examples @@ -102,6 +210,8 @@ epi_archive = geo_type = NULL, time_type = NULL, additional_metadata = NULL, + clobberable_versions_start = NULL, + versions_end = NULL, #' @description Creates a new `epi_archive` object. #' @param x A data frame, data table, or tibble, with columns `geo_value`, #' `time_value`, `version`, and then any additional number of columns. @@ -117,6 +227,23 @@ epi_archive = #' @param additional_metadata List of additional metadata to attach to the #' `epi_archive` object. The metadata will have `geo_type` and `time_type` #' fields; named entries from the passed list or will be included as well. +#' @param compactify Optional; Boolean or `NULL`: should we remove rows that are +#' considered redundant for the purposes of `epi_archive`'s built-in methods +#' such as `as_of`? As these methods use the last version of each observation +#' carried forward (LOCF) to interpolate between the version data provided, +#' rows that don't change these LOCF results can potentially be omitted to +#' save space while maintaining the same behavior (with the help of the +#' `clobberable_versions_start` and `versions_end` fields in some +#' edge cases). `TRUE` will remove these rows, `FALSE` will not, and missing +#' or `NULL` will remove these rows and issue a warning. Generally, this can +#' be set to `TRUE`, but if you directly inspect or edit the fields of the +#' `epi_archive` such as its `DT`, you will have to determine whether +#' `compactify=TRUE` will produce the desired results. If compactification +#' here is removing a large proportion of the rows, this may indicate a +#' potential for space, time, or bandwidth savings upstream the data pipeline, +#' e.g., when fetching, storing, or preparing the input data `x` +#' @param clobberable_versions_start Optional; as in [`as_epi_archive`] +#' @param versions_end Optional; as in [`as_epi_archive`] #' @return An `epi_archive` object. #' @importFrom data.table as.data.table key setkeyv #' @@ -124,7 +251,8 @@ epi_archive = #' Refer to the documentation for [as_epi_archive()] for more information #' and examples of parameter names. initialize = function(x, geo_type, time_type, other_keys, - additional_metadata) { + additional_metadata, compactify, + clobberable_versions_start, versions_end) { # Check that we have a data frame if (!is.data.frame(x)) { Abort("`x` must be a data frame.") @@ -140,6 +268,10 @@ epi_archive = if (!("version" %in% names(x))) { Abort("`x` must contain a `version` column.") } + if (anyNA(x$version)) { + Abort("`x$version` must not contain `NA`s", + class = "epiprocess__version_values_must_not_be_na") + } # If geo type is missing, then try to guess it if (missing(geo_type)) { @@ -164,19 +296,106 @@ epi_archive = c("geo_type", "time_type"))) { Warn("`additional_metadata` names overlap with existing metadata fields \"geo_type\", \"time_type\".") } - + + # Conduct checks and apply defaults for `compactify` + if (missing(compactify)) { + compactify = NULL + } else if (!rlang::is_bool(compactify) && + !rlang::is_null(compactify)) { + Abort("compactify must be boolean or null.") + } + + # Apply defaults and conduct checks and apply defaults for + # `clobberable_versions_start`, `versions_end`: + if (missing(clobberable_versions_start)) { + clobberable_versions_start <- max_version_with_row_in(x) + } + if (missing(versions_end)) { + versions_end <- max_version_with_row_in(x) + } + validate_version_bound(clobberable_versions_start, x, na_ok=TRUE) + validate_version_bound(versions_end, x, na_ok=FALSE) + if (nrow(x) > 0L && versions_end < max(x[["version"]])) { + Abort(sprintf("`versions_end` was %s, but `x` contained + updates for a later version or versions, up through %s", + versions_end, max(x[["version"]])), + class="epiprocess__versions_end_earlier_than_updates") + } + if (!is.na(clobberable_versions_start) && clobberable_versions_start > versions_end) { + Abort(sprintf("`versions_end` was %s, but a `clobberable_versions_start` + of %s indicated that there were later observed versions", + versions_end, clobberable_versions_start), + class="epiprocess__versions_end_earlier_than_clobberable_versions_start") + } + + # --- End of validation and replacing missing args with defaults --- + # Create the data table; if x was an un-keyed data.table itself, # then the call to as.data.table() will fail to set keys, so we # need to check this, then do it manually if needed key_vars = c("geo_value", "time_value", other_keys, "version") DT = as.data.table(x, key = key_vars) if (!identical(key_vars, key(DT))) setkeyv(DT, cols = key_vars) - + + # Checks to see if a value in a vector is LOCF + is_locf <- function(vec) { + dplyr::if_else(!is.na(vec) & !is.na(dplyr::lag(vec)), + vec == dplyr::lag(vec), + is.na(vec) & is.na(dplyr::lag(vec))) + } + + # LOCF is defined by a row where all values except for the version + # differ from their respective lag values + + # Checks for LOCF's in a data frame + rm_locf <- function(df) { + dplyr::filter(df,if_any(c(everything(),-version),~ !is_locf(.))) + } + + # Keeps LOCF values, such as to be printed + keep_locf <- function(df) { + dplyr::filter(df,if_all(c(everything(),-version),~ is_locf(.))) + } + + # Runs compactify on data frame + if (is.null(compactify) || compactify == TRUE) { + elim = keep_locf(DT) + DT = rm_locf(DT) + } else { + # Create empty data frame for nrow(elim) to be 0 + elim = tibble::tibble() + } + + # Warns about redundant rows + if (is.null(compactify) && nrow(elim) > 0) { + warning_intro <- break_str(paste( + 'Found rows that appear redundant based on', + 'last (version of each) observation carried forward;', + 'these rows have been removed to "compactify" and save space:' + )) + + warning_data <- paste(collapse="\n", capture.output(print(elim, topn=3L, nrows=7L))) + + warning_outro <- break_str(paste( + "Built-in `epi_archive` functionality should be unaffected,", + "but results may change if you work directly with its fields (such as `DT`).", + "See `?as_epi_archive` for details.", + "To silence this warning but keep compactification,", + "you can pass `compactify=TRUE` when constructing the archive." + )) + + warning_message <- paste(sep="\n", warning_intro, warning_data, warning_outro) + + rlang::warn(warning_message, class="epiprocess__compactify_default_removed_rows") + } + # Instantiate all self variables self$DT = DT self$geo_type = geo_type self$time_type = time_type self$additional_metadata = additional_metadata + self$clobberable_versions_start = clobberable_versions_start + self$versions_end = versions_end }, print = function() { cat("An `epi_archive` object, with metadata:\n") @@ -194,14 +413,20 @@ epi_archive = min_time = Min(self$DT$time_value) max_time = Max(self$DT$time_value) } - cat(sprintf("* %-14s = %s\n", "min time value", - min_time)) - cat(sprintf("* %-14s = %s\n", "max time value", - max_time)) - cat(sprintf("* %-14s = %s\n", "min version", + cat(sprintf("* %-14s = %s\n", "min time value", min_time)) + cat(sprintf("* %-14s = %s\n", "max time value", max_time)) + cat(sprintf("* %-14s = %s\n", "first version with update", min(self$DT$version))) - cat(sprintf("* %-14s = %s\n", "max version", + cat(sprintf("* %-14s = %s\n", "last version with update", max(self$DT$version))) + if (is.na(self$clobberable_versions_start)) { + cat("No clobberable versions\n") + } else { + cat(sprintf("* %-14s = %s\n", "clobberable versions start", + self$clobberable_versions_start)) + } + cat(sprintf("* %-14s = %s\n", "versions end", + self$versions_end)) cat("----------\n") cat(sprintf("Data archive (stored in DT field): %i x %i\n", nrow(self$DT), ncol(self$DT))) @@ -213,31 +438,36 @@ epi_archive = cat("----------\n") cat(sprintf("Public methods: %s\n", paste(names(epi_archive$public_methods), - collapse = ", "))) + collapse = ", ")),"\n") + }, ##### #' @description Generates a snapshot in `epi_df` format as of a given version. -#' See the documentation for the wrapper function `epix_as_of()` for details. +#' See the documentation for the wrapper function [`epix_as_of()`] for details. #' @importFrom data.table between key as_of = function(max_version, min_time_value = -Inf) { # Self max version and other keys - self_max = max(self$DT$version) other_keys = setdiff(key(self$DT), c("geo_value", "time_value", "version")) if (length(other_keys) == 0) other_keys = NULL # Check a few things on max_version - if (!identical(class(max_version), class(self$DT$version))) { - Abort("`max_version` and `DT$version` must have same class.") + if (!identical(class(max_version), class(self$DT$version)) || + !identical(typeof(max_version), typeof(self$DT$version))) { + Abort("`max_version` and `DT$version` must have same `class` and `typeof`.") } if (length(max_version) != 1) { Abort("`max_version` cannot be a vector.") } - if (max_version > self_max) { - Abort("`max_version` must be at most `max(DT$max_version)`.") + if (is.na(max_version)) { + Abort("`max_version` must not be NA.") + } + if (max_version > self$versions_end) { + Abort("`max_version` must be at most `self$versions_end`.") } - if (max_version == self_max) { - Warn("Getting data as of the latest version possible. For a variety of reasons, it is possible that we only have a preliminary picture of this version (e.g., the upstream source has updated it but we have not seen it due to latency in synchronization). Thus, the snapshot that we produce here might not be reproducible at a later time (e.g., when the archive has caught up in terms of synchronization).") + if (!is.na(self$clobberable_versions_start) && max_version >= self$clobberable_versions_start) { + Warn('Getting data as of some "clobberable" version that might be hotfixed, synced, or otherwise replaced later with different data using the same version tag. Thus, the snapshot that we produce here might not be reproducible later. See `?epi_archive` for more info and `?epix_as_of` on how to muffle.', + class="epiprocess__snapshot_as_of_clobberable_version") } # Filter by version and return @@ -257,41 +487,103 @@ epi_archive = ) }, ##### -#' @description Merges another `data.table` with the current one, and allows for -#' a post-filling of `NA` values by last observation carried forward (LOCF). -#' See the documentation for the wrapper function `epix_merge()` for details. -#' @importFrom data.table key merge.data.table nafill - merge = function(y, ..., locf = TRUE, nan = NA) { - # Check we have a `data.table` object - if (!(inherits(y, "data.table") || inherits(y, "epi_archive"))) { - Abort("`y` must be of class `data.table` or `epi_archive`.") +#' @description Fill in unobserved history using requested scheme by mutating +#' `self` and potentially reseating its fields. See +#' [`epix_fill_through_version`] for a full description of the non-R6-method +#' version, which doesn't mutate the input archive but might alias its fields. +#' +#' @param fill_versions_end as in [`epix_fill_through_version`] +#' @param how as in [`epix_fill_through_version`] +#' +#' @importFrom data.table key setkeyv := address copy +#' @importFrom rlang arg_match + fill_through_version = function(fill_versions_end, + how=c("na", "locf")) { + validate_version_bound(fill_versions_end, self$DT, na_ok=FALSE) + how <- arg_match(how) + if (self$versions_end < fill_versions_end) { + new_DT = switch( + how, + "na" = { + # old DT + a version consisting of all NA observations + # immediately after the last currently/actually-observed + # version. Note that this NA-observation version must only be + # added if `self` is outdated. + nonversion_key_cols = setdiff(key(self$DT), "version") + nonkey_cols = setdiff(names(self$DT), key(self$DT)) + next_version_tag = next_after(self$versions_end) + if (next_version_tag > fill_versions_end) { + Abort(sprintf(paste( + "Apparent problem with `next_after` implementation:", + "archive contained observations through version %s", + "and the next possible version was supposed to be %s,", + "but this appeared to jump from a version < %3$s", + "to one > %3$s, implying at least one version in between." + ), self$versions_end, next_version_tag, fill_versions_end)) + } + nonversion_key_vals_ever_recorded = unique(self$DT, by=nonversion_key_cols) + # In edge cases, the `unique` result can alias the original + # DT; detect and copy if necessary: + if (identical(address(self$DT), address(nonversion_key_vals_ever_recorded))) { + nonversion_key_vals_ever_recorded <- copy(nonversion_key_vals_ever_recorded) + } + next_version_DT = nonversion_key_vals_ever_recorded[ + , version := next_version_tag][ + # this makes the class of these columns logical (`NA` is a + # logical NA; we're relying on the rbind below to convert to + # the proper class&typeof) + , (nonkey_cols) := NA] + # full result DT: + setkeyv(rbind(self$DT, next_version_DT), key(self$DT))[] + }, + "locf" = { + # just the old DT; LOCF is built into other methods: + self$DT + } + ) + new_versions_end = fill_versions_end + # Update `self` all at once with simple, error-free operations + + # return below: + self$DT <- new_DT + self$versions_end <- new_versions_end + } else { + # Already sufficiently up to date; nothing to do. } + return (invisible(self)) + }, + ##### +#' @description Merges another `epi_archive` with the current one, mutating the +#' current one by reseating its `DT` and several other fields, but avoiding +#' mutation of the old `DT`; returns the current archive +#' [invisibly][base::invisible]. See [`epix_merge`] for a full description +#' of the non-R6-method version, which does not mutate either archive, and +#' does not alias either archive's `DT`. +#' @param y as in [`epix_merge`] +#' @param sync as in [`epix_merge`] +#' @param compactify as in [`epix_merge`] + merge = function(y, sync = c("forbid","na","locf","truncate"), compactify = TRUE) { + result = epix_merge(self, y, + sync = sync, + compactify = compactify) - # Use the data.table merge function, carrying through ... args - if (inherits(y, "data.table")) self$DT = merge(self$DT, y, ...) - else self$DT = merge(self$DT, y$DT, ...) + if (length(epi_archive$private_fields) != 0L) { + Abort("expected no private fields in epi_archive", + internal=TRUE) + } - # Now use the data.table locf function, if we're asked to - if (locf) { - key_vars = key(self$DT) - cols = setdiff(names(self$DT), key_vars) - by = setdiff(key_vars, "version") + # Mutate fields all at once, trying to avoid any potential errors: + for (field_name in names(epi_archive$public_fields)) { + self[[field_name]] <- result[[field_name]] + } - # Important: use nafill and not setnafill because the latter - # returns the entire data frame by reference, and the former can - # be set to act on particular columns by reference using := - self$DT[, - (cols) := nafill(.SD, type = "locf", nan = nan), - .SDcols = cols, - by = by] - } - }, + return (invisible(self)) + }, ##### #' @description Slides a given function over variables in an `epi_archive` -#' object. See the documentation for the wrapper function `epix_as_of()` for +#' object. See the documentation for the wrapper function [`epix_slide()`] for #' details. #' @importFrom data.table key -#' @importFrom rlang !! !!! enquo enquos is_quosure sym syms +#' @importFrom rlang !! !!! enquo quo_is_missing enquos is_quosure sym syms slide = function(f, ..., n, group_by, ref_time_values, time_step, new_col_name = "slide_value", as_list_col = FALSE, names_sep = "_", @@ -310,19 +602,16 @@ epi_archive = before_num = n-1 if (!missing(time_step)) before_num = time_step(n-1) - # What to group by? If missing, set according to internal keys - if (missing(group_by)) { - group_by = setdiff(key(self$DT), c("time_value", "version")) + # What to group by? If missing, set according to internal keys; + # otherwise, tidyselect. + if (quo_is_missing(enquo(group_by))) { + group_by <- syms(setdiff(key(self$DT), c("time_value", "version"))) + } else { + group_by <- syms(names(eval_select(enquo(group_by), self$DT))) } - # Symbolize column name, defuse grouping variables. We have to do - # the middle step here which is a bit complicated (unfortunately) - # since the function epix_slide() could have called the current one, - # and in doing so, it may have already needed to defuse the grouping - # variables + # Symbolize column name new_col = sym(new_col_name) - if (!is_quosure(group_by)) group_by = enquo(group_by) - group_by = syms(names(eval_select(group_by, self$DT))) # Key variable names, apart from time value and version key_vars = setdiff(key(self$DT), c("time_value", "version")) @@ -463,6 +752,50 @@ epi_archive = #' @param additional_metadata List of additional metadata to attach to the #' `epi_archive` object. The metadata will have `geo_type` and `time_type` #' fields; named entries from the passed list or will be included as well. +#' @param compactify Optional; Boolean or `NULL`: should we remove rows that are +#' considered redundant for the purposes of `epi_archive`'s built-in methods +#' such as `as_of`? As these methods use the last version of each observation +#' carried forward (LOCF) to interpolate between the version data provided, +#' rows that don't change these LOCF results can potentially be omitted to +#' save space. `TRUE` will remove these rows, `FALSE` will not, and missing or +#' `NULL` will remove these rows and issue a warning. Generally, this can be +#' set to `TRUE`, but if you directly inspect or edit the fields of the +#' `epi_archive` such as its `DT`, you will have to determine whether +#' `compactify=TRUE` will produce the desired results. If compactification +#' here is removing a large proportion of the rows, this may indicate a +#' potential for space, time, or bandwidth savings upstream the data pipeline, +#' e.g., when fetching, storing, or preparing the input data `x` +#' @param clobberable_versions_start Optional; `length`-1; either a value of the +#' same `class` and `typeof` as `x$version`, or an `NA` of any `class` and +#' `typeof`: specifically, either (a) the earliest version that could be +#' subject to "clobbering" (being overwritten with different update data, but +#' using the same version tag as the old update data), or (b) `NA`, to +#' indicate that no versions are clobberable. There are a variety of reasons +#' why versions could be clobberable, such as upstream hotfixes to the latest +#' version, or delays in data synchronization that were mistaken for versions +#' with no updates; potential causes vary between different data pipelines. +#' The default value is `max_version_with_row_in(x)`; this default assumes +#' that (i) if a row in `x` (even one that `compactify` would consider +#' redundant) is present with version `ver`, then all previous versions must +#' be finalized and non-clobberable, although `ver` (and onward) might still +#' be modified, (ii) even if we have "observed" empty updates for some +#' versions beyond `max(x$version)` (as indicated by `versions_end`; +#' see below), we can't assume `max(x$version)` has been finalized, because we +#' might see a nonfinalized version + empty subsequent versions due to +#' upstream database replication delays in combination with the upstream +#' replicas using last-version-carried-forward to extrapolate that there were +#' no updates, (iii) "redundant" update rows that would be removed by +#' `compactify` are not redundant, and actually come from an explicit version +#' release that indicates that preceding versions are finalized. If `nrow(x) +#' == 0`, then this argument is mandatory. +#' @param versions_end Optional; length-1, same `class` and `typeof` as +#' `x$version`: what is the last version we have observed? The default is +#' `max_version_with_row_in(x)`, but values greater than this could also be +#' valid, and would indicate that we observed additional versions of the data +#' beyond `max(x$version)`, but they all contained empty updates. (The default +#' value of `clobberable_versions_start` does not fully trust these empty +#' updates, and assumes that any version `>= max(x$version)` could be +#' clobbered.) If `nrow(x) == 0`, then this argument is mandatory. #' @return An `epi_archive` object. #' #' @details This simply a wrapper around the `new()` method of the `epi_archive` @@ -495,22 +828,26 @@ epi_archive = #' df <- data.frame (geo_value = c(replicate(2, "ca"), replicate(2, "fl")), #' county = c(1, 3, 2, 5), #' time_value = c("2020-06-01", -#' "2020-06-02", -#' "2020-06-01", -#' "2020-06-02"), +#' "2020-06-02", +#' "2020-06-01", +#' "2020-06-02"), #' version = c("2020-06-02", -#' "2020-06-03", -#' "2020-06-02", -#' "2020-06-03"), +#' "2020-06-03", +#' "2020-06-02", +#' "2020-06-03"), #' cases = c(1, 2, 3, 4), #' cases_rate = c(0.01, 0.02, 0.01, 0.05)) #' #' x <- df %>% as_epi_archive(geo_type = "state", -#' time_type = "day", -#' other_keys = "county") +#' time_type = "day", +#' other_keys = "county") as_epi_archive = function(x, geo_type, time_type, other_keys, - additional_metadata = list()) { - epi_archive$new(x, geo_type, time_type, other_keys, additional_metadata) + additional_metadata = list(), + compactify = NULL, + clobberable_versions_start = max_version_with_row_in(x), + versions_end = max_version_with_row_in(x)) { + epi_archive$new(x, geo_type, time_type, other_keys, additional_metadata, + compactify, clobberable_versions_start, versions_end) } #' Test for `epi_archive` format diff --git a/R/data.R b/R/data.R index f281803e..f642ac47 100644 --- a/R/data.R +++ b/R/data.R @@ -67,8 +67,129 @@ #' * \href{https://cmu-delphi.github.io/delphi-epidata/api/covidcast-signals/doctor-visits.html}{From the COVIDcast Doctor Visits API}: The signal `percent_cli` is taken directly from the API without changes. #' * \href{https://cmu-delphi.github.io/delphi-epidata/api/covidcast-signals/jhu-csse.html}{From the COVIDcast Epidata API}: `case_rate_7d_av` is taken directly from the JHU CSSE \href{https://github.com/CSSEGISandData/COVID-19}{COVID-19 GitHub repository} without changes. The 7-day average signals are computed by Delphi by calculating moving averages of the preceding 7 days, so the signal for June 7 is the average of the underlying data for June 1 through 7, inclusive. #' * Furthermore, the data is a subset of the full dataset, the signal names slightly altered, and formatted into a tibble. +#' +#' @export "archive_cases_dv_subset" +#' Detect whether `pkgload` is unregistering a package (with some unlikely false positives) +#' +#' More precisely, detects the presence of a call to an `unregister` or +#' `unregister_namespace` function from any package in the indicated part of the +#' function call stack. +#' +#' @param parent_n optional, single non-`NA` non-negative integer; how many +#' "parent"/"ancestor" calls should we skip inspecting? Default of `0L` will +#' check everything up to, but not including the call to this function. If +#' building wrappers or utilities around this function it may be useful to use +#' this default to ignore those wrappers, especially if they might trigger +#' false positives now or in some future version of this function with a looser +#' function name test. +#' +#' @return Boolean +#' +#' @noRd +some_package_is_being_unregistered = function(parent_n = 0L) { + calls = sys.calls() + # `calls` will include the call to this function; strip out this call plus + # `parent_n` additional requested calls to make it like we're reasoning about + # the desired call. This could prevent potential false positives from + # triggering if, in a later version, we decide to loosen the `call_name` + # checks below to something that would be `TRUE` for the name of this function + # or one of the undesired call ancestors. + calls_to_inspect = utils::head(calls, n = -(parent_n + 1L)) + # Note that `utils::head(sys.calls(), n=-1L)` isn't equivalent, due to lazy + # argument evaluation. Note that copy-pasting the body of this function + # without this `utils::head` operation isn't always equivalent to calling it; + # e.g., within the `value` argument of a package-level `delayedAssign`, + # `sys.calls()` will return `NULL` is some or all cases, including when its + # evaluation has been triggered via `unregister`. + simple_call_names = purrr::map_chr(calls_to_inspect, function(call) { + maybe_simple_call_name = rlang::call_name(call) + if (is.null(maybe_simple_call_name)) NA_character_ else maybe_simple_call_name + }) + # `pkgload::unregister` is an (the?) exported function that forces + # package-level promises, while `pkgload:::unregister_namespace` is the + # internal function that does this package-level promise. Check for both just + # in case there's another exported function that calls `unregister_namespace` + # or other `pkgload` versions don't use the `unregister_namespace` internal. + # (Note that `NA_character_ %in%