Skip to content

Port Jingjing's backcasting preprocessing utils #114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Authors@R: c(
person("Quang", "Nguyen", role = "ctb"),
person("Evan", "Ray", role = "aut"),
person("Dmitry", "Shemetov", role = "ctb"),
person("Jingjing", "Tang", role = "ctb"),
person("Ryan", "Tibshirani", , "[email protected]", role = c("aut", "cre"))
)
Description: This package introduces a common data structure for epidemiological
Expand All @@ -38,6 +39,7 @@ Imports:
tidyselect,
tsibble,
utils,
zoo,
vctrs
Suggests:
covidcast,
Expand All @@ -58,7 +60,7 @@ Config/testthat/edition: 3
Encoding: UTF-8
LazyData: true
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.1
RoxygenNote: 7.2.0
Depends:
R (>= 2.10)
URL: https://cmu-delphi.github.io/epiprocess/
16 changes: 16 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ S3method(summary,epi_df)
S3method(ungroup,epi_df)
S3method(unnest,epi_df)
export("%>%")
export(add_7davs_and_target)
export(add_shift)
export(archive_cases_dv_subset)
export(arrange)
export(as_epi_archive)
Expand All @@ -35,7 +37,11 @@ export(epi_slide)
export(epix_as_of)
export(epix_merge)
export(epix_slide)
export(fill_missing_updates)
export(fill_rows)
export(filter)
export(get_7dav)
export(get_lags)
export(group_by)
export(group_modify)
export(growth_rate)
Expand All @@ -59,7 +65,10 @@ importFrom(data.table,copy)
importFrom(data.table,key)
importFrom(data.table,set)
importFrom(data.table,setkeyv)
importFrom(dplyr,across)
importFrom(dplyr,all_of)
importFrom(dplyr,arrange)
importFrom(dplyr,everything)
importFrom(dplyr,filter)
importFrom(dplyr,group_by)
importFrom(dplyr,group_modify)
Expand All @@ -85,7 +94,14 @@ importFrom(rlang,sym)
importFrom(rlang,syms)
importFrom(stats,cor)
importFrom(stats,median)
importFrom(stats,setNames)
importFrom(tidyr,crossing)
importFrom(tidyr,drop_na)
importFrom(tidyr,fill)
importFrom(tidyr,pivot_longer)
importFrom(tidyr,pivot_wider)
importFrom(tidyr,unnest)
importFrom(tidyselect,eval_select)
importFrom(tidyselect,starts_with)
importFrom(tsibble,as_tsibble)
importFrom(zoo,rollmeanr)
2 changes: 1 addition & 1 deletion R/archive.R
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ epi_archive =
tibble::as_tibble() %>%
dplyr::select(-.data$version) %>%
as_epi_df(geo_type = self$geo_type,
time_type = self$time_type,
time_type = self$time_type,
as_of = max_version,
additional_metadata = c(self$additional_metadata,
other_keys = other_keys))
Expand Down
237 changes: 237 additions & 0 deletions R/revision_patterns.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
### Data Preprocessing
###
### The raw input data should have 4 or 5 basic columns:
### time_value: reference date
### issue_date: issue date/date of reporting
### geo_value: location
### (optional) lag: the number of days between issue date and the reference
### date; if not provided, will be calculated from the issue and reference
### dates.
### counts: the number of counts used for estimation
###
### library(lubridate)
### library(stats)
### library(dplyr)
### library(tidyverse)

#' Insert empty observations for missing reference-date / lag combinations
#'
#' Re-indexes `df` so that every reference date between `min_refd` and
#' `max_refd` has one row per lag, for every lag from the smallest observed
#' lag up to `ref_lag`. Combinations absent from `df` become rows of `NA`s.
#'
#' @param df Data Frame of aggregated counts within a single location
#'   reported for each reference date and issue date.
#' @param refd_col column name for the column of reference date
#' @param lag_col column name for the column of lag
#' @param min_refd the earliest reference date considered in the data
#' @param max_refd the latest reference date considered in the data
#' @param ref_lag the maximum lag value through which to complete
#' @param issued_col name of the issue (report) date column
#'
#' @return Data Frame with one row per date-lag combination; combinations
#'   that were missing from `df` are filled with `NA`.
#'
#' @importFrom tidyr crossing
#' @importFrom stats setNames
#' @importFrom dplyr mutate
#' @importFrom rlang !!
#'
#' @export
fill_rows <- function(df, refd_col, lag_col, min_refd, max_refd, ref_lag, issued_col) {
  observed_lags <- get_lags(df, lag_col, refd_col, issued_col)
  if (missing(lag_col)) {
    lag_col <- "lag"
  }

  # Cartesian product of every reference date with every lag of interest.
  all_dates <- seq(min_refd, max_refd, by = "day")
  full_index <- setNames(
    as.data.frame(crossing(all_dates, min(observed_lags):ref_lag)),
    c(refd_col, lag_col)
  )

  # Right join onto the full index so missing combinations appear as NA rows.
  merge(
    x = mutate(df, !!lag_col := observed_lags),
    y = full_index,
    by = c(refd_col, lag_col),
    all.y = TRUE
  )
}

