Skip to content

Commit 3a425fe

Browse files
authored
Merge pull request #1227 from cmu-delphi/contingency-mem-run-reduction
Runtime improvements to survey pipelines
2 parents 131a90e + 8412fd1 commit 3a425fe

18 files changed

+221
-111
lines changed

facebook/contingency_tables.R

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ library(delphiFacebook)
22

33
params <- read_contingency_params("params.json")
44
run_contingency_tables(params)
5+
message("run_contingency_tables completed successfully")

facebook/delphiFacebook/DESCRIPTION

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ Imports:
2020
lubridate,
2121
data.table,
2222
tibble,
23-
purrr
23+
purrr,
24+
Rcpp
2425
Suggests:
2526
knitr (>= 1.15),
2627
rmarkdown (>= 1.4),
2728
testthat (>= 1.0.1),
2829
covr (>= 2.2.2)
30+
LinkingTo: Rcpp
2931
RoxygenNote: 7.1.1
3032
Encoding: UTF-8

facebook/delphiFacebook/NAMESPACE

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ export(write_contingency_tables)
5555
export(write_data_api)
5656
export(write_individual)
5757
import(data.table)
58+
importFrom(data.table,fread)
5859
importFrom(dplyr,"%>%")
5960
importFrom(dplyr,across)
6061
importFrom(dplyr,all_of)
@@ -107,6 +108,7 @@ importFrom(readr,write_rds)
107108
importFrom(rlang,.data)
108109
importFrom(stats,complete.cases)
109110
importFrom(stats,na.omit)
111+
importFrom(stats,setNames)
110112
importFrom(stats,weighted.mean)
111113
importFrom(stringi,stri_extract)
112114
importFrom(stringi,stri_replace)
@@ -116,5 +118,5 @@ importFrom(stringi,stri_sub)
116118
importFrom(stringi,stri_trans_tolower)
117119
importFrom(stringi,stri_trim)
118120
importFrom(tibble,add_column)
119-
importFrom(tibble,as_tibble)
120121
importFrom(tibble,tribble)
122+
useDynLib(delphiFacebook, .registration = TRUE)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Generated by using Rcpp::compileAttributes() -> do not edit by hand
2+
# Generator token: 10BE3573-1514-4C36-9D1C-5A225CD40393
3+
4+
is_selected_cpp <- function(responses, target) {
5+
.Call(`_delphiFacebook_is_selected_cpp`, responses, target)
6+
}
7+

