Deploy renamed nancodes to production #1045

Merged
merged 16 commits on May 10, 2021
4 changes: 2 additions & 2 deletions _delphi_utils_python/delphi_utils/nancodes.py
@@ -8,6 +8,6 @@ class Nans(IntEnum)
NOT_MISSING = 0
NOT_APPLICABLE = 1
REGION_EXCEPTION = 2
-PRIVACY = 3
+CENSORED = 3
DELETED = 4
-UNKNOWN = 5
+OTHER = 5
2 changes: 1 addition & 1 deletion ansible/templates/sir_complainsalot-params-prod.json.j2
@@ -16,7 +16,7 @@
"maintainers": ["U01AP8GSWG3","U01069KCRS7"]
},
"google-symptoms": {
"max_age": 11,
"max_age": 6,
"maintainers": ["U01AP8GSWG3","U01069KCRS7"]
},
"usa-facts": {
199 changes: 199 additions & 0 deletions facebook/contingency-combine.R
@@ -0,0 +1,199 @@
#!/usr/bin/env Rscript

## Combine and compress contingency tables by grouping variable set.
##
## Usage:
##
## Rscript contingency-combine.R path/to/individual/files/ path/to/rollup/files/
##
## Combines a set of contingency tables with a rollup CSV that contains all
## dates for a given set of grouping variables. Can also be used to combine a
## directory of tables spanning multiple time periods.

suppressPackageStartupMessages({
library(dplyr)
library(readr)
library(purrr)
library(delphiFacebook)
})


#' Fetch all tables in a chosen directory. Combine and save according to grouping.
#'
#' @param input_dir Directory in which to look for survey CSV files, relative to
#' the current working directory.
#' @param output_dir Directory in which to look for existing rollup files or
#' create new ones, relative to the current working directory.
#' @param pattern Regular expression indicating which files in that directory to
#' open. By default, selects all `.csv` files with the standard table date prefix.
run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]csv$") {
if (!dir.exists(output_dir)) { dir.create(output_dir) }

files <- list.files(input_dir, pattern = pattern)
if (length(files) == 0) { stop("No matching data files.") }

# Get df of input files and corresponding output files. Reformat as a list
# such that input files with the same grouping variables (and thus the same
# output file) are collected into a character vector named with the output file.
files <- map_dfr(files, get_file_properties)
files <- lapply(split(files, files$rollup_name), function(x) {x$filename})

seen_file <- file.path(output_dir, "seen.txt")
if ( any(file.exists(file.path(output_dir, names(files)))) ) {
assert(file.exists(seen_file),
paste0("If any output file exists, ", seen_file, ", listing input ",
"files previously used in generating a combined table, should also exist"))
}

for (output_name in names(files)) {
combined_output <- combine_tables(
seen_file,
input_dir,
files[[output_name]],
file.path(output_dir, output_name))
write_rollup(
combined_output[["newly_seen_files"]],
seen_file,
combined_output[["output_df"]],
file.path(output_dir, output_name))
}

return(NULL)
}

## Helper function to extract info from each file's filename.
get_file_properties <- function(filename) {
short <- strsplit(filename, ".", fixed = TRUE)[[1]][1]
parts <- strsplit(short, "_", fixed = TRUE)[[1]]

group <- parts[3:length(parts)]
# Specify compression format via name, to be parsed by `write_csv` later.
partial_name <- paste0(paste0(group, collapse="_"), ".csv.gz")

return(data.frame(
filename=filename,
rollup_name=partial_name))
}

## Helper function to load "seen" file.
load_seen_file <- function(seen_file) {
if (!file.exists(seen_file)) {
file.create(seen_file)
}

seen_files <- readLines(seen_file)
return(seen_files)
}

#' Combine data from set of input files with existing output data.
#'
#' @param seen_file Path to file listing filenames that have been previously
#' loaded into an output file.
#' @param input_dir Directory in which to look for survey CSV files, relative to
#' the current working directory.
#' @param input_files Vector of paths to input files that share a set of
#' grouping variables.
#' @param output_file Path to corresponding output file.
#'
#' @return Named list containing the combined output dataframe (`output_df`) and
#'   a character vector of input filenames not seen before (`newly_seen_files`).
combine_tables <- function(seen_file, input_dir, input_files, output_file) {
cols <- cols(
.default = col_guess(),
survey_geo = col_character(),
period_type = col_character(),
geo_type = col_character(),
aggregation_type = col_character(),
country = col_character(),
ISO_3 = col_character(),
GID_0 = col_character(),
region = col_character(),
GID_1 = col_character(),
state = col_character(),
state_fips = col_character(),
county = col_character(),
county_fips = col_character()
)

# Get input data.
input_df <- map_dfr(
file.path(input_dir, input_files),
function(f) {
read_csv(f, col_types = cols)
})

seen_files <- load_seen_file(seen_file)
if (any(input_files %in% seen_files)) {
assert(file.exists(output_file),
paste0("The output file ", output_file, " does not exist, but non-zero",
" files using the same grouping variables have been seen before."))
}

cols <- cols_condense(spec(input_df))
if ( file.exists(output_file) ) {
output_df <- read_csv(output_file, col_types = cols)
} else {
output_df <- input_df[FALSE,]
}

# Use all columns up to the first value-reporting column as the key for finding unique rows.
group_names <- names(output_df)
report_names <- c("val", "se", "sample_size", "represented", "effective_sample_size")
exclude_patterns <- paste0("^", report_names)
exclude_map <- grepl(paste(exclude_patterns, collapse="|"), group_names)
assert( any(exclude_map) ,
"No value-reporting columns are available or their names have changed.")

ind_first_report_col <- min(which(exclude_map))
group_names <- group_names[ seq_len(ind_first_report_col - 1) ]

## Deduplicate, keeping newest version by issue date of each unique row.
# Merge the new data with the existing data, taking the last issue date for
# any given grouping/geo level/date combo. This prevents duplication in case
# of reissues. Note that the order matters: since arrange() uses order(),
# which is a stable sort, ties will result in the input data being used in
# preference over the existing rollup data.
output_df <- bind_rows(output_df, input_df) %>%
relocate(issue_date, .after=last_col()) %>%
arrange(issue_date) %>%
group_by(across(all_of(group_names))) %>%
slice_tail() %>%
ungroup() %>%
arrange(period_start)

newly_seen <- setdiff(input_files, seen_files)

return(list(
output_df=output_df,
newly_seen_files=newly_seen))
}

#' Save a combined dataframe and list of seen files to disk.
#'
#' @param newly_seen_files Character vector.
#' @param seen_file Path to file listing filenames that have been previously
#' loaded into an output file.
#' @param output_df Output dataframe.
#' @param output_file Path to corresponding output file.
write_rollup <- function(newly_seen_files, seen_file, output_df, output_file) {
# Automatically uses gzip compression based on output file name. Overwrites
# existing file of the same name.
write_csv(output_df, output_file)

if (length(newly_seen_files) > 0) {
write(newly_seen_files, seen_file, append=TRUE)
}

return(NULL)
}


args <- commandArgs(TRUE)

if (length(args) < 2) {
stop("Usage: Rscript contingency-combine.R path/to/individual/files/ path/to/rollup/files/")
}

input_path <- args[1]
output_path <- args[2]

invisible(run_rollup(input_path, output_path))
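As a usage illustration (not part of this PR), here is a minimal R sketch of how the filename parsing in `get_file_properties` above maps an individual table to its rollup file; the example filename is hypothetical but matches the default `^[0-9]{8}_[0-9]{8}.*[.]csv$` pattern:

filename <- "20210503_20210509_weekly_nation_gender.csv"  # hypothetical input table name
short <- strsplit(filename, ".", fixed = TRUE)[[1]][1]    # drop the ".csv" extension
parts <- strsplit(short, "_", fixed = TRUE)[[1]]          # split into date and grouping parts
group <- parts[3:length(parts)]                           # drop the two leading date fields
paste0(paste0(group, collapse = "_"), ".csv.gz")          # rollup file this table is merged into
#> [1] "weekly_nation_gender.csv.gz"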
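And a small hypothetical sketch of the deduplication step in `combine_tables`: because `arrange()` uses a stable sort, when an existing rollup row and a new input row tie on `issue_date`, the row bound last (the new input) survives `slice_tail()`. The column names and values below are made up:

library(dplyr)

existing <- data.frame(period_start = "2021-05-03", geo = "us", val = 1.0, issue_date = "2021-05-05")
incoming <- data.frame(period_start = "2021-05-03", geo = "us", val = 2.0, issue_date = "2021-05-05")

bind_rows(existing, incoming) %>%
  arrange(issue_date) %>%            # stable sort: tied rows keep their bind order
  group_by(period_start, geo) %>%
  slice_tail() %>%                   # keep the last row of each group
  ungroup()
#> The surviving row has val = 2.0, i.e. the new input replaces the existing rollup row.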
2 changes: 2 additions & 0 deletions facebook/delphiFacebook/R/contingency_aggregate.R
@@ -75,6 +75,8 @@ produce_aggregates <- function(df, aggregations, cw_list, params) {

## To display other response columns ("val", "sample_size", "se",
## "effective_sample_size", "represented"), add here.
+# If these names change (e.g. `sample_size` to `n`), update
+# `contingency-combine.R`.
keep_vars <- c("val", "se", "sample_size", "represented")

for (agg_id in names(dfs_out)) {
3 changes: 2 additions & 1 deletion facebook/delphiFacebook/R/contingency_write.R
@@ -1,7 +1,8 @@
#' Write csv file for sharing with researchers.
#'
#' CSV name includes date specifying start of time period aggregated, geo level,
-#' and grouping variables.
+#' and grouping variables. These columns are always inserted first in a table,
+#' with reported values (value, standard error, sample size, etc) following.
#'
#' @param data a data frame to save; must contain the columns in
#' `groupby_vars`.
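A short hypothetical sketch of how downstream code (such as `contingency-combine.R` in this PR) can rely on that ordering: every column before the first value-reporting column is treated as a grouping column. The column names used here are made up:

col_names <- c("survey_geo", "period_start", "geo_type", "country", "gender",
               "val_pct_yes", "se_pct_yes", "sample_size", "represented")
report_prefixes <- c("^val", "^se", "^sample_size", "^represented", "^effective_sample_size")
is_report <- grepl(paste(report_prefixes, collapse = "|"), col_names)
col_names[seq_len(min(which(is_report)) - 1)]  # grouping columns
#> [1] "survey_geo"   "period_start" "geo_type"     "country"      "gender"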
3 changes: 2 additions & 1 deletion facebook/delphiFacebook/man/write_contingency_tables.Rd

2 changes: 1 addition & 1 deletion sir_complainsalot/delphi_sir_complainsalot/check_source.py
@@ -66,7 +66,7 @@ def check_source(data_source, meta, params, grace, logger):
continue

logger.info("Retrieving signal",
-source=data_source,
+data_source=data_source,
signal=row["signal"],
start_day=(datetime.now() - timedelta(days = 14)).strftime("%Y-%m-%d"),
end_day=datetime.now().strftime("%Y-%m-%d"),
2 changes: 1 addition & 1 deletion sir_complainsalot/params.json.template
@@ -17,7 +17,7 @@
"maintainers": ["U01AP8GSWG3","U01069KCRS7"]
},
"google-symptoms": {
"max_age": 11,
"max_age": 6,
"maintainers": ["U01AP8GSWG3","U01069KCRS7"]
},
"usa-facts": {