Skip to content

Commit f4a8159

Browse files
committed
Always use merge process to simplify logic
1 parent 28737b3 commit f4a8159

File tree

1 file changed

+57
-94
lines changed

1 file changed

+57
-94
lines changed

facebook/contingency-combine.R

Lines changed: 57 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
##
77
## Rscript contingency-combine.R path/to/individual/files/ path/to/rollup/files/
88
##
9-
## Appends a set of newly-generated contingency tables to a rollup CSV that
10-
## contains all dates for a given set of grouping variables. Can also be used to
11-
## combine a directory of tables spanning multiple time periods.
9+
## Combines a set of contingency tables with a rollup CSV that contains all
10+
## dates for a given set of grouping variables. Can also be used to combine a
11+
## directory of tables spanning multiple time periods.
1212

1313
suppressPackageStartupMessages({
1414
library(dplyr)
@@ -18,8 +18,7 @@ suppressPackageStartupMessages({
1818
})
1919

2020

21-
#' Fetch all tables in a chosen directory and combine according to grouping
22-
#' used.
21+
#' Fetch all tables in a chosen directory. Combine and save according to grouping.
2322
#'
2423
#' @param input_dir Directory in which to look for survey CSV files, relative to
2524
#' the current working directory.
@@ -31,22 +30,19 @@ run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]
3130
if (!dir.exists(output_dir)) { dir.create(output_dir) }
3231

3332
files <- list.files(input_dir, pattern = pattern)
34-
if (length(files) == 0) {
35-
stop("No matching data files.")
36-
}
33+
if (length(files) == 0) { stop("No matching data files.") }
3734

35+
# Get df of input files and corresponding output files. Reformat as a list
36+
# such that input files with same grouping variables (and thus same output
37+
# file) are in a character vector named with the output file.
3838
files <- map_dfr(files, get_file_properties)
39-
40-
# Reformat files as a list such that input files with same grouping variables
41-
# (and thus same output file) are in a character vector named with the output
42-
# file.
4339
files <- lapply(split(files, files$rollup_name), function(x) {x$filename})
4440

4541
seen_file <- file.path(output_dir, "seen.txt")
46-
if (any(file.exists(names(files)))) {
42+
if ( any(file.exists(names(files))) ) {
4743
assert(file.exists(seen_file),
48-
paste0("If any output file exists, ", seen_file, " listing input ",
49-
"files previously used in generating a combined table should also exist"))
44+
paste0("If any output file exists, ", seen_file, ", listing input ",
45+
"files previously used in generating a combined table, should also exist"))
5046
}
5147

5248
for (output_name in names(files)) {
@@ -55,7 +51,11 @@ run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]
5551
input_dir,
5652
files[[output_name]],
5753
file.path(output_dir, output_name))
58-
write_rollup(combined_output, seen_file, file.path(output_dir, output_name))
54+
write_rollup(
55+
combined_output[["newly_seen_files"]],
56+
seen_file,
57+
combined_output[["output_df"]],
58+
file.path(output_dir, output_name))
5959
}
6060

6161
return(NULL)
@@ -85,10 +85,7 @@ load_seen_file <- function(seen_file) {
8585
return(seen_files)
8686
}
8787

88-
#' Combine set of input files with existing output file.
89-
#'
90-
#' If an input filename has been seen before, the input and output data are
91-
#' deduplicated to use the newer set of data.
88+
#' Combine data from set of input files with existing output data.
9289
#'
9390
#' @param seen_file Path to file listing filenames that have been previously
9491
#' loaded into an output file.
@@ -98,8 +95,7 @@ load_seen_file <- function(seen_file) {
9895
#' grouping variables.
9996
#' @param output_file Path to corresponding output file.
10097
#'
101-
#' @return Named list of combined output dataframe, character vector, and
102-
#' boolean.
98+
#' @return Named list of combined output dataframe and character vector.
10399
combine_tables <- function(seen_file, input_dir, input_files, output_file) {
104100
cols <- cols(
105101
.default = col_guess(),
@@ -118,99 +114,69 @@ combine_tables <- function(seen_file, input_dir, input_files, output_file) {
118114
county_fips = col_character()
119115
)
120116

121-
# Get input data. Make sure `issue_date` is last column after combining.
117+
# Get input data. Make sure `issue_date` is the last column after combining.
122118
input_df <- map_dfr(
123119
file.path(input_dir, input_files),
124120
function(f) {
125121
read_csv(f, col_types = cols)
126-
}) %>%
127-
relocate(issue_date, .after=last_col())
128-
129-
if (file.exists(output_file)) {
130-
output_names <- names(read_csv(output_file, n_max = 0L))
131-
identical_names <- identical(output_names, names(input_df))
132-
} else {
133-
identical_names <- TRUE
134-
}
122+
}) %>% relocate(issue_date, .after=last_col())
135123

136124
seen_files <- load_seen_file(seen_file)
137-
any_prev_seen <- any(input_files %in% seen_files)
138-
139-
# If input files have been seen before, we need to deduplicate. If there is a
140-
# mismatch between input and output column names/order, we need to explicitly
141-
# merge input and output data to make sure columns match up correctly.
142-
if (any_prev_seen || !identical_names) {
125+
if (any(input_files %in% seen_files)) {
143126
assert(file.exists(output_file),
144127
paste0("The output file ", output_file, " does not exist, but non-zero",
145128
" files using the same grouping variables have been seen before."))
146-
129+
}
130+
131+
if ( file.exists(output_file) ) {
147132
output_df <- read_csv(output_file, col_types = cols)
148-
149-
# Use all columns up to the first non-aggregate column to find unique rows.
150-
group_names <- names(output_df)
151-
152-
report_names <- c("val", "se", "sample_size", "represented", "effective_sample_size")
153-
exclude_patterns <- paste0("^", report_names)
154-
exclude_map <- grepl(paste(exclude_patterns, collapse="|"), group_names)
155-
assert( any(exclude_map) ,
156-
"No value-reporting columns are available or their names have changed.")
157-
158-
ind_first_report_col <- min(which(exclude_map))
159-
160-
group_names <- group_names[ 1:ind_first_report_col-1 ]
161-
162-
## Deduplicate, keeping newest version by issue date of each unique row.
163-
# Merge the new data with the existing data, taking the last issue date for
164-
# any given grouping/geo level/date combo. This prevents duplication in case
165-
# of reissues. Note that the order matters: since arrange() uses order(),
166-
# which is a stable sort, ties will result in the input data being used in
167-
# preference over the existing rollup data.
168-
output_df <- bind_rows(output_df, input_df) %>%
169-
relocate(issue_date, .after=last_col()) %>%
170-
arrange(issue_date) %>%
171-
group_by(across(all_of(group_names))) %>%
172-
slice_tail() %>%
173-
ungroup() %>%
174-
arrange(period_start)
175133
} else {
176-
output_df <- input_df
134+
output_df <- input_df[FALSE,]
177135
}
178136

137+
# Use all columns up to the first non-aggregate column to find unique rows.
138+
group_names <- names(output_df)
139+
report_names <- c("val", "se", "sample_size", "represented", "effective_sample_size")
140+
exclude_patterns <- paste0("^", report_names)
141+
exclude_map <- grepl(paste(exclude_patterns, collapse="|"), group_names)
142+
assert( any(exclude_map) ,
143+
"No value-reporting columns are available or their names have changed.")
144+
145+
ind_first_report_col <- min(which(exclude_map))
146+
group_names <- group_names[ 1:ind_first_report_col-1 ]
147+
148+
## Deduplicate, keeping newest version by issue date of each unique row.
149+
# Merge the new data with the existing data, taking the last issue date for
150+
# any given grouping/geo level/date combo. This prevents duplication in case
151+
# of reissues. Note that the order matters: since arrange() uses order(),
152+
# which is a stable sort, ties will result in the input data being used in
153+
# preference over the existing rollup data.
154+
output_df <- bind_rows(output_df, input_df) %>%
155+
relocate(issue_date, .after=last_col()) %>%
156+
arrange(issue_date) %>%
157+
group_by(across(all_of(group_names))) %>%
158+
slice_tail() %>%
159+
ungroup() %>%
160+
arrange(period_start)
161+
179162
newly_seen <- setdiff(input_files, seen_files)
180163

181164
return(list(
182165
output_df=output_df,
183-
newly_seen_files=newly_seen,
184-
can_overwrite=any_prev_seen || !identical_names))
166+
newly_seen_files=newly_seen))
185167
}
186168

187169
#' Save a combined dataframe and list of seen files to disk.
188170
#'
189-
#' Output is saved using compression format specified in output file name (gzip
190-
#' by default).
191-
#'
192-
#' @param combined_output Named list output from `combine_tables`. Contains an
193-
#' `output` dataframe, a list of newly seen files, and a flag indicating
194-
#' whether we need to overwrite the existing output file or we can append to
195-
#' it.
171+
#' @param newly_seen_files Character vector.
196172
#' @param seen_file Path to file listing filenames that have been previously
197173
#' loaded into an output file.
174+
#' @param output_df Output dataframe.
198175
#' @param output_file Path to corresponding output file.
199-
write_rollup <- function(combined_output, seen_file, output_file) {
200-
output_df <- combined_output[["output_df"]]
201-
newly_seen_files <- combined_output[["newly_seen_files"]]
202-
can_overwrite <- combined_output[["can_overwrite"]]
203-
204-
# If some input files have been seen before, overwrite any existing output
205-
# file. If no input files have been seen before, we can append directly to the
206-
# output file. File is created if it doesn't already exist.
207-
if (can_overwrite) {
208-
# Automatically uses gzip compression based on output file name. Overwrites
209-
# existing file of the same name.
210-
write_csv(output_df, output_file)
211-
} else {
212-
write_csv(output_df, output_file, append=file.exists(output_file))
213-
}
176+
write_rollup <- function(newly_seen_files, seen_file, output_df, output_file) {
177+
# Automatically uses gzip compression based on output file name. Overwrites
178+
# existing file of the same name.
179+
write_csv(output_df, output_file)
214180

215181
if (length(newly_seen_files) > 0) {
216182
write(newly_seen_files, seen_file, append=TRUE)
@@ -229,7 +195,4 @@ if (length(args) < 2) {
229195
input_path <- args[1]
230196
output_path <- args[2]
231197

232-
input_path <- "~/Downloads/0418_tables/"
233-
output_path <- "~/Downloads/rollup_test_FB_press_conf"
234-
235198
invisible(run_rollup(input_path, output_path))

0 commit comments

Comments
 (0)