facebook/delphiFacebook/R/aggregate.R

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -146,23 +146,27 @@ summarize_indicators <- function(df, crosswalk_data, indicators, geo_level,
146146
#' @param geo_level Name of the geo level (county, state, etc.) for which we are
147147
#' aggregating.
148148
#' @param params Named list of configuration options.
149-
#' @importFrom dplyr mutate filter
149+
#'
150+
#' @importFrom dplyr mutate filter bind_rows
151+
#' @importFrom stats setNames
150152
#' @importFrom rlang .data
151153
summarize_indicators_day <- function(day_df, indicators, target_day, geo_level, params) {
152-
## Prepare outputs.
153-
dfs_out <- list()
154+
## Prepare outputs as list of lists. Saves some time and memory since lists
155+
## are not copied on modify.
154156
geo_ids <- unique(day_df$geo_id)
155-
for (indicator in indicators$name) {
156-
dfs_out[[indicator]] <- tibble(
157-
geo_id = geo_ids,
158-
day = target_day,
159-
val = NA_real_,
160-
se = NA_real_,
161-
sample_size = NA_real_,
162-
effective_sample_size = NA_real_
163-
)
164-
}
165-
157+
n_geo_ids <- length(geo_ids)
158+
fill_list <- list(geo_id = geo_ids,
159+
day = rep(target_day, n_geo_ids),
160+
val = rep(NA_real_, n_geo_ids),
161+
se = rep(NA_real_, n_geo_ids),
162+
sample_size = rep(NA_real_, n_geo_ids),
163+
effective_sample_size = rep(NA_real_, n_geo_ids)
164+
)
165+
166+
dfs_out <- setNames(
167+
rep(list(fill_list), times=length(indicators$name)),
168+
indicators$name)
169+
166170
for (ii in seq_along(geo_ids))
167171
{
168172
target_geo <- geo_ids[ii]
@@ -175,8 +179,10 @@ summarize_indicators_day <- function(day_df, indicators, target_day, geo_level,
175179
var_weight <- indicators$var_weight[row]
176180
compute_fn <- indicators$compute_fn[[row]]
177181

178-
ind_df <- sub_df[!is.na(sub_df[[var_weight]]) & !is.na(sub_df[[metric]]), ]
179-
182+
# Copy only columns we're using.
183+
select_cols <- c(metric, var_weight, "weight_in_location")
184+
ind_df <- sub_df[, select_cols, with=FALSE][!is.na(sub_df[[var_weight]]) & !is.na(sub_df[[metric]]), ]
185+
180186
if (nrow(ind_df) > 0)
181187
{
182188
s_mix_coef <- params$s_mix_coef
@@ -191,13 +197,18 @@ summarize_indicators_day <- function(day_df, indicators, target_day, geo_level,
191197
weight = if (indicators$skip_mixing[row]) { mixing$normalized_preweights } else { mixing$weights },
192198
sample_size = sample_size)
193199

194-
dfs_out[[indicator]]$val[ii] <- new_row$val
195-
dfs_out[[indicator]]$se[ii] <- new_row$se
196-
dfs_out[[indicator]]$sample_size[ii] <- sample_size
197-
dfs_out[[indicator]]$effective_sample_size[ii] <- new_row$effective_sample_size
200+
dfs_out[[indicator]][["val"]][ii] <- new_row$val
201+
dfs_out[[indicator]][["se"]][ii] <- new_row$se
202+
dfs_out[[indicator]][["sample_size"]][ii] <- sample_size
203+
dfs_out[[indicator]][["effective_sample_size"]][ii] <- new_row$effective_sample_size
198204
}
199205
}
200206
}
207+
208+
# Convert list of lists to list of tibbles.
209+
for (indicator in indicators$name) {
210+
dfs_out[[indicator]] <- bind_rows(dfs_out[[indicator]])
211+
}
201212

202213
for (row in seq_len(nrow(indicators))) {
203214
indicator <- indicators$name[row]

facebook/delphiFacebook/R/contingency_aggregate.R

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ produce_aggregates <- function(df, aggregations, cw_list, params) {
3434
## table in sorted order so data.table can use a binary search to find
3535
## matching dates, rather than a linear scan, and is important for very large
3636
## input files.
37-
df <- as.data.table(df)
37+
df <- as.data.table(df)[!is.na(weight), ]
3838
setkeyv(df, "start_dt")
3939

4040
# Keep only obs in desired date range.
@@ -151,15 +151,6 @@ post_process_aggs <- function(df, aggregations, cw_list) {
151151
}
152152

153153
aggregations$geo_level[agg_ind] <- geo_level
154-
155-
# Multiple choice metrics should also be included in the group_by vars
156-
if (startsWith(aggregations$metric[agg_ind], "mc_")) {
157-
if ( !(aggregations$metric[agg_ind] %in%
158-
aggregations$group_by[agg_ind][[1]]) ) {
159-
aggregations$group_by[agg_ind][[1]] <-
160-
c(aggregations$group_by[agg_ind][[1]], aggregations$metric[agg_ind])
161-
}
162-
}
163154
}
164155

165156
# Remove aggregations using unavailable variables.
@@ -320,33 +311,32 @@ summarize_aggs <- function(df, crosswalk_data, aggregations, geo_level, params)
320311
#' being used
321312
#' @param params Named list of configuration options.
322313
#'
323-
#' @importFrom tibble add_column as_tibble
314+
#' @importFrom tibble add_column
324315
#' @importFrom dplyr %>%
316+
#' @importFrom stats setNames
325317
#'
326318
#' @export
327319
summarize_aggregations_group <- function(group_df, aggregations, target_group, geo_level, params) {
328-
## Prepare outputs.
329-
dfs_out <- list()
330-
for (row in seq_along(aggregations$id)) {
331-
aggregation <- aggregations$id[row]
332-
333-
dfs_out[[aggregation]] <- target_group %>%
334-
as.list %>%
335-
as_tibble %>%
336-
add_column(val=NA_real_,
337-
se=NA_real_,
338-
sample_size=NA_real_,
339-
effective_sample_size=NA_real_,
340-
represented=NA_real_)
341-
}
342-
320+
# Prepare outputs.
321+
fill_df <- target_group %>%
322+
add_column(val=NA_real_,
323+
se=NA_real_,
324+
sample_size=NA_real_,
325+
effective_sample_size=NA_real_,
326+
represented=NA_real_)
327+
dfs_out <- setNames(
328+
rep(list(fill_df), times=length(aggregations$id)),
329+
aggregations$id)
330+
343331
for (row in seq_along(aggregations$id)) {
344332
aggregation <- aggregations$id[row]
345333
metric <- aggregations$metric[row]
346334
var_weight <- aggregations$var_weight[row]
347335
compute_fn <- aggregations$compute_fn[[row]]
348336

349-
agg_df <- group_df[!is.na(group_df[[var_weight]]) & !is.na(group_df[[metric]]), ]
337+
# Copy only columns we're using.
338+
select_cols <- c(metric, var_weight, "weight_in_location")
339+
agg_df <- group_df[, select_cols, with=FALSE][!is.na(eval(as.name(metric))), ]
350340

351341
if (nrow(agg_df) > 0) {
352342
s_mix_coef <- params$s_mix_coef

facebook/delphiFacebook/R/contingency_run.R

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,6 @@ run_contingency_tables <- function(params) {
1414
warning(debug_msg)
1515
}
1616

17-
if ( !is.null(params$aggs_in) ) {
18-
if ( !file.exists(params$aggs_in) ) {
19-
stop("requested aggregate-setting file does not exist")
20-
}
21-
22-
# Run non-default aggregates. File should create an object called `aggs`.
23-
source(params$aggs_in)
24-
25-
if ( !exists("aggs") || !inherits(aggs, "data.frame") ) {
26-
stop("external aggregate-setting file must create a dataframe `aggs`")
27-
}
28-
} else {
29-
aggs <- get_aggs()
30-
}
31-
3217
## Set default number of cores for mclapply to the total available number,
3318
## because we are greedy and this will typically run on a server.
3419
if (params$parallel) {
@@ -43,20 +28,14 @@ run_contingency_tables <- function(params) {
4328
}
4429
}
4530

46-
if (params$aggregate_range == "week") {
47-
run_contingency_tables_many_periods(params, aggs$week)
48-
} else if (params$aggregate_range == "month") {
49-
run_contingency_tables_many_periods(params, aggs$month)
50-
} else if (params$aggregate_range == "both") {
51-
params$aggregate_range <- "week"
52-
run_contingency_tables_many_periods(params, aggs$week)
53-
54-
params$aggregate_range <- "month"
55-
run_contingency_tables_many_periods(params, aggs$month)
56-
} else {
31+
aggs <- get_aggs()
32+
33+
if ( length(params[["aggregate_range"]]) != 1 || !(params$aggregate_range %in% c("week", "month")) ) {
5734
stop(paste0("aggregate_range setting must be provided in params and be one",
58-
" of 'week', 'month', or 'both'"))
35+
" of 'week' or 'month'"))
5936
}
37+
38+
run_contingency_tables_many_periods(params, aggs[[params$aggregate_range]])
6039
}
6140

6241

@@ -82,17 +61,18 @@ run_contingency_tables_many_periods <- function(params, aggregations)
8261
{
8362
if (!is.null(params$n_periods)) {
8463
msg_plain(paste0("Producing CSVs for ", params$n_periods, " time periods"))
85-
64+
65+
params$end_date <- ifelse(
66+
is.null(params$end_date), as.character(Sys.Date()), params$end_date
67+
)
68+
69+
# Make list of dates to aggregate over.
8670
if (params$aggregate_range == "month") {
8771
period_step <- months(1)
8872
} else {
8973
period_step <- days(7)
9074
}
9175

92-
params$end_date <- ifelse(
93-
is.null(params$end_date), as.character(Sys.Date()), params$end_date
94-
)
95-
# Make list of dates to aggregate over.
9676
end_dates <- as.character(sort(
9777
ymd(params$end_date) - period_step * seq(0, params$n_periods - 1)
9878
))

facebook/delphiFacebook/R/contingency_variables.R

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -180,16 +180,16 @@ code_health <- function(input_data, wave) {
180180
if ("C1" %in% names(input_data)) {
181181
comorbidities <- split_options(input_data$C1)
182182

183-
input_data$comorbidheartdisease <- is_selected(comorbidities, 3)
184-
input_data$comorbidcancer <- is_selected(comorbidities, 2)
185-
input_data$comorbidkidneydisease <- is_selected(comorbidities, 7)
186-
input_data$comorbidlungdisease <- is_selected(comorbidities, 6)
183+
input_data$comorbidheartdisease <- is_selected(comorbidities, "3")
184+
input_data$comorbidcancer <- is_selected(comorbidities, "2")
185+
input_data$comorbidkidneydisease <- is_selected(comorbidities, "7")
186+
input_data$comorbidlungdisease <- is_selected(comorbidities, "6")
187187
input_data$comorbiddiabetes <-
188-
is_selected(comorbidities, 1) |
189-
is_selected(comorbidities, 12) |
190-
is_selected(comorbidities, 10)
191-
input_data$comorbidimmuno <- is_selected(comorbidities, 11)
192-
input_data$comorbidobese <- is_selected(comorbidities, 13)
188+
is_selected(comorbidities, "1") |
189+
is_selected(comorbidities, "12") |
190+
is_selected(comorbidities, "10")
191+
input_data$comorbidimmuno <- is_selected(comorbidities, "11")
192+
input_data$comorbidobese <- is_selected(comorbidities, "13")
193193

194194
# Combo vaccine-eligibility
195195
input_data$eligible <-

facebook/delphiFacebook/R/contingency_write.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ add_geo_vars <- function(data, params, geo_type) {
114114

115115
# Insert the geographic variables in place of the "geo_id" variable.
116116
index <- which(names(data) == "geo_id")
117-
before <- if (index > 1) data[1:(index-1)] else NULL
118-
after <- data[(index+1):ncol(data)]
117+
before <- if (index > 1) data[, 1:(index-1)] else NULL
118+
after <- data[, (index+1):ncol(data)]
119119
result <- bind_cols(before, geo_vars, after)
120120

121121
return(result)

facebook/delphiFacebook/R/dates.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Time zone to use throughout package.
22
tz_to <- "America/Los_Angeles"
3-
wave6_mod_date <- ymd("2021-01-06", tz=tz_to)
3+
wave6_mod_date <- lubridate::ymd("2021-01-06", tz=tz_to)
44

55
#' Get the date of the first day of the previous month.
66
#'

facebook/delphiFacebook/R/responses.R

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@
1717
#' @importFrom parallel mclapply
1818
#' @export
1919
load_responses_all <- function(params, contingency_run = FALSE) {
20-
input_data <- vector("list", length(params$input))
21-
2220
msg_plain(paste0("Loading ", length(params$input), " CSVs"))
2321

2422
map_fn <- if (params$parallel) { mclapply } else { lapply }
25-
input_data <- map_fn(seq_along(input_data), function(i) {
23+
input_data <- map_fn(seq_along(params$input), function(i) {
2624
load_response_one(params$input[i], params, contingency_run)
2725
})
2826

@@ -58,7 +56,7 @@ load_response_one <- function(input_filename, params, contingency_run) {
5856

5957
col_names <- stri_split(read_lines(full_path, n_max = 1L), fixed = ",")[[1]]
6058
col_names <- stri_replace_all(col_names, "", fixed = "\"")
61-
59+
6260
## The CSVs have some columns with column-separated fields showing which of
6361
## multiple options a user selected; readr would interpret these as thousand
6462
## separators by default, so we tell it that no thousands separators are used.
@@ -363,7 +361,7 @@ filter_data_for_aggregation <- function(df, params, lead_days = 12L)
363361
dplyr::between(.data$hh_number_sick, 0L, 30L),
364362
dplyr::between(.data$hh_number_total, 1L, 30L),
365363
.data$hh_number_sick <= .data$hh_number_total,
366-
.data$day >= (as.Date(params$start_date) - lead_days),
364+
.data$day >= (as.Date(params$start_date) - lead_days)
367365
)
368366

369367
msg_plain(paste0("Finished filtering data for aggregations"))
@@ -612,7 +610,7 @@ surveyID_to_wave <- Vectorize(function(surveyID) {
612610
"SV_6PADB8DyF9SIyXk" = 10,
613611
"SV_4VEaeffqQtDo33M" = 11)
614612

615-
if (surveyID %in% names(waves)) {
613+
if ( any(names(waves) == surveyID) ) {
616614
return(waves[[surveyID]])
617615
}
618616

0 commit comments

Comments
 (0)