From db6c4bfdf30a020e9968a5bd6e7e82fbc2148106 Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Sat, 1 May 2021 11:39:43 -0400 Subject: [PATCH 01/12] Create script to combine contingency tables with shared grouping vars --- facebook/contingency-combine.R | 144 +++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 facebook/contingency-combine.R diff --git a/facebook/contingency-combine.R b/facebook/contingency-combine.R new file mode 100644 index 000000000..44a6fa6ea --- /dev/null +++ b/facebook/contingency-combine.R @@ -0,0 +1,144 @@ +#!/usr/bin/env Rscript + +## Combine and compress contingency tables by aggregation. +## +## Usage: +## +## Rscript contingency-combine.R path/to/individual/files/ path/to/rollup/files/ +## +## Appends a set of newly-generated contingency tables to a rollup CSV that +## contains all dates for a given set of grouping variables. Can also be used to +## combine a directory of tables spanning multiple time periods. + +suppressPackageStartupMessages({ + library(dplyr) + library(readr) + library(purrr) +}) + + +#' Fetch all tables in a chosen directory and combine according to grouping +#' used. +#' +#' @param input_dir Directory in which to look for survey CSV files, relative to +#' the current working directory. +#' @param output_dir Directory in which to look for existing rollup files or +#' create new ones, relative to the current working directory. +#' @param pattern Regular expression indicating which files in that directory to +#' open. By default, selects all `.csv` files with standard table date prefix. +run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]csv$") { + files <- list.files(input_dir, pattern = pattern) + + if (length(files) == 0) { + stop("No matching data files.") + } + + files <- map_dfr(files, get_file_properties) + + # Reformat files as a list such that input files with same grouping variables + # (and thus same output file) are in a character vector named with the output + # file. + files <- lapply(split(files, files$rollupname), function(x) {x$filename}) + + for (output_name in names(files)) { + combine_and_save_tables( + file.path(input_dir, files[[output_name]]), + file.path(output_dir, output_name)) + } + + return(NULL) +} + +## Helper function to extract info from each file's filename. +get_file_properties <- function(filename) { + short <- strsplit(filename, ".", fixed = TRUE)[[1]][1] + parts <- strsplit(short, "_", fixed = TRUE)[[1]] + + group <- parts[3:length(parts)] + # Specify compression format in name, to be parsed by `write_csv` later. + partialname <- paste0(paste0(group, collapse="_"), ".csv.gz") + + return(data.frame( + filename=filename, + rollupname=partialname)) +} + +#' Combine set of input files with existing output file, and save to disk. +#' +#' If a date range has been seen before, the input and output data are +#' deduplicated to use the newer set of data. Output is saved in gzip-compressed +#' format. +#' +#' @param input_files Vector of paths to input files that share a set of +#' grouping variables. +#' @param output_file Path to corresponding output file. 
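+#'
+#' @examples
+#' \dontrun{
+#' # Hypothetical paths, for illustration only: both inputs share the
+#' # grouping "weekly_nation_age", so they roll up into one gzipped CSV.
+#' combine_and_save_tables(
+#'   c("individual/20210418_20210424_weekly_nation_age.csv",
+#'     "individual/20210425_20210501_weekly_nation_age.csv"),
+#'   "rollup/weekly_nation_age.csv.gz")
+#' }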
+combine_and_save_tables <- function(input_files, output_file) { + cols <- cols( + .default = col_guess(), + survey_geo = col_character(), + period_type = col_character(), + geo_type = col_character(), + aggregation_type = col_character(), + country = col_character(), + ISO_3 = col_character(), + GID_0 = col_character(), + region = col_character(), + GID_1 = col_character(), + state = col_character(), + state_fips = col_character(), + county = col_character(), + county_fips = col_character() + ) + + input_df <- map_dfr( + input_files, + function(f) { + read_csv(f, col_types = cols) + } + ) + + if (!file.exists(output_file)) { + warning(paste0("Output file ", output_file, " does not exist. Creating a new copy.")) + # Create an empty starting df with the expected column names, order, and type. + output_df <- input_df[FALSE,] + } else { + output_df <- read_csv(output_file, col_types = cols) + } + + # For finding unique group/geo-level/date combinations, use all columns up to + # the first "val" column. This generalizes the process of finding unique rows, + # when we might be using different grouping variables or different geo levels + # (county/state/nation appear in different columns). + group_names <- names(output_df) + group_names <- group_names[ 1:min(which(startsWith(group_names, "val_")))-1 ] + + ## Deduplicate, keeping newest version by issue date of each unique row. + # Merge the new data with the existing data, taking the last issue date for + # any given grouping/geo level/date combo. This prevents duplication in case + # of reissues. Note that the order matters: since arrange() uses order(), + # which is a stable sort, ties will result in the input data being used in + # preference over the existing rollup data. + output_df <- bind_rows(output_df, input_df) %>% + arrange(issue_date) %>% + group_by(across(all_of(group_names))) %>% + slice_tail() %>% + ungroup() + + # Automatically uses gzip compression based on output name. + write_csv(output_df, output_file) + + return(NULL) +} + + + +args <- commandArgs(TRUE) + +if (length(args) < 2) { + stop("Usage: Rscript contingency-combine.R path/to/individual/files/ path/to/rollup/files/") +} + +input_path <- args[1] +output_path <- args[2] + +invisible(run_rollup(input_path, output_path)) From e43fc130a0aacaec0aa57cdb0777fbe4891fccfb Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Mon, 3 May 2021 18:06:12 -0400 Subject: [PATCH 02/12] Allow direct append if never seen file --- facebook/contingency-combine.R | 113 ++++++++++++++++++++++----------- 1 file changed, 75 insertions(+), 38 deletions(-) diff --git a/facebook/contingency-combine.R b/facebook/contingency-combine.R index 44a6fa6ea..9aafe0c69 100644 --- a/facebook/contingency-combine.R +++ b/facebook/contingency-combine.R @@ -14,6 +14,7 @@ suppressPackageStartupMessages({ library(dplyr) library(readr) library(purrr) + library(delphiFacebook) }) @@ -28,7 +29,6 @@ suppressPackageStartupMessages({ #' open. By default, selects all `.csv` files with standard table date prefix. run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]csv$") { files <- list.files(input_dir, pattern = pattern) - if (length(files) == 0) { stop("No matching data files.") } @@ -38,12 +38,21 @@ run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.] # Reformat files as a list such that input files with same grouping variables # (and thus same output file) are in a character vector named with the output # file. 
- files <- lapply(split(files, files$rollupname), function(x) {x$filename}) + files <- lapply(split(files, files$rollup_name), function(x) {x$filename}) + + if (!dir.exists(output_dir)) { dir.create(output_dir) } + seen_file <- file.path(output_dir, "seen.txt") + seen_files <- load_seen_file(seen_file) for (output_name in names(files)) { - combine_and_save_tables( - file.path(input_dir, files[[output_name]]), + browser + newly_seen_files <- combine_and_save_tables( + seen_files, + input_dir, + files[[output_name]], file.path(output_dir, output_name)) + browser() + write(newly_seen_files, seen_file, append=TRUE) } return(NULL) @@ -55,24 +64,40 @@ get_file_properties <- function(filename) { parts <- strsplit(short, "_", fixed = TRUE)[[1]] group <- parts[3:length(parts)] - # Specify compression format in name, to be parsed by `write_csv` later. - partialname <- paste0(paste0(group, collapse="_"), ".csv.gz") + # Specify compression format via name, to be parsed by `write_csv` later. + partial_name <- paste0(paste0(group, collapse="_"), ".csv.gz") return(data.frame( filename=filename, - rollupname=partialname)) + rollup_name=partial_name)) +} + +## Helper function to load "seen" file. +load_seen_file <- function(seen_file) { + if (!file.exists(seen_file)) { + file.create(seen_file) + } + + seen_files <- readLines(seen_file) + return(seen_files) } #' Combine set of input files with existing output file, and save to disk. #' -#' If a date range has been seen before, the input and output data are +#' If an input filename has been seen before, the input and output data are #' deduplicated to use the newer set of data. Output is saved in gzip-compressed #' format. #' +#' @param seen_files Vector of filenames that have been previously loaded into +#' an output file. +#' @param input_dir Directory in which to look for survey CSV files, relative to +#' the current working directory. #' @param input_files Vector of paths to input files that share a set of #' grouping variables. #' @param output_file Path to corresponding output file. -combine_and_save_tables <- function(input_files, output_file) { +#' +#' @return Character vector of newly-seen filenames. +combine_and_save_tables <- function(seen_files, input_dir, input_files, output_file) { cols <- cols( .default = col_guess(), survey_geo = col_character(), @@ -90,48 +115,60 @@ combine_and_save_tables <- function(input_files, output_file) { county_fips = col_character() ) + # Get input data. input_df <- map_dfr( - input_files, + file.path(input_dir, input_files), function(f) { read_csv(f, col_types = cols) } ) - if (!file.exists(output_file)) { - warning(paste0("Output file ", output_file, " does not exist. Creating a new copy.")) - # Create an empty starting df with the expected column names, order, and type. - output_df <- input_df[FALSE,] - } else { - output_df <- read_csv(output_file, col_types = cols) + if (file.exists(output_file)) { + output_names <- names(read_csv(output_file, n_max = 0L)) + assert(identical(output_names, names(input_df)), + paste0("Column names and/or order differ between new and old input for ", output_file)) } - # For finding unique group/geo-level/date combinations, use all columns up to - # the first "val" column. This generalizes the process of finding unique rows, - # when we might be using different grouping variables or different geo levels - # (county/state/nation appear in different columns). 
- group_names <- names(output_df) - group_names <- group_names[ 1:min(which(startsWith(group_names, "val_")))-1 ] + # If no input files have been seen before, we can append directly to the + # output file without needing to deduplicate. File is created if it doesn't + # already exist. + any_prev_seen <- any(input_files %in% seen_files) - ## Deduplicate, keeping newest version by issue date of each unique row. - # Merge the new data with the existing data, taking the last issue date for - # any given grouping/geo level/date combo. This prevents duplication in case - # of reissues. Note that the order matters: since arrange() uses order(), - # which is a stable sort, ties will result in the input data being used in - # preference over the existing rollup data. - output_df <- bind_rows(output_df, input_df) %>% - arrange(issue_date) %>% - group_by(across(all_of(group_names))) %>% - slice_tail() %>% - ungroup() - - # Automatically uses gzip compression based on output name. - write_csv(output_df, output_file) + if (!any_prev_seen) { + write_csv(input_df, output_file, append=file.exists(output_file)) + } else { + assert(file.exists(output_file), + paste0("The output file ", output_file, " does not exist, but ", + "non-zero files using the same grouping have been seen before.")) + + output_df <- read_csv(output_file, col_types = cols) + + # Use all columns up to the first "val" column to find unique rows. + group_names <- names(output_df) + ind_first_val_col <- min(which(startsWith(group_names, "val_"))) + group_names <- group_names[ 1:ind_first_val_col-1 ] + + ## Deduplicate, keeping newest version by issue date of each unique row. + # Merge the new data with the existing data, taking the last issue date for + # any given grouping/geo level/date combo. This prevents duplication in case + # of reissues. Note that the order matters: since arrange() uses order(), + # which is a stable sort, ties will result in the input data being used in + # preference over the existing rollup data. + output_df <- bind_rows(output_df, input_df) %>% + arrange(issue_date) %>% + group_by(across(all_of(group_names))) %>% + slice_tail() %>% + ungroup() + + # Automatically uses gzip compression based on output file name. + write_csv(output_df, output_file) + } - return(NULL) + newly_seen <- setdiff(input_files, seen_files) + return(newly_seen) } - args <- commandArgs(TRUE) if (length(args) < 2) { From 4b445bc96880d5ee5b69a5b76e737f799311051d Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Tue, 4 May 2021 16:06:45 -0400 Subject: [PATCH 03/12] Sort output by date; comment cleanup --- facebook/contingency-combine.R | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/facebook/contingency-combine.R b/facebook/contingency-combine.R index 9aafe0c69..0ae5d3bbc 100644 --- a/facebook/contingency-combine.R +++ b/facebook/contingency-combine.R @@ -1,6 +1,6 @@ #!/usr/bin/env Rscript -## Combine and compress contingency tables by aggregation. +## Combine and compress contingency tables by grouping variable set. ## ## Usage: ## @@ -45,13 +45,12 @@ run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.] 
seen_files <- load_seen_file(seen_file) for (output_name in names(files)) { - browser newly_seen_files <- combine_and_save_tables( seen_files, input_dir, files[[output_name]], file.path(output_dir, output_name)) - browser() + write(newly_seen_files, seen_file, append=TRUE) } @@ -138,8 +137,8 @@ combine_and_save_tables <- function(seen_files, input_dir, input_files, output_f write_csv(input_df, output_file, append=file.exists(output_file)) } else { assert(file.exists(output_file), - paste0("The output file ", output_file, " does not exist, but ", - "non-zero files using the same grouping have been seen before.")) + paste0("The output file ", output_file, " does not exist, but non-zero", + " files using the same grouping variables have been seen before.")) output_df <- read_csv(output_file, col_types = cols) @@ -158,9 +157,11 @@ combine_and_save_tables <- function(seen_files, input_dir, input_files, output_f arrange(issue_date) %>% group_by(across(all_of(group_names))) %>% slice_tail() %>% - ungroup() + ungroup() %>% + arrange(period_start) - # Automatically uses gzip compression based on output file name. + # Automatically uses gzip compression based on output file name. Overwrites + # existing file of the same name. write_csv(output_df, output_file) } From 0ead60dfb4a0584e34294fa946eefe0c28a666fe Mon Sep 17 00:00:00 2001 From: Kathryn M Mazaitis Date: Wed, 5 May 2021 12:38:08 -0400 Subject: [PATCH 04/12] Change max age of google-symptoms to 6 days --- ansible/templates/sir_complainsalot-params-prod.json.j2 | 2 +- sir_complainsalot/params.json.template | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ansible/templates/sir_complainsalot-params-prod.json.j2 b/ansible/templates/sir_complainsalot-params-prod.json.j2 index 8a0553638..5a0302159 100644 --- a/ansible/templates/sir_complainsalot-params-prod.json.j2 +++ b/ansible/templates/sir_complainsalot-params-prod.json.j2 @@ -16,7 +16,7 @@ "maintainers": ["U01AP8GSWG3","U01069KCRS7"] }, "google-symptoms": { - "max_age": 11, + "max_age": 6, "maintainers": ["U01AP8GSWG3","U01069KCRS7"] }, "usa-facts": { diff --git a/sir_complainsalot/params.json.template b/sir_complainsalot/params.json.template index 49408b486..916d48f5b 100644 --- a/sir_complainsalot/params.json.template +++ b/sir_complainsalot/params.json.template @@ -17,7 +17,7 @@ "maintainers": ["U01AP8GSWG3","U01069KCRS7"] }, "google-symptoms": { - "max_age": 11, + "max_age": 6, "maintainers": ["U01AP8GSWG3","U01069KCRS7"] }, "usa-facts": { From 96b599b4edfa83f53f5d8ca28b7b2d83291cda04 Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Wed, 5 May 2021 16:42:34 -0400 Subject: [PATCH 05/12] Exclude all possible output cols in finding unique rows Generalize code that selects columns to use in finding unique rows. Now uses all columns up to but not including the first column that starts with "val_", "se_", etc (any column name that we do or can report in the contingency tables). Discuss column ordering/naming behavior in contingency pipeline comments. 
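For illustration, the selection reduces to the following sketch (column
names here are hypothetical; the real prefix list is the one added in the
diff below):

    # Hypothetical report-ordered column names:
    cols <- c("survey_geo", "period_start", "geo_type", "age",
              "val_pct_tested", "se_pct_tested", "sample_size_pct_tested")
    report_prefixes <- paste0("^", c("val", "se", "sample_size",
                                     "represented", "effective_sample_size"))
    is_report <- grepl(paste(report_prefixes, collapse = "|"), cols)
    group_cols <- cols[seq_len(min(which(is_report)) - 1)]
    # group_cols == c("survey_geo", "period_start", "geo_type", "age")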
--- facebook/contingency-combine.R | 10 +++++++++- facebook/delphiFacebook/R/contingency_aggregate.R | 2 ++ facebook/delphiFacebook/R/contingency_write.R | 3 ++- .../delphiFacebook/man/write_contingency_tables.Rd | 3 ++- 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/facebook/contingency-combine.R b/facebook/contingency-combine.R index 0ae5d3bbc..2563e5d96 100644 --- a/facebook/contingency-combine.R +++ b/facebook/contingency-combine.R @@ -144,7 +144,15 @@ combine_and_save_tables <- function(seen_files, input_dir, input_files, output_f # Use all columns up to the first "val" column to find unique rows. group_names <- names(output_df) - ind_first_val_col <- min(which(startsWith(group_names, "val_"))) + + include_names <- c("val", "se", "sample_size", "represented", "effective_sample_size") + assert( any(include_names %in% group_names) , + "No value-reporting columns are available or their names have changed.") + + include_patterns <- paste0("^", include_names) + include_map <- grepl(paste(include_patterns, collapse="|"), group_names) + ind_first_val_col <- min(which(include_map)) + group_names <- group_names[ 1:ind_first_val_col-1 ] ## Deduplicate, keeping newest version by issue date of each unique row. diff --git a/facebook/delphiFacebook/R/contingency_aggregate.R b/facebook/delphiFacebook/R/contingency_aggregate.R index 946a40a37..d73c4efc2 100644 --- a/facebook/delphiFacebook/R/contingency_aggregate.R +++ b/facebook/delphiFacebook/R/contingency_aggregate.R @@ -75,6 +75,8 @@ produce_aggregates <- function(df, aggregations, cw_list, params) { ## To display other response columns ("val", "sample_size", "se", ## "effective_sample_size", "represented"), add here. + # If these names change (e.g. `sample_size` to `n`), update + # `contingency-combine.R`. keep_vars <- c("val", "se", "sample_size", "represented") for (agg_id in names(dfs_out)) { diff --git a/facebook/delphiFacebook/R/contingency_write.R b/facebook/delphiFacebook/R/contingency_write.R index d9ed2e0ce..1acfd6be2 100644 --- a/facebook/delphiFacebook/R/contingency_write.R +++ b/facebook/delphiFacebook/R/contingency_write.R @@ -1,7 +1,8 @@ #' Write csv file for sharing with researchers. #' #' CSV name includes date specifying start of time period aggregated, geo level, -#' and grouping variables. +#' and grouping variables. These columns are always inserted first in a table, +#' with reported values (value, standard error, sample size, etc) following. #' #' @param data a data frame to save; must contain the columns in #' `groupby_vars`. diff --git a/facebook/delphiFacebook/man/write_contingency_tables.Rd b/facebook/delphiFacebook/man/write_contingency_tables.Rd index b31c98e1a..ce687becd 100644 --- a/facebook/delphiFacebook/man/write_contingency_tables.Rd +++ b/facebook/delphiFacebook/man/write_contingency_tables.Rd @@ -24,5 +24,6 @@ calculate aggregations; used for naming the output file} } \description{ CSV name includes date specifying start of time period aggregated, geo level, -and grouping variables. +and grouping variables. These columns are always inserted first in a table, +with reported values (value, standard error, sample size, etc) following. 
} From f2b6c80319616d9d98d89e5128f56f16df931e8a Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Wed, 5 May 2021 17:52:37 -0400 Subject: [PATCH 06/12] Separate combine and save steps --- facebook/contingency-combine.R | 103 ++++++++++++++++++++++----------- 1 file changed, 70 insertions(+), 33 deletions(-) diff --git a/facebook/contingency-combine.R b/facebook/contingency-combine.R index 2563e5d96..612dbc920 100644 --- a/facebook/contingency-combine.R +++ b/facebook/contingency-combine.R @@ -28,6 +28,8 @@ suppressPackageStartupMessages({ #' @param pattern Regular expression indicating which files in that directory to #' open. By default, selects all `.csv` files with standard table date prefix. run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]csv$") { + if (!dir.exists(output_dir)) { dir.create(output_dir) } + files <- list.files(input_dir, pattern = pattern) if (length(files) == 0) { stop("No matching data files.") @@ -39,19 +41,21 @@ run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.] # (and thus same output file) are in a character vector named with the output # file. files <- lapply(split(files, files$rollup_name), function(x) {x$filename}) - - if (!dir.exists(output_dir)) { dir.create(output_dir) } + seen_file <- file.path(output_dir, "seen.txt") - seen_files <- load_seen_file(seen_file) + if (any(file.exists(names(files)))) { + assert(file.exists(seen_file), + paste0("If any output file exists, ", seen_file, " listing input ", + "files previously used in generating a combined table should also exist")) + } for (output_name in names(files)) { - newly_seen_files <- combine_and_save_tables( - seen_files, - input_dir, + combined_output <- combine_tables( + seen_file, + input_dir, files[[output_name]], file.path(output_dir, output_name)) - - write(newly_seen_files, seen_file, append=TRUE) + write_rollup(combined_output, seen_file, file.path(output_dir, output_name)) } return(NULL) @@ -81,22 +85,22 @@ load_seen_file <- function(seen_file) { return(seen_files) } -#' Combine set of input files with existing output file, and save to disk. +#' Combine set of input files with existing output file. #' #' If an input filename has been seen before, the input and output data are -#' deduplicated to use the newer set of data. Output is saved in gzip-compressed -#' format. +#' deduplicated to use the newer set of data. #' -#' @param seen_files Vector of filenames that have been previously loaded into -#' an output file. +#' @param seen_file Path to file listing filenames that have been previously +#' loaded into an output file. #' @param input_dir Directory in which to look for survey CSV files, relative to #' the current working directory. #' @param input_files Vector of paths to input files that share a set of #' grouping variables. #' @param output_file Path to corresponding output file. #' -#' @return Character vector of newly-seen filenames. -combine_and_save_tables <- function(seen_files, input_dir, input_files, output_file) { +#' @return Named list of combined output dataframe, character vector, and +#' boolean. 
+combine_tables <- function(seen_file, input_dir, input_files, output_file) { cols <- cols( .default = col_guess(), survey_geo = col_character(), @@ -126,34 +130,31 @@ combine_and_save_tables <- function(seen_files, input_dir, input_files, output_f output_names <- names(read_csv(output_file, n_max = 0L)) assert(identical(output_names, names(input_df)), paste0("Column names and/or order differ between new and old input for ", output_file)) - } + } - # If no input files have been seen before, we can append directly to the - # output file without needing to deduplicate. File is created if it doesn't - # already exist. + seen_files <- load_seen_file(seen_file) any_prev_seen <- any(input_files %in% seen_files) - if (!any_prev_seen) { - write_csv(input_df, output_file, append=file.exists(output_file)) - } else { + # If no input files have been seen before, we don't need to deduplicate. + if (any_prev_seen) { assert(file.exists(output_file), paste0("The output file ", output_file, " does not exist, but non-zero", " files using the same grouping variables have been seen before.")) output_df <- read_csv(output_file, col_types = cols) - # Use all columns up to the first "val" column to find unique rows. + # Use all columns up to the first non-aggregate column to find unique rows. group_names <- names(output_df) - include_names <- c("val", "se", "sample_size", "represented", "effective_sample_size") - assert( any(include_names %in% group_names) , + report_names <- c("val", "se", "sample_size", "represented", "effective_sample_size") + exclude_patterns <- paste0("^", report_names) + exclude_map <- grepl(paste(exclude_patterns, collapse="|"), group_names) + assert( any(exclude_map) , "No value-reporting columns are available or their names have changed.") - include_patterns <- paste0("^", include_names) - include_map <- grepl(paste(include_patterns, collapse="|"), group_names) - ind_first_val_col <- min(which(include_map)) - - group_names <- group_names[ 1:ind_first_val_col-1 ] + ind_first_report_col <- min(which(exclude_map)) + + group_names <- group_names[ 1:ind_first_report_col-1 ] ## Deduplicate, keeping newest version by issue date of each unique row. # Merge the new data with the existing data, taking the last issue date for @@ -167,14 +168,50 @@ combine_and_save_tables <- function(seen_files, input_dir, input_files, output_f slice_tail() %>% ungroup() %>% arrange(period_start) - + } else { + output_df <- input_df + } + + newly_seen <- setdiff(input_files, seen_files) + + return(list( + output_df=output_df, + newly_seen_files=newly_seen, + any_prev_seen=any_prev_seen)) +} + +#' Save a combined dataframe and list of seen files to disk. +#' +#' Output is saved using compression format specified in output file name (gzip +#' by default). +#' +#' @param combined_output Named list output from `combine_tables`. Contains an +#' `output` dataframe, a list of newly seen files, and a flag indicating +#' whether any input filenames have been seen before. +#' @param seen_file Path to file listing filenames that have been previously +#' loaded into an output file. +#' @param output_file Path to corresponding output file. +write_rollup <- function(combined_output, seen_file, output_file) { + output_df <- combined_output[["output_df"]] + newly_seen_files <- combined_output[["newly_seen_files"]] + any_prev_seen_files <- combined_output[["any_prev_seen"]] + + # If some input files have been seen before, overwrite any existing output + # file. 
If no input files have been seen before, we can append directly to the + # output file. File is created if it doesn't already exist. + if (any_prev_seen_files) { # Automatically uses gzip compression based on output file name. Overwrites # existing file of the same name. write_csv(output_df, output_file) + } else { + write_csv(output_df, output_file, append=file.exists(output_file)) } - newly_seen <- setdiff(input_files, seen_files) - return(newly_seen) + if (length(newly_seen_files) > 0) { + write(newly_seen_files, seen_file, append=TRUE) + } + + return(NULL) } From ea692bc0a4b115e60504dd8a6ab704d7e57fda25 Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Wed, 5 May 2021 18:45:26 -0400 Subject: [PATCH 07/12] Make sure issue_date is always the last col --- facebook/contingency-combine.R | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/facebook/contingency-combine.R b/facebook/contingency-combine.R index 612dbc920..1222ff4b7 100644 --- a/facebook/contingency-combine.R +++ b/facebook/contingency-combine.R @@ -118,19 +118,19 @@ combine_tables <- function(seen_file, input_dir, input_files, output_file) { county_fips = col_character() ) - # Get input data. + # Get input data. Make sure `issue_date` is last column after combining. input_df <- map_dfr( file.path(input_dir, input_files), function(f) { read_csv(f, col_types = cols) - } - ) + }) %>% + relocate(issue_date, .after=last_col()) if (file.exists(output_file)) { output_names <- names(read_csv(output_file, n_max = 0L)) assert(identical(output_names, names(input_df)), paste0("Column names and/or order differ between new and old input for ", output_file)) - } + } seen_files <- load_seen_file(seen_file) any_prev_seen <- any(input_files %in% seen_files) @@ -163,6 +163,7 @@ combine_tables <- function(seen_file, input_dir, input_files, output_file) { # which is a stable sort, ties will result in the input data being used in # preference over the existing rollup data. output_df <- bind_rows(output_df, input_df) %>% + relocate(issue_date, .after=last_col()) %>% arrange(issue_date) %>% group_by(across(all_of(group_names))) %>% slice_tail() %>% From 28737b3d7ee18cfa54b88ee62ba2cb4efb9a93aa Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Thu, 6 May 2021 10:55:54 -0400 Subject: [PATCH 08/12] Use merge process if column names don't match --- facebook/contingency-combine.R | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/facebook/contingency-combine.R b/facebook/contingency-combine.R index 1222ff4b7..248302280 100644 --- a/facebook/contingency-combine.R +++ b/facebook/contingency-combine.R @@ -128,15 +128,18 @@ combine_tables <- function(seen_file, input_dir, input_files, output_file) { if (file.exists(output_file)) { output_names <- names(read_csv(output_file, n_max = 0L)) - assert(identical(output_names, names(input_df)), - paste0("Column names and/or order differ between new and old input for ", output_file)) + identical_names <- identical(output_names, names(input_df)) + } else { + identical_names <- TRUE } seen_files <- load_seen_file(seen_file) any_prev_seen <- any(input_files %in% seen_files) - # If no input files have been seen before, we don't need to deduplicate. - if (any_prev_seen) { + # If input files have been seen before, we need to deduplicate. 
If there is a + # mismatch between input and output column names/order, we need to explicitly + # merge input and output data to make sure columns match up correctly. + if (any_prev_seen || !identical_names) { assert(file.exists(output_file), paste0("The output file ", output_file, " does not exist, but non-zero", " files using the same grouping variables have been seen before.")) @@ -178,7 +181,7 @@ combine_tables <- function(seen_file, input_dir, input_files, output_file) { return(list( output_df=output_df, newly_seen_files=newly_seen, - any_prev_seen=any_prev_seen)) + can_overwrite=any_prev_seen || !identical_names)) } #' Save a combined dataframe and list of seen files to disk. @@ -188,19 +191,20 @@ combine_tables <- function(seen_file, input_dir, input_files, output_file) { #' #' @param combined_output Named list output from `combine_tables`. Contains an #' `output` dataframe, a list of newly seen files, and a flag indicating -#' whether any input filenames have been seen before. +#' whether we need to overwrite the existing output file or we can append to +#' it. #' @param seen_file Path to file listing filenames that have been previously #' loaded into an output file. #' @param output_file Path to corresponding output file. write_rollup <- function(combined_output, seen_file, output_file) { output_df <- combined_output[["output_df"]] newly_seen_files <- combined_output[["newly_seen_files"]] - any_prev_seen_files <- combined_output[["any_prev_seen"]] + can_overwrite <- combined_output[["can_overwrite"]] # If some input files have been seen before, overwrite any existing output # file. If no input files have been seen before, we can append directly to the # output file. File is created if it doesn't already exist. - if (any_prev_seen_files) { + if (can_overwrite) { # Automatically uses gzip compression based on output file name. Overwrites # existing file of the same name. write_csv(output_df, output_file) @@ -225,4 +229,7 @@ if (length(args) < 2) { input_path <- args[1] output_path <- args[2] +input_path <- "~/Downloads/0418_tables/" +output_path <- "~/Downloads/rollup_test_FB_press_conf" + invisible(run_rollup(input_path, output_path)) From f4a81597d30424787999c6d9dd32e0ce874771f1 Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Thu, 6 May 2021 11:08:41 -0400 Subject: [PATCH 09/12] Always use merge process to simplify logic --- facebook/contingency-combine.R | 151 +++++++++++++-------------------- 1 file changed, 57 insertions(+), 94 deletions(-) diff --git a/facebook/contingency-combine.R b/facebook/contingency-combine.R index 248302280..d592c84a4 100644 --- a/facebook/contingency-combine.R +++ b/facebook/contingency-combine.R @@ -6,9 +6,9 @@ ## ## Rscript contingency-combine.R path/to/individual/files/ path/to/rollup/files/ ## -## Appends a set of newly-generated contingency tables to a rollup CSV that -## contains all dates for a given set of grouping variables. Can also be used to -## combine a directory of tables spanning multiple time periods. +## Combines a set of contingency tables with a rollup CSV that contains all +## dates for a given set of grouping variables. Can also be used to combine a +## directory of tables spanning multiple time periods. suppressPackageStartupMessages({ library(dplyr) @@ -18,8 +18,7 @@ suppressPackageStartupMessages({ }) -#' Fetch all tables in a chosen directory and combine according to grouping -#' used. +#' Fetch all tables in a chosen directory. Combine and save according to grouping. 
#' #' @param input_dir Directory in which to look for survey CSV files, relative to #' the current working directory. @@ -31,22 +30,19 @@ run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.] if (!dir.exists(output_dir)) { dir.create(output_dir) } files <- list.files(input_dir, pattern = pattern) - if (length(files) == 0) { - stop("No matching data files.") - } + if (length(files) == 0) { stop("No matching data files.") } + # Get df of input files and corresponding output files. Reformat as a list + # such that input files with same grouping variables (and thus same output + # file) are in a character vector named with the output file. files <- map_dfr(files, get_file_properties) - - # Reformat files as a list such that input files with same grouping variables - # (and thus same output file) are in a character vector named with the output - # file. files <- lapply(split(files, files$rollup_name), function(x) {x$filename}) seen_file <- file.path(output_dir, "seen.txt") - if (any(file.exists(names(files)))) { + if ( any(file.exists(names(files))) ) { assert(file.exists(seen_file), - paste0("If any output file exists, ", seen_file, " listing input ", - "files previously used in generating a combined table should also exist")) + paste0("If any output file exists, ", seen_file, ", listing input ", + "files previously used in generating a combined table, should also exist")) } for (output_name in names(files)) { @@ -55,7 +51,11 @@ run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.] input_dir, files[[output_name]], file.path(output_dir, output_name)) - write_rollup(combined_output, seen_file, file.path(output_dir, output_name)) + write_rollup( + combined_output[["newly_seen_files"]], + seen_file, + combined_output[["output_df"]], + file.path(output_dir, output_name)) } return(NULL) @@ -85,10 +85,7 @@ load_seen_file <- function(seen_file) { return(seen_files) } -#' Combine set of input files with existing output file. -#' -#' If an input filename has been seen before, the input and output data are -#' deduplicated to use the newer set of data. +#' Combine data from set of input files with existing output data. #' #' @param seen_file Path to file listing filenames that have been previously #' loaded into an output file. @@ -98,8 +95,7 @@ load_seen_file <- function(seen_file) { #' grouping variables. #' @param output_file Path to corresponding output file. #' -#' @return Named list of combined output dataframe, character vector, and -#' boolean. +#' @return Named list of combined output dataframe and character vector. combine_tables <- function(seen_file, input_dir, input_files, output_file) { cols <- cols( .default = col_guess(), @@ -118,99 +114,69 @@ combine_tables <- function(seen_file, input_dir, input_files, output_file) { county_fips = col_character() ) - # Get input data. Make sure `issue_date` is last column after combining. + # Get input data. Make sure `issue_date` is the last column after combining. 
input_df <- map_dfr( file.path(input_dir, input_files), function(f) { read_csv(f, col_types = cols) - }) %>% - relocate(issue_date, .after=last_col()) - - if (file.exists(output_file)) { - output_names <- names(read_csv(output_file, n_max = 0L)) - identical_names <- identical(output_names, names(input_df)) - } else { - identical_names <- TRUE - } + }) %>% relocate(issue_date, .after=last_col()) seen_files <- load_seen_file(seen_file) - any_prev_seen <- any(input_files %in% seen_files) - - # If input files have been seen before, we need to deduplicate. If there is a - # mismatch between input and output column names/order, we need to explicitly - # merge input and output data to make sure columns match up correctly. - if (any_prev_seen || !identical_names) { + if (any(input_files %in% seen_files)) { assert(file.exists(output_file), paste0("The output file ", output_file, " does not exist, but non-zero", " files using the same grouping variables have been seen before.")) - + } + + if ( file.exists(output_file) ) { output_df <- read_csv(output_file, col_types = cols) - - # Use all columns up to the first non-aggregate column to find unique rows. - group_names <- names(output_df) - - report_names <- c("val", "se", "sample_size", "represented", "effective_sample_size") - exclude_patterns <- paste0("^", report_names) - exclude_map <- grepl(paste(exclude_patterns, collapse="|"), group_names) - assert( any(exclude_map) , - "No value-reporting columns are available or their names have changed.") - - ind_first_report_col <- min(which(exclude_map)) - - group_names <- group_names[ 1:ind_first_report_col-1 ] - - ## Deduplicate, keeping newest version by issue date of each unique row. - # Merge the new data with the existing data, taking the last issue date for - # any given grouping/geo level/date combo. This prevents duplication in case - # of reissues. Note that the order matters: since arrange() uses order(), - # which is a stable sort, ties will result in the input data being used in - # preference over the existing rollup data. - output_df <- bind_rows(output_df, input_df) %>% - relocate(issue_date, .after=last_col()) %>% - arrange(issue_date) %>% - group_by(across(all_of(group_names))) %>% - slice_tail() %>% - ungroup() %>% - arrange(period_start) } else { - output_df <- input_df + output_df <- input_df[FALSE,] } + # Use all columns up to the first non-aggregate column to find unique rows. + group_names <- names(output_df) + report_names <- c("val", "se", "sample_size", "represented", "effective_sample_size") + exclude_patterns <- paste0("^", report_names) + exclude_map <- grepl(paste(exclude_patterns, collapse="|"), group_names) + assert( any(exclude_map) , + "No value-reporting columns are available or their names have changed.") + + ind_first_report_col <- min(which(exclude_map)) + group_names <- group_names[ 1:ind_first_report_col-1 ] + + ## Deduplicate, keeping newest version by issue date of each unique row. + # Merge the new data with the existing data, taking the last issue date for + # any given grouping/geo level/date combo. This prevents duplication in case + # of reissues. Note that the order matters: since arrange() uses order(), + # which is a stable sort, ties will result in the input data being used in + # preference over the existing rollup data. 
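+  # For example (hypothetical values): if the rollup already holds a row for
+  # (nation, age 25-44, period_start 2021-04-18) issued 2021-04-24, and the
+  # input re-reports that combination issued 2021-05-01, slice_tail() below
+  # keeps only the newer input row.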
+ output_df <- bind_rows(output_df, input_df) %>% + relocate(issue_date, .after=last_col()) %>% + arrange(issue_date) %>% + group_by(across(all_of(group_names))) %>% + slice_tail() %>% + ungroup() %>% + arrange(period_start) + newly_seen <- setdiff(input_files, seen_files) return(list( output_df=output_df, - newly_seen_files=newly_seen, - can_overwrite=any_prev_seen || !identical_names)) + newly_seen_files=newly_seen)) } #' Save a combined dataframe and list of seen files to disk. #' -#' Output is saved using compression format specified in output file name (gzip -#' by default). -#' -#' @param combined_output Named list output from `combine_tables`. Contains an -#' `output` dataframe, a list of newly seen files, and a flag indicating -#' whether we need to overwrite the existing output file or we can append to -#' it. +#' @param newly_seen_files Character vector. #' @param seen_file Path to file listing filenames that have been previously #' loaded into an output file. +#' @param output_df Output dataframe. #' @param output_file Path to corresponding output file. -write_rollup <- function(combined_output, seen_file, output_file) { - output_df <- combined_output[["output_df"]] - newly_seen_files <- combined_output[["newly_seen_files"]] - can_overwrite <- combined_output[["can_overwrite"]] - - # If some input files have been seen before, overwrite any existing output - # file. If no input files have been seen before, we can append directly to the - # output file. File is created if it doesn't already exist. - if (can_overwrite) { - # Automatically uses gzip compression based on output file name. Overwrites - # existing file of the same name. - write_csv(output_df, output_file) - } else { - write_csv(output_df, output_file, append=file.exists(output_file)) - } +write_rollup <- function(newly_seen_files, seen_file, output_df, output_file) { + # Automatically uses gzip compression based on output file name. Overwrites + # existing file of the same name. 
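+  # (readr::write_csv() infers gzip from the ".csv.gz" extension that
+  # get_file_properties() builds into each rollup name.)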
+  write_csv(output_df, output_file)
 
   if (length(newly_seen_files) > 0) {
     write(newly_seen_files, seen_file, append=TRUE)
@@ -229,7 +195,4 @@ if (length(args) < 2) {
 input_path <- args[1]
 output_path <- args[2]
 
-input_path <- "~/Downloads/0418_tables/"
-output_path <- "~/Downloads/rollup_test_FB_press_conf"
-
 invisible(run_rollup(input_path, output_path))

From b19e7ef7f2127481fbda55173b9398317715f30c Mon Sep 17 00:00:00 2001
From: Kathryn M Mazaitis
Date: Thu, 6 May 2021 13:49:38 -0400
Subject: [PATCH 10/12] Fix bad field spelling on sircal log

---
 sir_complainsalot/delphi_sir_complainsalot/check_source.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sir_complainsalot/delphi_sir_complainsalot/check_source.py b/sir_complainsalot/delphi_sir_complainsalot/check_source.py
index 153af24b0..a2bd28c11 100644
--- a/sir_complainsalot/delphi_sir_complainsalot/check_source.py
+++ b/sir_complainsalot/delphi_sir_complainsalot/check_source.py
@@ -66,7 +66,7 @@ def check_source(data_source, meta, params, grace, logger):
             continue
 
         logger.info("Retrieving signal",
-                    source=data_source,
+                    data_source=data_source,
                     signal=row["signal"],
                     start_day=(datetime.now() - timedelta(days = 14)).strftime("%Y-%m-%d"),
                     end_day=datetime.now().strftime("%Y-%m-%d"),

From 165a15f642331d89ee6e95db22be6ff628b0513f Mon Sep 17 00:00:00 2001
From: Nat DeFries <42820733+nmdefries@users.noreply.github.com>
Date: Thu, 6 May 2021 15:10:23 -0400
Subject: [PATCH 11/12] Use col spec from input df to read output file

readr's column-guessing procedure uses, by default, only the first 1000
lines of a file to guess the variable type of each column. If a column is
completely missing for the first 1000 lines, it is read in as a logical,
which causes parsing failures if the column contains non-boolean values
later, outside the type-guessing range. This happens when reading in output
files if an indicator was newly added.

To correctly specify these, use the column specification from the input
file(s). All columns included in input files are at least partially
non-missing and sorted alphabetically (independent of missingness), so we
should always see non-missing values in the first 1000 lines.
---
 facebook/contingency-combine.R | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/facebook/contingency-combine.R b/facebook/contingency-combine.R
index d592c84a4..d4b730497 100644
--- a/facebook/contingency-combine.R
+++ b/facebook/contingency-combine.R
@@ -114,12 +114,12 @@ combine_tables <- function(seen_file, input_dir, input_files, output_file) {
     county_fips = col_character()
   )
 
-  # Get input data. Make sure `issue_date` is the last column after combining.
+  # Get input data.
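+  # (The spec guessed from this input is condensed and reused below when
+  # reading the output file, so an all-NA leading stretch in the rollup
+  # cannot make readr guess `logical` for a real-valued column.)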
input_df <- map_dfr( file.path(input_dir, input_files), function(f) { read_csv(f, col_types = cols) - }) %>% relocate(issue_date, .after=last_col()) + }) seen_files <- load_seen_file(seen_file) if (any(input_files %in% seen_files)) { @@ -128,6 +128,7 @@ combine_tables <- function(seen_file, input_dir, input_files, output_file) { " files using the same grouping variables have been seen before.")) } + cols <- cols_condense(spec(input_df)) if ( file.exists(output_file) ) { output_df <- read_csv(output_file, col_types = cols) } else { From 826614db6115ad9f62306adac43db902e6b869c8 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Fri, 7 May 2021 15:01:02 -0700 Subject: [PATCH 12/12] Nans: rename PRIVACY to CENSORED and UNKNOWN to OTHER --- _delphi_utils_python/delphi_utils/nancodes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/nancodes.py b/_delphi_utils_python/delphi_utils/nancodes.py index e9e5a915a..985b8fa99 100644 --- a/_delphi_utils_python/delphi_utils/nancodes.py +++ b/_delphi_utils_python/delphi_utils/nancodes.py @@ -8,6 +8,6 @@ class Nans(IntEnum): NOT_MISSING = 0 NOT_APPLICABLE = 1 REGION_EXCEPTION = 2 - PRIVACY = 3 + CENSORED = 3 DELETED = 4 - UNKNOWN = 5 + OTHER = 5