Skip to content

Commit f2b6c80

Browse files
committed
Separate combine and save steps
1 parent 96b599b commit f2b6c80

File tree

1 file changed

+70
-33
lines changed

1 file changed

+70
-33
lines changed

facebook/contingency-combine.R

Lines changed: 70 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ suppressPackageStartupMessages({
2828
#' @param pattern Regular expression indicating which files in that directory to
2929
#' open. By default, selects all `.csv` files with standard table date prefix.
3030
run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]csv$") {
31+
if (!dir.exists(output_dir)) { dir.create(output_dir) }
32+
3133
files <- list.files(input_dir, pattern = pattern)
3234
if (length(files) == 0) {
3335
stop("No matching data files.")
@@ -39,19 +41,21 @@ run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]
3941
# (and thus same output file) are in a character vector named with the output
4042
# file.
4143
files <- lapply(split(files, files$rollup_name), function(x) {x$filename})
42-
43-
if (!dir.exists(output_dir)) { dir.create(output_dir) }
44+
4445
seen_file <- file.path(output_dir, "seen.txt")
45-
seen_files <- load_seen_file(seen_file)
46+
if (any(file.exists(names(files)))) {
47+
assert(file.exists(seen_file),
48+
paste0("If any output file exists, ", seen_file, " listing input ",
49+
"files previously used in generating a combined table should also exist"))
50+
}
4651

4752
for (output_name in names(files)) {
48-
newly_seen_files <- combine_and_save_tables(
49-
seen_files,
50-
input_dir,
53+
combined_output <- combine_tables(
54+
seen_file,
55+
input_dir,
5156
files[[output_name]],
5257
file.path(output_dir, output_name))
53-
54-
write(newly_seen_files, seen_file, append=TRUE)
58+
write_rollup(combined_output, seen_file, file.path(output_dir, output_name))
5559
}
5660

5761
return(NULL)
@@ -81,22 +85,22 @@ load_seen_file <- function(seen_file) {
8185
return(seen_files)
8286
}
8387

84-
#' Combine set of input files with existing output file, and save to disk.
88+
#' Combine set of input files with existing output file.
8589
#'
8690
#' If an input filename has been seen before, the input and output data are
87-
#' deduplicated to use the newer set of data. Output is saved in gzip-compressed
88-
#' format.
91+
#' deduplicated to use the newer set of data.
8992
#'
90-
#' @param seen_files Vector of filenames that have been previously loaded into
91-
#' an output file.
93+
#' @param seen_file Path to file listing filenames that have been previously
94+
#' loaded into an output file.
9295
#' @param input_dir Directory in which to look for survey CSV files, relative to
9396
#' the current working directory.
9497
#' @param input_files Vector of paths to input files that share a set of
9598
#' grouping variables.
9699
#' @param output_file Path to corresponding output file.
97100
#'
98-
#' @return Character vector of newly-seen filenames.
99-
combine_and_save_tables <- function(seen_files, input_dir, input_files, output_file) {
101+
#' @return Named list of combined output dataframe, character vector, and
102+
#' boolean.
103+
combine_tables <- function(seen_file, input_dir, input_files, output_file) {
100104
cols <- cols(
101105
.default = col_guess(),
102106
survey_geo = col_character(),
@@ -126,34 +130,31 @@ combine_and_save_tables <- function(seen_files, input_dir, input_files, output_f
126130
output_names <- names(read_csv(output_file, n_max = 0L))
127131
assert(identical(output_names, names(input_df)),
128132
paste0("Column names and/or order differ between new and old input for ", output_file))
129-
}
133+
}
130134

131-
# If no input files have been seen before, we can append directly to the
132-
# output file without needing to deduplicate. File is created if it doesn't
133-
# already exist.
135+
seen_files <- load_seen_file(seen_file)
134136
any_prev_seen <- any(input_files %in% seen_files)
135137

136-
if (!any_prev_seen) {
137-
write_csv(input_df, output_file, append=file.exists(output_file))
138-
} else {
138+
# If no input files have been seen before, we don't need to deduplicate.
139+
if (any_prev_seen) {
139140
assert(file.exists(output_file),
140141
paste0("The output file ", output_file, " does not exist, but non-zero",
141142
" files using the same grouping variables have been seen before."))
142143

143144
output_df <- read_csv(output_file, col_types = cols)
144145

145-
# Use all columns up to the first "val" column to find unique rows.
146+
# Use all columns up to the first non-aggregate column to find unique rows.
146147
group_names <- names(output_df)
147148

148-
include_names <- c("val", "se", "sample_size", "represented", "effective_sample_size")
149-
assert( any(include_names %in% group_names) ,
149+
report_names <- c("val", "se", "sample_size", "represented", "effective_sample_size")
150+
exclude_patterns <- paste0("^", report_names)
151+
exclude_map <- grepl(paste(exclude_patterns, collapse="|"), group_names)
152+
assert( any(exclude_map) ,
150153
"No value-reporting columns are available or their names have changed.")
151154

152-
include_patterns <- paste0("^", include_names)
153-
include_map <- grepl(paste(include_patterns, collapse="|"), group_names)
154-
ind_first_val_col <- min(which(include_map))
155-
156-
group_names <- group_names[ 1:ind_first_val_col-1 ]
155+
ind_first_report_col <- min(which(exclude_map))
156+
157+
group_names <- group_names[ 1:ind_first_report_col-1 ]
157158

158159
## Deduplicate, keeping newest version by issue date of each unique row.
159160
# Merge the new data with the existing data, taking the last issue date for
@@ -167,14 +168,50 @@ combine_and_save_tables <- function(seen_files, input_dir, input_files, output_f
167168
slice_tail() %>%
168169
ungroup() %>%
169170
arrange(period_start)
170-
171+
} else {
172+
output_df <- input_df
173+
}
174+
175+
newly_seen <- setdiff(input_files, seen_files)
176+
177+
return(list(
178+
output_df=output_df,
179+
newly_seen_files=newly_seen,
180+
any_prev_seen=any_prev_seen))
181+
}
182+
183+
#' Save a combined dataframe and list of seen files to disk.
184+
#'
185+
#' Output is saved using compression format specified in output file name (gzip
186+
#' by default).
187+
#'
188+
#' @param combined_output Named list output from `combine_tables`. Contains an
189+
#' `output` dataframe, a list of newly seen files, and a flag indicating
190+
#' whether any input filenames have been seen before.
191+
#' @param seen_file Path to file listing filenames that have been previously
192+
#' loaded into an output file.
193+
#' @param output_file Path to corresponding output file.
194+
write_rollup <- function(combined_output, seen_file, output_file) {
195+
output_df <- combined_output[["output_df"]]
196+
newly_seen_files <- combined_output[["newly_seen_files"]]
197+
any_prev_seen_files <- combined_output[["any_prev_seen"]]
198+
199+
# If some input files have been seen before, overwrite any existing output
200+
# file. If no input files have been seen before, we can append directly to the
201+
# output file. File is created if it doesn't already exist.
202+
if (any_prev_seen_files) {
171203
# Automatically uses gzip compression based on output file name. Overwrites
172204
# existing file of the same name.
173205
write_csv(output_df, output_file)
206+
} else {
207+
write_csv(output_df, output_file, append=file.exists(output_file))
174208
}
175209

176-
newly_seen <- setdiff(input_files, seen_files)
177-
return(newly_seen)
210+
if (length(newly_seen_files) > 0) {
211+
write(newly_seen_files, seen_file, append=TRUE)
212+
}
213+
214+
return(NULL)
178215
}
179216

180217

0 commit comments

Comments
 (0)