#' Perform LOCF to fill missing values in issues
#'
#' Perform LOCF to fill `NA`s if a group is missing from an issue but was
#' previously available. If there is no update on issue date \eqn{D} but
#' previous reports exist for issue date \eqn{D_p} < \eqn{D}, all the dates
#' between \eqn{[D_p, D]} are filled with the value reported on date \eqn{D_p}.
#' If there is no update for any previous issue date, fill in with 0.
#'
#' @param df Data Frame of aggregated counts within a single location
#'   reported for each reference date and issue date.
#' @param value_col column name for the column of counts
#' @param refd_col column name for the column of reference date
#' @param lag_col column name for the column of lag
#' @param issued_col name of the issue (report) date column
#'
#' @return Data Frame in long format with columns `lag_col`, `refd_col`, and
#'   `value_raw`, where every lag present in the data has a (possibly
#'   carried-forward or zero-filled) value for every reference date.
#'
#' @importFrom tidyr fill pivot_wider pivot_longer
#' @importFrom dplyr everything select arrange across all_of mutate
#' @importFrom rlang .env !!
#'
#' @export
fill_missing_updates <- function(df, value_col, refd_col, lag_col, issued_col) {
  # Lags either come from `lag_col` or are implied by issue - reference date.
  lags <- get_lags(df, lag_col, refd_col, issued_col)
  if (missing(lag_col)) {
    lag_col <- "lag"
  }

  # Add lag col to df if using implied lag
  # col. If lag col already exists, it is overwritten with the original
  # values.
  # Wide layout: one row per lag (sorted ascending), one column per reference
  # date, so that filling "down" a column walks forward through issue dates.
  pivot_df <- mutate(df, !!lag_col := lags) %>%
    arrange(across(all_of(lag_col))) %>%
    pivot_wider(id_cols=lag_col, names_from=refd_col, values_from=value_col)

  # A gap in the lag sequence would make LOCF skip over a missing issue and
  # silently carry values across the gap, so refuse to continue.
  if (any(diff(pivot_df[[lag_col]])!=1)) {Abort("Risk exists in forward fill")}

  # LOCF down each reference-date column, then zero-fill anything never
  # reported (no prior issue to carry forward from).
  pivot_df <- pivot_df %>% fill(everything(), .direction="down")
  pivot_df[is.na(pivot_df)] <- 0 # fill NAs with 0s
  # Back to long format; pivot_wider turned reference dates into character
  # column names, so restore them to Date.
  backfill_df <- pivot_df %>%
    pivot_longer(-lag_col, values_to="value_raw", names_to=refd_col)
  backfill_df[[refd_col]] = as.Date(backfill_df[[refd_col]])

  return (as.data.frame(backfill_df))
}

#' Calculate 7 day moving average for each issue date
#'
#' Computes a trailing 7-day mean down every issue-date column of `pivot_df`
#' (the first 6 rows of each column become `NA`), then reshapes to long
#' format. Note the window for reference date \eqn{D} here is \eqn{[D-6, D]};
#' callers shift the result by one day to obtain the \eqn{[D-7, D-1]} window.
#'
#' @param pivot_df Data Frame where the columns are issue dates and the rows are
#'   reference dates
#' @param refd_col column name for the column of reference date
#' @param issued_col name of the issue (report) date column; default "issue_date"
#'
#' @return Data Frame in long format with columns `refd_col`, `issued_col`,
#'   and `value_raw` holding the trailing averages.
#'
#' @importFrom zoo rollmeanr
#' @importFrom tidyr pivot_longer
#'
#' @export
get_7dav <- function(pivot_df, refd_col, issued_col = "issue_date") {
  # Smooth every column except the reference-date index column.
  smooth_cols <- setdiff(colnames(pivot_df), refd_col)
  for (column in smooth_cols) {
    pivot_df[, column] <- rollmeanr(pivot_df[, column], 7, align="right", fill=NA)
  }

  # Long format; column names were stringified dates, so coerce back to Date.
  long_df <- pivot_longer(pivot_df, -refd_col, values_to="value_raw", names_to=issued_col)
  long_df[[refd_col]] <- as.Date(long_df[[refd_col]])
  long_df[[issued_col]] <- as.Date(long_df[[issued_col]])

  as.data.frame(long_df)
}

#' Shift reference dates by `n` days, keeping all other columns the same.
#'
#' @param df Data Frame of aggregated counts within a single location
#'   reported for each reference date and issue date.
#' @param n_day number of days to be shifted. A positive value corresponds to
#'   a shift forward in time, negative shifts dates backwards in time.
#' @param refd_col column name for the column of reference date
#'
#' @return `df` with its reference-date column coerced to `Date` and shifted
#'   by `n_day` days.
#'
#' @export
add_shift <- function(df, n_day, refd_col) {
  shifted_dates <- as.Date(df[, refd_col]) + n_day
  df[, refd_col] <- shifted_dates
  df
}

