
Distinguish snapshot of signal vs. archive of signal, and as-of vs. issue #8


Closed
brookslogan opened this issue Oct 15, 2021 · 5 comments


@brookslogan
Contributor

brookslogan commented Oct 15, 2021

The current epi_signal implementation contains issue as a column, allowing it to contain the full revision history of a signal, but functions using epi_signal appear to work only with the latest issue, and expect that issue to constitute a full snapshot of the data as of that issue, rather than, e.g., just the new & updated values. It makes sense to separate the concept of such a snapshot from the full revision history, giving these concepts different names, classes, functions, etc., and requiring the user to explicitly convert between them (for conceptual clarity and code readability).

  • The current epi_signal should gain a metadata field as-of-issue or as-of, or something along those lines. Maybe it should be renamed to epi_snapshot? The issue column should be removed or renamed. Some users might want to characterize how mature an observation is; some might do that with as_of_issue - time_value, while others might want to use latest_issue_that_updated_this_row - time_value.
  • A new class epi_archive (or epi_signal_archive?) should be added. It would have a structure similar to the structure of the current epi_signal table, but might benefit from being based on a data.table or R6 class wrapping a data.table, and may also need or benefit from some extra metadata.
    • The primary function (after construction) would be epi.archive$snapshot_as_of(as.of.issue), which would return an epi_signal, or an error if as.of.issue isn't in the range covered by the archive. LOCF could be used to fill in the gaps between issues, but not after the latest issue. For the LOCF, something like one of the below (see the sketch after this list):
      • unique(diff.DT[issue <= as.of.issue], by=prefixed.nonissue.key.names, fromLast=TRUE)[status == "new.or.updated"]
      • diff.DT[setkeyv(set(unique(diff.DT, by=prefixed.nonissue.key.names)[,prefixed.nonissue.key.names],,"issue",as.of.issue), c(prefixed.nonissue.key.names,"issue")), roll=TRUE][status == "new.or.updated"]
    • The entries would represent a sort of git-like diff. The most memory-efficient and canonical version would store ONLY entries that were added, changed, or removed in each issue. The operations above are based on a design where the columns are c(paste0("diff.", names(epi.signal)), "issue", "status"), where status is either "new.or.updated" or "removed". It could be made more complex to allow for more error checking or to provide extra info to users interested in revision modeling.
    • Extra metadata that may be useful:
      • a vector of issues for which the archive has recorded snapshots. This, or at least a max.issue value, is required when the archive covers all updates through, e.g., Sep 15 but the data did not change between Sep 2 and Sep 15: the archive should not complain when the user requests a Sep 14 snapshot, should complain when they request a Sep 16 snapshot, and should complain when the user accidentally tries to "add" a Sep 15 snapshot a second time. It is also necessary to enable a "reproducible" option, which would treat the latest recorded issue as still subject to revision (e.g., because the data for an issue date was revised in the middle of that date, and it's either currently that date & before the change was made, or any time/day after the change was made & before the user found out about it, due to pipeline delays and data-fetching frequencies). It is also necessary if we want to allow a user to revise the archive by adding a new snapshot in the middle of the currently recorded issue range.
      • something equivalent to unique(diff.DT, by=prefixed.nonissue.key.names)[,prefixed.nonissue.key.names], to improve speed of the second query approach above. But users would also benefit from a function that returns this data, so that they know what geos&times were ever included in a data set.
      • class & typeof information for the columns, to help check that any snapshots added later are compatible with the existing data.
    • Implementing archive construction and updates may be a little inconvenient, especially if users are allowed to insert new snapshots in the middle of the range of issues already recorded (the new diff needs to be computed and added, and the already-recorded diff for the following issue needs to be updated). The user would be softly forbidden from providing a column redundant with as_of_issue when inserting snapshots, as that would make the diffs very inefficient.
  • Functions currently taking epi_signals should continue to work on epi_signals; instead of working on the latest "issue" by default, they will require the user to provide just a single snapshot of the data.
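
To make snapshot_as_of() and the diff format above concrete, here is a minimal sketch based on the first (unique/fromLast) query; the toy diff.DT contents, the prefixed column names, and the issue-range check are illustrative assumptions, not a committed API:

library("data.table")

## Toy archive in the diff format sketched above: one row per (geo, time) whose value
## was added/updated or removed in a given issue.  All names here are illustrative.
prefixed.nonissue.key.names = c("diff.geo_value", "diff.time_value")
diff.DT = data.table(
  diff.geo_value  = c("ca", "ca", "ca", "tx", "tx"),
  diff.time_value = c(1L,   1L,   2L,   1L,   1L),
  issue           = c(1L,   3L,   2L,   2L,   3L),
  diff.value      = c(10,   12,   7,    5,    NA),
  status          = c(rep("new.or.updated", 4L), "removed"),
  key = c(prefixed.nonissue.key.names, "issue")
)
## (A real archive would track its recorded issues separately, since an issue with no
## changes leaves no diff rows; range() over observed issues is just a stand-in here.)
recorded.issue.range = range(diff.DT$issue)

snapshot_as_of = function(as.of.issue) {
  if (as.of.issue < recorded.issue.range[[1L]] || as.of.issue > recorded.issue.range[[2L]]) {
    stop("as.of.issue is outside the issue range covered by the archive")
  }
  ## LOCF via unique(..., fromLast=TRUE): keep the latest diff entry at or before
  ## as.of.issue for each non-issue key, then drop rows whose latest status is "removed".
  unique(diff.DT[issue <= as.of.issue],
         by = prefixed.nonissue.key.names, fromLast = TRUE)[status == "new.or.updated"]
}

snapshot_as_of(2L) # ca/time 1 -> 10, ca/time 2 -> 7, tx/time 1 -> 5
snapshot_as_of(3L) # ca/time 1 -> 12, ca/time 2 -> 7; tx/time 1 dropped as "removed"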

Secondly, we should disambiguate "issue". E.g., at least for epi_signal, the as-of concept should be called "as.of.issue" or "as.of"; the archive could still use just "issue", or maybe it should also get a more specific term.

@ryantibs
Member

This is an awesome suggestion, Logan. I'm not tagging you simply because I'm hoping you'll stop reading GitHub this week and actually take some time off 🙂!

I love this idea in general, and after mulling it over for a couple of days, I'm ready to go with it. In the spirit of moving quickly, I'm thinking about just refactoring epi_signal so that it's issue-free (and putting a required as-of field in the metadata), and making epi_signal_archive just be the same thing with an issue column (basically what we have now). Then I'll refactor all the vignettes, which currently don't use issue dates anyway.
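
Roughly, the two shapes I'm picturing (just a sketch; column and metadata names are illustrative, not final):

library("tibble")

## Sketch only: an issue-free snapshot carrying a required as-of field in its metadata...
snapshot = tibble(geo_value = "ca",
                  time_value = as.Date("2021-10-01"),
                  value = 3.2)
attr(snapshot, "metadata") = list(as_of = as.Date("2021-10-14"))

## ...versus an archive that is basically the same thing with an issue column
## (i.e., what the current epi_signal with an issue column already looks like).
archive = tibble(geo_value = "ca",
                 time_value = as.Date("2021-10-01"),
                 issue = as.Date(c("2021-10-05", "2021-10-14")),
                 value = c(2.9, 3.2))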

A better and more efficient implementation of epi_signal_archive is something we should do in the near future, once discussion of how to do it has matured a bit, and then maybe/hopefully you could take over and implement it and write a new vignette on the archive format.

ryantibs added a commit that referenced this issue Oct 21, 2021
- As per the discussion on #8, from now on, an `epi_signal` object
  represents a single snapshot of a data set
- Modify `as.epi_signal()` to reflect this change
- Update documentation and vignettes accordingly
@ryantibs ryantibs mentioned this issue Oct 21, 2021
@brookslogan
Contributor Author

brookslogan commented Oct 25, 2021

Regarding implementation:

  • Old work on flu used just dplyr to do this, and I believe it was a huge chunk of the overhead.
  • Benchmarks with both data.table approaches above (unique and rolling-join-with-tracked-nonissue-keyset) seem fairly quick (sometimes <1s, sometimes 2s--5s) on an artificial archive with 200M rows (messy code below). The main concerns are probably the memory footprint (200M rows x 4 numeric columns = 3.7GB), column typing, and dealing with updates to the archive. Things to consider:
    • partitioning by parts of the key (e.g., the "geo_value"; see the sketch after this list). Storing these parts in a hash map (environment) might speed things up; storing them in files would reduce the CPU memory footprint, but complicate updating the archive.
    • examining character vs. factor speed and on-disk size (in-memory should be similar), and encouraging/forcing one or the other
    • R6 vs. S3. Some natural functionality would modify in place or write to disk, and none of the main functions involved seem like they would apply to other classes, so I would lean toward R6.
  • database approaches: these require significant setup, possibly a viral license, and in my past experimentation have been slow (though the current covidcast approach is probably much better)
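
Regarding the partitioning bullet above, a minimal sketch of the environment-as-hash-map idea (the toy table, column names, and helper function are made up for illustration):

library("data.table")

## Sketch only: partition a small toy diff table by geo and store the partitions in an
## environment used as a hash map, so a per-geo snapshot query only touches one partition.
## (In a real implementation the partitions could instead live in files to cut the CPU
## memory footprint, at the cost of more complicated archive updates.)
toy.DT = data.table(geo = rep(c("ca", "tx"), each = 4L),
                    time = rep(1:2, 4L),
                    issue = rep(1:2, each = 2L, times = 2L),
                    value = rnorm(8L),
                    key = c("geo", "time", "issue"))
partitions = list2env(split(toy.DT, by = "geo"), envir = new.env(parent = emptyenv()))

snapshot_one_geo = function(g, as.of.issue) {
  part = get(g, envir = partitions)
  unique(part[issue <= as.of.issue], by = c("geo", "time"), fromLast = TRUE)
}

snapshot_one_geo("ca", 1L)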

For the main implementation I could pursue something along the lines of one of the data.table approaches. But since there is flexibility here on the implementation, it may make sense to first define an interface for an archive, and allow multiple implementations of that interface (but to only provide one).
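
For instance, the interface could be as small as the following R6 sketch (method names and signatures are placeholders, not a settled proposal), with the data.table-backed class being the one implementation we'd actually provide:

library("R6")

## Sketch of an abstract archive interface; concrete backends (in-memory data.table,
## partitioned files, a database, ...) would subclass it and override the methods.
EpiArchiveInterface = R6Class("EpiArchiveInterface",
  public = list(
    ## Return a single snapshot (an epi_signal) as of the given issue, or error if
    ## the issue is outside the recorded range.
    snapshot_as_of = function(as.of.issue) stop("not implemented"),
    ## Record a new snapshot for the given issue (computing & storing the diff).
    add_snapshot = function(epi.signal, issue) stop("not implemented"),
    ## Issues for which the archive has recorded snapshots.
    recorded_issues = function() stop("not implemented")
  )
)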

"Benchmark" code is somewhere in here:


library("data.table")


## n.geos = 3000L
## n.times = 365L*2L
## n.lags = 10L

## n.geos = 500L
## n.times = 365L*2L
## n.lags = 90L

n.geos = 3000L
n.times = 365L*2L
n.lags = 90L

DT = setkeyv(
  data.table(
    geo = rep(seq_len(n.geos), each=n.times*n.lags),
    time = rep(rep(seq_len(n.times), each=n.lags), n.geos)
    )
  [, issue := time + seq_len(n.lags)] # (recycling)
  [, value := rnorm(.N)]
  [],
  c("geo","time","issue")
)

t1 = Sys.time()
unique(DT[issue <= 10L], by=c("geo","time"), fromLast=TRUE)
t2 = Sys.time()
t2-t1

nonissue.key.DT = unique(DT, by=c("geo","time"))[,c("geo","time")]

DT[setkeyv(copy(nonissue.key.DT)[, issue := 10L], c("geo","time","issue")), roll=TRUE] # needs extra filtering for not-yet-present values; copy() avoids adding `issue` to nonissue.key.DT by reference

DT[c(nonissue.key.DT, issue=10L), roll=TRUE] # needs extra filtering for not-yet-present values

DT[SJ(nonissue.key.DT, data.table(issue=10L)), roll=TRUE] # needs extra filtering for not-yet-present values



t1 = Sys.time()
unique(DT[issue <= 60L], by=c("geo","time"), fromLast=TRUE)
t2 = Sys.time()
t2-t1

## FIXME not the way to really benchmark
t1 = Sys.time()
DT[setkeyv(copy(nonissue.key.DT)[, issue := 10L], c("geo","time","issue")), roll=TRUE] # needs extra filtering for not-yet-present values
t2 = Sys.time()
t2-t1

t1 = Sys.time()
DT[c(nonissue.key.DT, issue=60L), roll=TRUE] # needs extra filtering for not-yet-present values
t2 = Sys.time()
t2-t1

DT[SJ(nonissue.key.DT, data.table(issue=60L)), roll=TRUE] # needs extra filtering for not-yet-present values



## NOTE the nonissue key DT could be maintained separately.

## TODO also try alternative key-ing approaches

## TODO also try partitioning by geo (val/valgroup)

## TODO also need to look at other data types and whether using `factor` there helps (plus want clear invalid-key checking in both setups)... or even turning into a single index using the nonissue key DT?

@ryantibs
Member

Excellent, thank you @brookslogan, very useful!

How do you feel about me just defining an epi_archive object to be a tibble with an issue column, so that I can move forward quickly? I will just write some functions to interface with it and get out an epi_signal (assuming we keep the latter name, or maybe we call it epi_tibble as I suggested on #10).

This will definitely not be the most efficient, but then you can redesign using data.table, keeping the same interface, at a later point.

@brookslogan
Contributor Author

This seems fine, as long as it has the same semantics for the snapshot-extraction function. I think the unique() data.table approach should also be fairly easy to go with, if we disregard generality and robustness.

Diff-based archive contents for a covidcast signal could be obtained in the new clients by just issuing a huge query for all dates and all issues from, say, Jan 1, 1234 to Jan 1, 3456. Or, with the old client, extract them using something like the below:


devtools::dev_mode(TRUE, "dev_mode_dir")

library("pipeR")
library("tidyverse")
library("data.table")

source("delphi-epidata/src/client/delphi_epidata.R")

analysis.date = as.Date("2021-08-17")

cache_fetch = function(cache.file.sans.ext, fetcher_thunk) {
  ## XXX consider using rlang stuff instead of thunk
  ## XXX consider failing on, or storing, warnings
  cache.file = paste0(cache.file.sans.ext, ".RDS")
  if (file.exists(cache.file)) {
    return (readRDS(cache.file))
  } else {
    result = fetcher_thunk()
    saveRDS(result, cache.file)
    return (result)
  }
}
## TODO invalidation nonce?

row_major_list_of_lists_to_DT = function(row.major.list.of.lists) {
  withCallingHandlers({
    rbindlist(row.major.list.of.lists)
  },
  warning = function(w) {
    if (grepl("Column[^;.]*of item[^;.]*is length 0\\.", conditionMessage(w))) {
      invokeRestart("muffleWarning")
    }
  })
}

extract_epidata_DT = function(epidata.response) {
  if (epidata.response$result == 1L && epidata.response$message == "success") {
    row_major_list_of_lists_to_DT(epidata.response$epidata)
  } else {
    stop (epidata.response$message)
  }
}

match_refinement_arg = function(maybe.refinement.arg) {
  if (identical(maybe.refinement.arg, "*")) {
    rlang::abort ('"*" is not supported for refinement args', maybe.refinement.arg=maybe.refinement.arg)
  } else if (is.list(maybe.refinement.arg) && length(maybe.refinement.arg) == 2L && identical(names(maybe.refinement.arg), c("from","to"))) {
    ## single range; sort endpoints
    return (Epidata[["range"]](min(maybe.refinement.arg[["from"]], maybe.refinement.arg[["to"]]),
                               max(maybe.refinement.arg[["from"]], maybe.refinement.arg[["to"]])))
  } else if (inherits(maybe.refinement.arg, "Date") || is.vector(maybe.refinement.arg) && !is.list(maybe.refinement.arg)) {
    ## looks like vector of values; sort
    return (sort(maybe.refinement.arg))
  } else {
    ## unrecognized / too complex; reject
    rlang::abort ('Argument value was too complex for refinement arg handler; see rlang::last_error()$maybe.refinement.arg for the problematic arg.  Currently handled values for an arg are a single Epidata$range or a non-list vector of values', maybe.refinement.arg=maybe.refinement.arg)
  }
}

refinement_arg_is_single_value = function(refinement.arg) {
  if (is.list(refinement.arg) && length(refinement.arg) == 2L && identical(names(refinement.arg), c("from","to"))) {
    return (refinement.arg[["from"]] == refinement.arg[["to"]])
  } else if (inherits(refinement.arg, "Date") || is.vector(refinement.arg) && !is.list(refinement.arg)) {
    return (length(refinement.arg) == 1L)
  } else {
    ## XXX vs. a special object indicating failure, and check for at call site?
    rlang::abort ('Input did not appear to be a valid refinement arg.')
  }
}

next_highest_possible_value = function(x, ...) UseMethod("next_highest_possible_value", x)
next_highest_possible_value.integer = function(x, ...) x+1L
next_highest_possible_value.numeric = function(x, ...) {
  if (all(x == as.integer(x))) {
    x + 1
  } else {
    stop ('Getting the next highest possible value for general numeric values is not supported.')
  }
}
next_highest_possible_value.Date = function(x, ...) x+1L

split_refinement_arg = function(refinement.arg, incl.excl.split.refinement.key) {
  if (is.list(refinement.arg) && length(refinement.arg) == 2L && identical(names(refinement.arg), c("from","to"))) {
    return (list(left=Epidata[["range"]](refinement.arg[["from"]], incl.excl.split.refinement.key),
                 right=Epidata[["range"]](next_highest_possible_value(incl.excl.split.refinement.key), refinement.arg[["to"]])))
  } else if (inherits(refinement.arg, "Date") || is.vector(refinement.arg) && !is.list(refinement.arg)) {
    return (list(left=refinement.arg[refinement.arg <= incl.excl.split.refinement.key],
                 right=refinement.arg[refinement.arg > incl.excl.split.refinement.key]))
  } else {
    ## XXX vs. a special object indicating failure, and check for at call site?
    rlang::abort ('Input did not appear to be a valid refinement arg.')
  }
}

refining_subquery = function(epidata_endpoint, parent.arglist, refinement.argnames, refinement.keynames, refinement.argkey.i) {
  if (refinement.argkey.i > length(refinement.argnames)) {
    ## (If we are here, all of the refinement args should be single-valued.)
    response = rlang::exec(epidata_endpoint, !!!parent.arglist) # todo retries on some class of errors
    if (response$result != 2) {
      ## Not a partial result; return this subresult, or stop on other errors:
      return (extract_epidata_DT(response))
      ## TODO retries on certain errors
    } else {
      rlang::abort ('Subquery response was truncated, but all refinement args were already single values; cannot refine further to attempt to assemble the full response.  Maybe try adding another refinement argkey or manually breaking this query into multiple subqueries.', parent.arglist=parent.arglist, refinement.argnames=refinement.argnames, refinement.keynames=refinement.keynames)
    }
  } else {
    refinement.argname = refinement.argnames[[refinement.argkey.i]]
    refinement.keyname = refinement.keynames[[refinement.argkey.i]]
    if (refinement_arg_is_single_value(parent.arglist[[refinement.argname]])) {
      ## Can't refine this arg as it's already just a single value; recurse to try the next refinement argkey
      refining_subquery(epidata_endpoint, parent.arglist, refinement.argnames, refinement.keynames, refinement.argkey.i + 1L)
    } else {
      response = rlang::exec(epidata_endpoint, !!!parent.arglist) # todo retries on some class of errors
      if (response$result != 2) {
        ## Not a partial result; return this subresult, or stop on other errors:
        return (extract_epidata_DT(response))
        ## TODO retries on certain errors
      } else {
        ## Partial result, and current refinement arg is not single-valued; try refining the current arg:
        truncated.DT = row_major_list_of_lists_to_DT(response$epidata)
        some.values.of.refinement.key = unique(truncated.DT[[refinement.keyname]])
        tallied.values.of.refinement.key = truncated.DT[, list(N=.N), keyby=refinement.keyname]
        ##   Try querying values of key covering ~75% of partial result in one subquery, rest in another subquery:
        incl.excl.split.refinement.key = tail(tallied.values.of.refinement.key[cumsum(N) <= 0.75*nrow(truncated.DT) | seq_along(N) == 1L][[refinement.keyname]], 1L)
        refinement.arg.split = split_refinement_arg(parent.arglist[[refinement.argname]], incl.excl.split.refinement.key)
        left.subquery.arglist = `[[<-`(parent.arglist, refinement.argname, refinement.arg.split[["left"]])
        right.subquery.arglist = `[[<-`(parent.arglist, refinement.argname, refinement.arg.split[["right"]])
        ## TODO look at more efficient / stack-friendly approach looping within the same refinement.argkey.i?  At least remove the right subquery recursion?
        left.subquery.DT = refining_subquery(epidata_endpoint, left.subquery.arglist, refinement.argnames, refinement.keynames, refinement.argkey.i)
        right.subquery.DT = refining_subquery(epidata_endpoint, right.subquery.arglist, refinement.argnames, refinement.keynames, refinement.argkey.i)
        return (rbind(left.subquery.DT, right.subquery.DT))
      }
    }
  }
  ## TODO enforce some type of subquery limit or rate? and at least track?
  ## TODO benchmark processing time
}

## TODO unit tests

## TODO more rlang and tidyselect

refining_query = function(epidata_endpoint, arglist, refinement.argkeys) {
  ## Check refinement.argkeys spec
  refinement.argname.is.empty = rlang::names2(refinement.argkeys) == ""
  names(refinement.argkeys)[refinement.argname.is.empty] <- refinement.argkeys[refinement.argname.is.empty]
  refinement.argnames = names(refinement.argkeys)
  refinement.keynames = unname(refinement.argkeys)
  if (any(c("N","refinement.keyname") %in% refinement.argnames)) {
    stop ('This function is unable to handle "N" or "refinement.keyname" as refinement argnames.')
  }
  refinement.argnames.invalid = setdiff(refinement.argnames, rlang::fn_fmls_names(epidata_endpoint))
  if (length(refinement.argnames.invalid) != 0L) {
    ## todo use rlang::abort
    stop(sprintf('The following refinement argnames were not actually argnames of the endpoint: %s.  The endpoint argnames were: %s.  Refinement argnames are the names of refinement.argkeys, or the entries of refinement.argkeys where names are missing; perhaps (a) a name was missing and the refinement key does not match the corresponding endpoint arg name or (b) the refinement keyname was provided as the argkey name instead of the refinement argname.', toString(refinement.argnames.invalid), toString(rlang::fn_fmls_names(epidata_endpoint))))
  }
  refinement.argnames.unmatched = setdiff(refinement.argnames, names(arglist))
  if (length(refinement.argnames.unmatched) != 0L) {
    stop (sprintf('The following refinement argnames were not present in the names of the arglist: %s.  The arglist names were: %s.  All refinement args must be provided & named in `arglist`', toString(refinement.argnames.unmatched), toString(names(arglist))))
    ## xxx vs. try harder with rlang::call_standardise and something for default values
  }
  for (refinement.argname in refinement.argnames) {
    ## todo better error message including refinement.argname and refinement.arg str representation
    arglist[[refinement.argname]] <- match_refinement_arg(arglist[[refinement.argname]])
  }
  ##
  refining_subquery(epidata_endpoint, arglist, refinement.argnames, refinement.keynames, 1L)
}







latest.response.data =
  cache_fetch("latest_response_data", function() covidcast::covidcast_signal("hhs", "confirmed_admissions_covid_1d", geo_type = "state", geo_values = "*", as_of = analysis.date, end_day=analysis.date))

target.geo.values =
  unique(latest.response.data[["geo_value"]])

response.history.dir = sprintf("hhs_history_as_of_%s", analysis.date)
if (!dir.exists(response.history.dir)) {
  dir.create(response.history.dir)
}

response.history.DT =
  rbindlist(
    lapply(target.geo.values, function(target.geo.value) {
      print(target.geo.value)
      beginning.of.time = format(as.Date("1234-01-01"), "%Y%m%d")
      end.of.time = format(analysis.date + lubridate::years(1000L), "%Y%m%d")
      cache_fetch(file.path(response.history.dir, target.geo.value), function() {
        refining_query(Epidata$covidcast, list("hhs", "confirmed_admissions_covid_1d",
                                               "day", "state",
                                               time_values=Epidata$range(beginning.of.time, end.of.time),
                                               target.geo.value,
                                               analysis.date,
                                               Epidata$range(beginning.of.time, end.of.time)), c("time_values"="time_value"))
      })
    })
  )

@ryantibs
Member

Just a heads up that I'm going to close this with #14.

@brookslogan Can you please create a new issue about:

  1. creating the epi_archive class;
  2. implementing the functionality as outlined in the issues.Rmd vignette;
  3. and filling out this vignette?

As we discussed, for this next round, we should just get this all implemented and not worry about efficiency. Then once this is done, we can rewrite the backend for efficiency. So that can also be a second new issue (which could only be completed after the first one), and for that, you can copy-paste relevant parts of your comments here. Thanks!

kenmawer pushed a commit that referenced this issue Jul 5, 2022
Added final newline to epi_archive print statement.