diff --git a/_delphi_utils_python/delphi_utils/nancodes.py b/_delphi_utils_python/delphi_utils/nancodes.py
index e9e5a915a..985b8fa99 100644
--- a/_delphi_utils_python/delphi_utils/nancodes.py
+++ b/_delphi_utils_python/delphi_utils/nancodes.py
@@ -8,6 +8,6 @@ class Nans(IntEnum):
     NOT_MISSING = 0
     NOT_APPLICABLE = 1
     REGION_EXCEPTION = 2
-    PRIVACY = 3
+    CENSORED = 3
     DELETED = 4
-    UNKNOWN = 5
+    OTHER = 5
diff --git a/ansible/templates/sir_complainsalot-params-prod.json.j2 b/ansible/templates/sir_complainsalot-params-prod.json.j2
index 8a0553638..5a0302159 100644
--- a/ansible/templates/sir_complainsalot-params-prod.json.j2
+++ b/ansible/templates/sir_complainsalot-params-prod.json.j2
@@ -16,7 +16,7 @@
       "maintainers": ["U01AP8GSWG3","U01069KCRS7"]
     },
     "google-symptoms": {
-      "max_age": 11,
+      "max_age": 6,
       "maintainers": ["U01AP8GSWG3","U01069KCRS7"]
     },
     "usa-facts": {
diff --git a/facebook/contingency-combine.R b/facebook/contingency-combine.R
new file mode 100644
index 000000000..d4b730497
--- /dev/null
+++ b/facebook/contingency-combine.R
@@ -0,0 +1,199 @@
+#!/usr/bin/env Rscript
+
+## Combine and compress contingency tables by grouping variable set.
+##
+## Usage:
+##
+## Rscript contingency-combine.R path/to/individual/files/ path/to/rollup/files/
+##
+## Combines a set of contingency tables with a rollup CSV that contains all
+## dates for a given set of grouping variables. Can also be used to combine a
+## directory of tables spanning multiple time periods.
+
+suppressPackageStartupMessages({
+  library(dplyr)
+  library(readr)
+  library(purrr)
+  library(delphiFacebook)
+})
+
+
+#' Fetch all tables in a chosen directory. Combine and save according to grouping.
+#'
+#' @param input_dir Directory in which to look for survey CSV files, relative to
+#'   the current working directory.
+#' @param output_dir Directory in which to look for existing rollup files or
+#'   create new ones, relative to the current working directory.
+#' @param pattern Regular expression indicating which files in that directory to
+#'   open. By default, selects all `.csv` files with the standard table date prefix.
+run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]csv$") {
+  if (!dir.exists(output_dir)) { dir.create(output_dir) }
+
+  files <- list.files(input_dir, pattern = pattern)
+  if (length(files) == 0) { stop("No matching data files.") }
+
+  # Get df of input files and corresponding output files. Reformat as a list
+  # such that input files with the same grouping variables (and thus the same
+  # output file) are in a character vector named with the output file.
+  files <- map_dfr(files, get_file_properties)
+  files <- lapply(split(files, files$rollup_name), function(x) {x$filename})
+
+  seen_file <- file.path(output_dir, "seen.txt")
+  # Output files live in `output_dir`, so check for them there.
+  if ( any(file.exists(file.path(output_dir, names(files)))) ) {
+    assert(file.exists(seen_file),
+           paste0("If any output file exists, ", seen_file, ", listing input ",
+                  "files previously used in generating a combined table, should also exist."))
+  }
+
+  for (output_name in names(files)) {
+    combined_output <- combine_tables(
+      seen_file,
+      input_dir,
+      files[[output_name]],
+      file.path(output_dir, output_name))
+    write_rollup(
+      combined_output[["newly_seen_files"]],
+      seen_file,
+      combined_output[["output_df"]],
+      file.path(output_dir, output_name))
+  }
+
+  return(NULL)
+}
+
+## Helper function to extract info from each file's filename.
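+## For example, a file named "20210101_20210131_weekly_nation_gender.csv"
+## (a hypothetical name matching the default pattern above) maps to the
+## rollup file "weekly_nation_gender.csv.gz".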
+get_file_properties <- function(filename) {
+  short <- strsplit(filename, ".", fixed = TRUE)[[1]][1]
+  parts <- strsplit(short, "_", fixed = TRUE)[[1]]
+
+  group <- parts[3:length(parts)]
+  # Specify compression format via name, to be parsed by `write_csv` later.
+  partial_name <- paste0(paste0(group, collapse="_"), ".csv.gz")
+
+  return(data.frame(
+    filename=filename,
+    rollup_name=partial_name))
+}
+
+## Helper function to load the "seen" file.
+load_seen_file <- function(seen_file) {
+  if (!file.exists(seen_file)) {
+    file.create(seen_file)
+  }
+
+  seen_files <- readLines(seen_file)
+  return(seen_files)
+}
+
+#' Combine data from a set of input files with existing output data.
+#'
+#' @param seen_file Path to file listing filenames that have been previously
+#'   loaded into an output file.
+#' @param input_dir Directory in which to look for survey CSV files, relative to
+#'   the current working directory.
+#' @param input_files Vector of names of input files that share a set of
+#'   grouping variables.
+#' @param output_file Path to corresponding output file.
+#'
+#' @return Named list containing the combined output dataframe and a character
+#'   vector of newly seen input files.
+combine_tables <- function(seen_file, input_dir, input_files, output_file) {
+  cols <- cols(
+    .default = col_guess(),
+    survey_geo = col_character(),
+    period_type = col_character(),
+    geo_type = col_character(),
+    aggregation_type = col_character(),
+    country = col_character(),
+    ISO_3 = col_character(),
+    GID_0 = col_character(),
+    region = col_character(),
+    GID_1 = col_character(),
+    state = col_character(),
+    state_fips = col_character(),
+    county = col_character(),
+    county_fips = col_character()
+  )
+
+  # Get input data.
+  input_df <- map_dfr(
+    file.path(input_dir, input_files),
+    function(f) {
+      read_csv(f, col_types = cols)
+    })
+
+  seen_files <- load_seen_file(seen_file)
+  if (any(input_files %in% seen_files)) {
+    assert(file.exists(output_file),
+           paste0("The output file ", output_file, " does not exist, but one or",
+                  " more input files using the same grouping variables have been",
+                  " seen before."))
+  }
+
+  # Reuse the condensed column spec inferred from the input data when reading
+  # the existing rollup file.
+  cols <- cols_condense(spec(input_df))
+  if ( file.exists(output_file) ) {
+    output_df <- read_csv(output_file, col_types = cols)
+  } else {
+    output_df <- input_df[FALSE,]
+  }
+
+  # Use all columns up to the first value-reporting column to identify unique
+  # rows.
+  group_names <- names(output_df)
+  report_names <- c("val", "se", "sample_size", "represented", "effective_sample_size")
+  exclude_patterns <- paste0("^", report_names)
+  exclude_map <- grepl(paste(exclude_patterns, collapse="|"), group_names)
+  assert( any(exclude_map) ,
+         "No value-reporting columns are available or their names have changed.")
+
+  ind_first_report_col <- min(which(exclude_map))
+  group_names <- group_names[ seq_len(ind_first_report_col - 1) ]
+
+  ## Deduplicate, keeping the newest version, by issue date, of each unique row.
+  # Merge the new data with the existing data, taking the last issue date for
+  # any given grouping/geo level/date combo. This prevents duplication in case
+  # of reissues. Note that the order matters: since arrange() uses order(),
+  # which is a stable sort, ties will result in the input data being used in
+  # preference over the existing rollup data.
+  output_df <- bind_rows(output_df, input_df) %>%
+    relocate(issue_date, .after=last_col()) %>%
+    arrange(issue_date) %>%
+    group_by(across(all_of(group_names))) %>%
+    slice_tail() %>%
+    ungroup() %>%
+    arrange(period_start)
+
+  newly_seen <- setdiff(input_files, seen_files)
+
+  return(list(
+    output_df=output_df,
+    newly_seen_files=newly_seen))
+}
+
+#' Save a combined dataframe and list of seen files to disk.
+#'
+#' @param newly_seen_files Character vector of input filenames not previously
+#'   recorded in the seen file.
+#' @param seen_file Path to file listing filenames that have been previously
+#'   loaded into an output file.
+#' @param output_df Output dataframe.
+#' @param output_file Path to corresponding output file.
+write_rollup <- function(newly_seen_files, seen_file, output_df, output_file) {
+  # Automatically uses gzip compression based on the output file name.
+  # Overwrites an existing file of the same name.
+  write_csv(output_df, output_file)
+
+  if (length(newly_seen_files) > 0) {
+    write(newly_seen_files, seen_file, append=TRUE)
+  }
+
+  return(NULL)
+}
+
+
+args <- commandArgs(TRUE)
+
+if (length(args) < 2) {
+  stop("Usage: Rscript contingency-combine.R path/to/individual/files/ path/to/rollup/files/")
+}
+
+input_path <- args[1]
+output_path <- args[2]
+
+invisible(run_rollup(input_path, output_path))
diff --git a/facebook/delphiFacebook/R/contingency_aggregate.R b/facebook/delphiFacebook/R/contingency_aggregate.R
index 946a40a37..d73c4efc2 100644
--- a/facebook/delphiFacebook/R/contingency_aggregate.R
+++ b/facebook/delphiFacebook/R/contingency_aggregate.R
@@ -75,6 +75,8 @@ produce_aggregates <- function(df, aggregations, cw_list, params) {
 
   ## To display other response columns ("val", "sample_size", "se",
   ## "effective_sample_size", "represented"), add here.
+  # If these names change (e.g. `sample_size` to `n`), update
+  # `contingency-combine.R`.
   keep_vars <- c("val", "se", "sample_size", "represented")
 
   for (agg_id in names(dfs_out)) {
diff --git a/facebook/delphiFacebook/R/contingency_write.R b/facebook/delphiFacebook/R/contingency_write.R
index d9ed2e0ce..1acfd6be2 100644
--- a/facebook/delphiFacebook/R/contingency_write.R
+++ b/facebook/delphiFacebook/R/contingency_write.R
@@ -1,7 +1,8 @@
 #' Write csv file for sharing with researchers.
 #'
 #' CSV name includes date specifying start of time period aggregated, geo level,
-#' and grouping variables.
+#' and grouping variables. These columns are always inserted first in a table,
+#' with reported values (value, standard error, sample size, etc.) following.
 #'
 #' @param data a data frame to save; must contain the columns in
 #' `groupby_vars`.
diff --git a/facebook/delphiFacebook/man/write_contingency_tables.Rd b/facebook/delphiFacebook/man/write_contingency_tables.Rd
index b31c98e1a..ce687becd 100644
--- a/facebook/delphiFacebook/man/write_contingency_tables.Rd
+++ b/facebook/delphiFacebook/man/write_contingency_tables.Rd
@@ -24,5 +24,6 @@ calculate aggregations; used for naming the output file}
 }
 \description{
 CSV name includes date specifying start of time period aggregated, geo level,
-and grouping variables.
+and grouping variables. These columns are always inserted first in a table,
+with reported values (value, standard error, sample size, etc.) following.
 }
diff --git a/sir_complainsalot/delphi_sir_complainsalot/check_source.py b/sir_complainsalot/delphi_sir_complainsalot/check_source.py
index 153af24b0..a2bd28c11 100644
--- a/sir_complainsalot/delphi_sir_complainsalot/check_source.py
+++ b/sir_complainsalot/delphi_sir_complainsalot/check_source.py
@@ -66,7 +66,7 @@ def check_source(data_source, meta, params, grace, logger):
             continue
 
         logger.info("Retrieving signal",
-                    source=data_source,
+                    data_source=data_source,
                     signal=row["signal"],
                     start_day=(datetime.now() - timedelta(days = 14)).strftime("%Y-%m-%d"),
                     end_day=datetime.now().strftime("%Y-%m-%d"),
diff --git a/sir_complainsalot/params.json.template b/sir_complainsalot/params.json.template
index 49408b486..916d48f5b 100644
--- a/sir_complainsalot/params.json.template
+++ b/sir_complainsalot/params.json.template
@@ -17,7 +17,7 @@
       "maintainers": ["U01AP8GSWG3","U01069KCRS7"]
     },
     "google-symptoms": {
-      "max_age": 11,
+      "max_age": 6,
      "maintainers": ["U01AP8GSWG3","U01069KCRS7"]
     },
     "usa-facts": {
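
As a sanity check on the deduplication rule in combine_tables(), here is a minimal sketch (illustrative data, not taken from the pipeline) of how bind_rows() plus a stable arrange() on issue_date and slice_tail() keep the latest issue of each grouping/date combination, with ties going to the newly read input:

library(dplyr)

# One existing rollup row and one reissue of the same grouping/date combo.
# Values are made up for illustration.
existing <- tibble::tibble(period_start = "2021-01-01", state = "pa",
                           val = 10, issue_date = "2021-01-05")
incoming <- tibble::tibble(period_start = "2021-01-01", state = "pa",
                           val = 12, issue_date = "2021-01-08")

bind_rows(existing, incoming) %>%
  arrange(issue_date) %>%          # stable sort; ties keep input order
  group_by(period_start, state) %>%
  slice_tail() %>%                 # last row per group = newest issue
  ungroup()
# Result keeps only the val = 12 row, whose issue_date is newest.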