#' Add 7-day moving average and prediction target to a dataframe
#'
#' Each row must be uniquely identified by a reference date + lag combination.
#' Issue dates are not required and are regenerated from the reference date and
#' lag fields.
#'
#' Targets are updates made `ref_lag` days after the first release.
#'
#' @param df Data Frame of aggregated counts within a single location
#'   reported for each reference date and issue date.
#' @param value_col column name for the column of raw value
#' @param refd_col column name for the column of reference date
#' @param lag_col column name for the column of lag
#' @param ref_lag target lag
#' @param issued_col name of the issue (report) date column
#'
#' @return Data Frame with added columns `value_7dav` (trailing 7-day average
#'   as of the day before each issue date), `value_prev_7dav` (the same
#'   average one week earlier), `value_target`, and `target_date`.
#'
#' @importFrom tidyr pivot_wider drop_na
#' @importFrom rlang .env
#'
#' @export
add_7davs_and_target <- function(df, value_col, refd_col, lag_col, ref_lag, issued_col) {
  if (missing(issued_col) && missing(lag_col)) {
    Abort("One of `issued_col` and `lag_col` must be provided and present in the data")
  }
  # Reconstruct whichever of issue date / lag is absent from the other.
  if (missing(issued_col)) {
    issued_col <- "issue_date"
  }
  if (!(issued_col %in% names(df))) {
    df[[issued_col]] <- df[[refd_col]] + df[[lag_col]]
  }
  if (missing(lag_col)) {
    lag_col <- "lag"
  }
  if (!(lag_col %in% names(df))) {
    df[[lag_col]] <- get_lags(df, lag_col, refd_col, issued_col)
  }

  # Wide layout: one row per reference date, one column per issue date.
  pivot_df <- arrange(df, across(all_of(issued_col))) %>%
    pivot_wider(id_cols=refd_col, names_from=issued_col,
                values_from=value_col)

  # Add 7dav avg. Note get_7dav() always names its value column "value_raw"
  # regardless of `value_col`.
  avg_df <- get_7dav(pivot_df, refd_col, issued_col)
  # Shift forward one day so the average at date D covers [D-7, D-1].
  # BUG FIX: was `rename(value_7dav = .env$value_col)`, which errors whenever
  # `value_col` != "value_raw" since get_7dav() hard-codes that name.
  avg_df <- add_shift(avg_df, 1, refd_col) %>%
    rename(value_7dav = "value_raw")
  avg_df_prev7 <- add_shift(avg_df, 7, refd_col) %>%
    rename(value_prev_7dav = "value_7dav")

  backfill_df <- Reduce(function(x, y) merge(x, y, all=TRUE),
                        list(df, avg_df, avg_df_prev7))

  # Add target: the value reported `ref_lag` days after the reference date.
  # BUG FIX: was `df$lag == ref_lag`, which ignores a caller-supplied
  # `lag_col` name and silently looks up a column literally named "lag".
  target_df <- df[df[[lag_col]] == ref_lag, ] %>%
    select(all_of(c(refd_col, value_col, issued_col))) %>%
    rename(value_target = .env$value_col, target_date = .env$issued_col)

  backfill_df <- merge(backfill_df, target_df, by=refd_col, all.x=TRUE)

  # Remove rows created purely by re-indexing (no lag means no observation).
  backfill_df <- backfill_df %>% drop_na(all_of(lag_col))

  return (as.data.frame(backfill_df))
}

#' Calculate an implied lag column as the difference between reference
#' date and report date. Meant to be used when `lag_col` is not specified.
#' Don't add field to dataframe directly.
#'
#' @param df Data Frame of aggregated counts within a single location
#'   reported for each reference date and issue date.
#' @param refd_col column name for the column of reference date
#' @param lag_col column name for the column of lag
#' @param issued_col name of the issue (report) date column
#'
#' @return The lag values: the contents of `lag_col` when available,
#'   otherwise `issued_col - refd_col` (a `difftime` when the columns are
#'   `Date`s).
#'
#' @export
get_lags <- function(df, lag_col, refd_col, issued_col) {
  if (missing(lag_col) || !(lag_col %in% names(df))) {
    if (missing(refd_col) || missing(issued_col)) {
      # BUG FIX: message previously read "both of`refd_col`" (missing space)
      # and had a trailing space that paste() doubled.
      Abort(paste(
        "Either `lag_col`, or both of `refd_col` and `issued_col` names must be",
        "specified to be able to fetch lags"
      ))
    }
    if (!all(c(refd_col, issued_col) %in% names(df))) {
      Abort("`refd_col` and `issued_col` names not found in data")
    }
    # Implied lag: days between report date and reference date.
    lags <- df[[issued_col]] - df[[refd_col]]
  } else {
    lags <- df[[lag_col]]
  }

  return (lags)
}
30 changes: 30 additions & 0 deletions man/add_7davs_and_target.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions man/add_shift.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions man/as_epi_archive.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading