|
| 1 | +#!/usr/bin/env Rscript |
| 2 | + |
| 3 | +## Combine and compress contingency tables by grouping variable set. |
| 4 | +## |
| 5 | +## Usage: |
| 6 | +## |
| 7 | +## Rscript contingency-combine.R path/to/individual/files/ path/to/rollup/files/ |
| 8 | +## |
| 9 | +## Combines a set of contingency tables with a rollup CSV that contains all |
| 10 | +## dates for a given set of grouping variables. Can also be used to combine a |
| 11 | +## directory of tables spanning multiple time periods. |
| 12 | + |
| 13 | +suppressPackageStartupMessages({ |
| 14 | + library(dplyr) |
| 15 | + library(readr) |
| 16 | + library(purrr) |
| 17 | + library(delphiFacebook) |
| 18 | +}) |
| 19 | + |
| 20 | + |
| 21 | +#' Fetch all tables in a chosen directory. Combine and save according to grouping. |
| 22 | +#' |
| 23 | +#' @param input_dir Directory in which to look for survey CSV files, relative to |
| 24 | +#' the current working directory. |
| 25 | +#' @param output_dir Directory in which to look for existing rollup files or |
| 26 | +#' create new ones, relative to the current working directory. |
| 27 | +#' @param pattern Regular expression indicating which files in that directory to |
| 28 | +#' open. By default, selects all `.csv` files with standard table date prefix. |
| 29 | +run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]csv$") { |
| 30 | + if (!dir.exists(output_dir)) { dir.create(output_dir) } |
| 31 | + |
| 32 | + files <- list.files(input_dir, pattern = pattern) |
| 33 | + if (length(files) == 0) { stop("No matching data files.") } |
| 34 | + |
| 35 | + # Get df of input files and corresponding output files. Reformat as a list |
| 36 | + # such that input files with same grouping variables (and thus same output |
| 37 | + # file) are in a character vector named with the output file. |
| 38 | + files <- map_dfr(files, get_file_properties) |
| 39 | + files <- lapply(split(files, files$rollup_name), function(x) {x$filename}) |
| 40 | + |
| 41 | + seen_file <- file.path(output_dir, "seen.txt") |
| 42 | + if ( any(file.exists(names(files))) ) { |
| 43 | + assert(file.exists(seen_file), |
| 44 | + paste0("If any output file exists, ", seen_file, ", listing input ", |
| 45 | + "files previously used in generating a combined table, should also exist")) |
| 46 | + } |
| 47 | + |
| 48 | + for (output_name in names(files)) { |
| 49 | + combined_output <- combine_tables( |
| 50 | + seen_file, |
| 51 | + input_dir, |
| 52 | + files[[output_name]], |
| 53 | + file.path(output_dir, output_name)) |
| 54 | + write_rollup( |
| 55 | + combined_output[["newly_seen_files"]], |
| 56 | + seen_file, |
| 57 | + combined_output[["output_df"]], |
| 58 | + file.path(output_dir, output_name)) |
| 59 | + } |
| 60 | + |
| 61 | + return(NULL) |
| 62 | +} |
| 63 | + |
| 64 | +## Helper function to extract info from each file's filename. |
| 65 | +get_file_properties <- function(filename) { |
| 66 | + short <- strsplit(filename, ".", fixed = TRUE)[[1]][1] |
| 67 | + parts <- strsplit(short, "_", fixed = TRUE)[[1]] |
| 68 | + |
| 69 | + group <- parts[3:length(parts)] |
| 70 | + # Specify compression format via name, to be parsed by `write_csv` later. |
| 71 | + partial_name <- paste0(paste0(group, collapse="_"), ".csv.gz") |
| 72 | + |
| 73 | + return(data.frame( |
| 74 | + filename=filename, |
| 75 | + rollup_name=partial_name)) |
| 76 | +} |
| 77 | + |
| 78 | +## Helper function to load "seen" file. |
| 79 | +load_seen_file <- function(seen_file) { |
| 80 | + if (!file.exists(seen_file)) { |
| 81 | + file.create(seen_file) |
| 82 | + } |
| 83 | + |
| 84 | + seen_files <- readLines(seen_file) |
| 85 | + return(seen_files) |
| 86 | +} |
| 87 | + |
| 88 | +#' Combine data from set of input files with existing output data. |
| 89 | +#' |
| 90 | +#' @param seen_file Path to file listing filenames that have been previously |
| 91 | +#' loaded into an output file. |
| 92 | +#' @param input_dir Directory in which to look for survey CSV files, relative to |
| 93 | +#' the current working directory. |
| 94 | +#' @param input_files Vector of paths to input files that share a set of |
| 95 | +#' grouping variables. |
| 96 | +#' @param output_file Path to corresponding output file. |
| 97 | +#' |
| 98 | +#' @return Named list of combined output dataframe and character vector. |
| 99 | +combine_tables <- function(seen_file, input_dir, input_files, output_file) { |
| 100 | + cols <- cols( |
| 101 | + .default = col_guess(), |
| 102 | + survey_geo = col_character(), |
| 103 | + period_type = col_character(), |
| 104 | + geo_type = col_character(), |
| 105 | + aggregation_type = col_character(), |
| 106 | + country = col_character(), |
| 107 | + ISO_3 = col_character(), |
| 108 | + GID_0 = col_character(), |
| 109 | + region = col_character(), |
| 110 | + GID_1 = col_character(), |
| 111 | + state = col_character(), |
| 112 | + state_fips = col_character(), |
| 113 | + county = col_character(), |
| 114 | + county_fips = col_character() |
| 115 | + ) |
| 116 | + |
| 117 | + # Get input data. |
| 118 | + input_df <- map_dfr( |
| 119 | + file.path(input_dir, input_files), |
| 120 | + function(f) { |
| 121 | + read_csv(f, col_types = cols) |
| 122 | + }) |
| 123 | + |
| 124 | + seen_files <- load_seen_file(seen_file) |
| 125 | + if (any(input_files %in% seen_files)) { |
| 126 | + assert(file.exists(output_file), |
| 127 | + paste0("The output file ", output_file, " does not exist, but non-zero", |
| 128 | + " files using the same grouping variables have been seen before.")) |
| 129 | + } |
| 130 | + |
| 131 | + cols <- cols_condense(spec(input_df)) |
| 132 | + if ( file.exists(output_file) ) { |
| 133 | + output_df <- read_csv(output_file, col_types = cols) |
| 134 | + } else { |
| 135 | + output_df <- input_df[FALSE,] |
| 136 | + } |
| 137 | + |
| 138 | + # Use all columns up to the first non-aggregate column to find unique rows. |
| 139 | + group_names <- names(output_df) |
| 140 | + report_names <- c("val", "se", "sample_size", "represented", "effective_sample_size") |
| 141 | + exclude_patterns <- paste0("^", report_names) |
| 142 | + exclude_map <- grepl(paste(exclude_patterns, collapse="|"), group_names) |
| 143 | + assert( any(exclude_map) , |
| 144 | + "No value-reporting columns are available or their names have changed.") |
| 145 | + |
| 146 | + ind_first_report_col <- min(which(exclude_map)) |
| 147 | + group_names <- group_names[ 1:ind_first_report_col-1 ] |
| 148 | + |
| 149 | + ## Deduplicate, keeping newest version by issue date of each unique row. |
| 150 | + # Merge the new data with the existing data, taking the last issue date for |
| 151 | + # any given grouping/geo level/date combo. This prevents duplication in case |
| 152 | + # of reissues. Note that the order matters: since arrange() uses order(), |
| 153 | + # which is a stable sort, ties will result in the input data being used in |
| 154 | + # preference over the existing rollup data. |
| 155 | + output_df <- bind_rows(output_df, input_df) %>% |
| 156 | + relocate(issue_date, .after=last_col()) %>% |
| 157 | + arrange(issue_date) %>% |
| 158 | + group_by(across(all_of(group_names))) %>% |
| 159 | + slice_tail() %>% |
| 160 | + ungroup() %>% |
| 161 | + arrange(period_start) |
| 162 | + |
| 163 | + newly_seen <- setdiff(input_files, seen_files) |
| 164 | + |
| 165 | + return(list( |
| 166 | + output_df=output_df, |
| 167 | + newly_seen_files=newly_seen)) |
| 168 | +} |
| 169 | + |
| 170 | +#' Save a combined dataframe and list of seen files to disk. |
| 171 | +#' |
| 172 | +#' @param newly_seen_files Character vector. |
| 173 | +#' @param seen_file Path to file listing filenames that have been previously |
| 174 | +#' loaded into an output file. |
| 175 | +#' @param output_df Output dataframe. |
| 176 | +#' @param output_file Path to corresponding output file. |
| 177 | +write_rollup <- function(newly_seen_files, seen_file, output_df, output_file) { |
| 178 | + # Automatically uses gzip compression based on output file name. Overwrites |
| 179 | + # existing file of the same name. |
| 180 | + write_csv(output_df, output_file) |
| 181 | + |
| 182 | + if (length(newly_seen_files) > 0) { |
| 183 | + write(newly_seen_files, seen_file, append=TRUE) |
| 184 | + } |
| 185 | + |
| 186 | + return(NULL) |
| 187 | +} |
| 188 | + |
| 189 | + |
| 190 | +args <- commandArgs(TRUE) |
| 191 | + |
| 192 | +if (length(args) < 2) { |
| 193 | + stop("Usage: Rscript contingency-combine.R path/to/individual/files/ path/to/rollup/files/") |
| 194 | +} |
| 195 | + |
| 196 | +input_path <- args[1] |
| 197 | +output_path <- args[2] |
| 198 | + |
| 199 | +invisible(run_rollup(input_path, output_path)) |
0 commit comments