Skip to content

Commit 32993e5

Browse files
authored
Merge pull request #1572 from cmu-delphi/ndefries/add-weekly-weights-microdata
[CTIS] Add weekly weights to microdata
2 parents 194f23a + d0daade commit 32993e5

23 files changed

+29954
-45
lines changed

ansible/templates/facebook-params-prod.json.j2

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
"start_date": "2021-08-16",
5858
"static_dir": "./static",
5959
"weights_in_dir": "./fb-incoming",
60+
"weekly_weights_in_dir": "./fb-incoming-weekly",
6061
"weights_out_dir": "./fb-outgoing",
6162
"experimental_weights_out_dir": "./exp-fb-outgoing"
6263
}

facebook/Makefile

Lines changed: 78 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@ TODAY:=$(shell date +"%Y-%m-%d")
55
YESTERDAY:=$(shell date --date "$(TODAY) -1 day" +"%Y-%m-%d")
66
ONEWEEK:=$(shell date --date "$(TODAY) -7 day" +"%Y-%m-%d")
77
THREEWEEK:=$(shell date --date "$(TODAY) -21 day" +"%Y-%m-%d")
8+
LAST_SATURDAY:=$(shell date -d "last Saturday" +"%Y-%m-%d")
9+
LAST_SUNDAY:=$(shell date -d "$(LAST_SATURDAY) -6 day" +"%Y-%m-%d")
10+
TUESDAY:=$(shell date -d "$(LAST_SATURDAY) +3 day" +"%Y-%m-%d")
811

912
MESSAGES:="messages/$(TODAY).messages"
1013

1114
PYTHON:=env/bin/python
1215
QUALTRICS=$(shell $(PYTHON) -m delphi_utils get input_dir)
1316
WEIGHTS=$(shell $(PYTHON) -m delphi_utils get weights_in_dir)
17+
WEEKLY_WEIGHTS=$(shell $(PYTHON) -m delphi_utils get weekly_weights_in_dir)
1418
CIDS=$(shell $(PYTHON) -m delphi_utils get weights_out_dir)
1519
CIDS_EXP=$(shell $(PYTHON) -m delphi_utils get experimental_weights_out_dir)
1620
INDIVIDUAL=$(shell $(PYTHON) -m delphi_utils get individual_dir)
@@ -24,6 +28,7 @@ DELPHI_SURVEY_EMAIL_USER=$(shell $(PYTHON) -m delphi_utils get delphi_survey_ema
2428
SFTP_OPTIONS=$(shell $(PYTHON) -m delphi_utils get sftp_options)
2529

2630
MAX_WEIGHTED=ls -1 $(WEIGHTS) | grep dap | tail -1 | sed 's/_.*//;s/-//g;'
31+
MAX_WEEKLY_WEIGHTED=ls -1 $(WEEKLY_WEIGHTS) | grep map | tail -1 | sed 's/_.*//;s/-//g;'
2732

2833
ANTIJOIN:="antijoin.cids.sorted.txt"
2934
ANTIJOIN_EXP:="antijoin.experimental.cids.sorted.txt"
@@ -42,24 +47,35 @@ else
4247
SFTP_POST:=sshpass -p $(DELPHI_SURVEY_SFTP_PASSWORD) sftp $(SFTP_OPTIONS) -b <(echo -e "$${BATCH}") -P 2222 $(DELPHI_SURVEY_SFTP_USER)
4348
endif
4449

50+
ifneq ("$(wildcard params.json)","")
51+
ifeq ($(WEIGHTS),$(WEEKLY_WEIGHTS))
52+
$(error "'weights_in_dir' and 'weekly_weights_in_dir' must be different.")
53+
endif
54+
endif
55+
4556
default:
4657
@echo No default implemented yet
4758

4859
scratch:
4960
mkdir scratch
5061
rm -rf scratch/*
5162

52-
tidy: receiving
53-
rm -rf tidy/$(RECEIVING)
54-
rm -rf tidy/$(INDIVIDUAL)
55-
rm -f tidy/params.json
56-
mkdir -p tidy tidy/$(RECEIVING) tidy/$(INDIVIDUAL)
57-
cp params.json tidy/
58-
mv $(RECEIVING)/*.csv tidy/$(RECEIVING)
59-
mv $(INDIVIDUAL)/*.csv* tidy/$(INDIVIDUAL)
60-
mv $(INDIVIDUAL_RACEETH)/*.csv* tidy/$(INDIVIDUAL_RACEETH)
61-
tar -czf scratch/tidy-`date +"%Y-%m-%d-%H%M%S"`.tgz --exclude='tidy-*.tgz' tidy
62-
mv scratch/*.tgz tidy/
63+
$(INDIVIDUAL) $(INDIVIDUAL_RACEETH):
64+
mkdir $@
65+
66+
tidy_%: receiving
67+
rm -rf $@/$(RECEIVING)
68+
rm -rf $@/$(INDIVIDUAL)
69+
rm -rf $@/$(INDIVIDUAL_RACEETH)
70+
rm -f $@/params.json
71+
mkdir -p $@ $@/$(RECEIVING) $@/$(INDIVIDUAL) $@/$(INDIVIDUAL_RACEETH)
72+
cp params.json $@/
73+
# Check for _any_ matching files using https://stackoverflow.com/a/6364244/14401472
74+
if compgen -G "$(RECEIVING)/*.csv" > /dev/null; then mv $(RECEIVING)/*.csv $@/$(RECEIVING); fi
75+
mv $(INDIVIDUAL)/*.csv* $@/$(INDIVIDUAL)
76+
mv $(INDIVIDUAL_RACEETH)/*.csv* $@/$(INDIVIDUAL_RACEETH)
77+
tar -czf scratch/$@-`date +"%Y-%m-%d-%H%M%S"`.tgz --exclude='tidy*-*.tgz' --exclude='*.done' $@
78+
mv scratch/*.tgz $@/
6379

6480
clean:
6581
rm -f $(RECEIVING)/*.csv $(INDIVIDUAL)/*.csv $(INDIVIDUAL_RACEETH)/*.csv $(CIDS)/*.csv $(CIDS_EXP)/*.csv
@@ -114,6 +130,17 @@ params.json: $(TODAY)
114130
output cids,individual,covidalert,archive,community \
115131
start_date $(YESTERDAY)
116132

133+
params.weekly-weights.json: $(TODAY)
134+
PAT=`grep fb-survey params.json | awk 'BEGIN{FS="\""}{print $$2}' | sed 's/ /_/g;s/^/-e /'`; \
135+
$(PYTHON) -m delphi_utils set \
136+
debug false \
137+
produce_individual_raceeth true \
138+
end_date $(LAST_SATURDAY) \
139+
input <(find $(QUALTRICS) -maxdepth 1 -newer $< -type f -name "*.csv" | sort | grep $${PAT} | tr '\n' ',' | sed 's_$(QUALTRICS)/__g;s/,$$//' ) \
140+
parallel true \
141+
output individual \
142+
start_date $(LAST_SUNDAY)
143+
117144
$(WEIGHTS): $(TODAY)
118145
[ -f $(WEIGHTS) ] || mkdir -p $(WEIGHTS)
119146
cd "$(WEIGHTS)"; \
@@ -133,6 +160,40 @@ $(WEIGHTS): $(TODAY)
133160
echo "WARNING: $${MSG}" >> $(MESSAGES); \
134161
fi
135162

163+
$(WEEKLY_WEIGHTS): $(TODAY)
164+
# This runs every day as a dependency of `pipeline`. A pipeline run is triggered when new weekly weights files are available.
165+
[ -f $(WEEKLY_WEIGHTS) ] || mkdir -p $(WEEKLY_WEIGHTS)
166+
cd "$(WEEKLY_WEIGHTS)"; \
167+
BATCH="cd fb-interchange/cmu_respondent_ww_weights\nls -1"; \
168+
NEW=`LC_ALL=C comm -23 <(sshpass -p $(DELPHI_SURVEY_SFTP_PASSWORD) sftp $(SFTP_OPTIONS) -b <(echo -e "$${BATCH}") -P 2222 $(DELPHI_SURVEY_SFTP_USER) | grep "^202" | LC_ALL=C sort) <(ls -1 | LC_ALL=C sort)`; \
169+
echo "New weekly weights files:"; \
170+
echo $${NEW}; \
171+
for f in $${NEW}; do \
172+
BATCH="$${BATCH}\nget $$f"; \
173+
done; \
174+
sshpass -p $(DELPHI_SURVEY_SFTP_PASSWORD) sftp $(SFTP_OPTIONS) -b <(echo -e "$${BATCH}") -P 2222 $(DELPHI_SURVEY_SFTP_USER) || exit 90; \
175+
cd -; \
176+
touch -d $(YESTERDAY) $(WEEKLY_WEIGHTS); \
177+
EXPECTED_WEEKLY_WEIGHTED=`date --date='$(LAST_SUNDAY)' +'%Y%m%d'`; \
178+
MIN_NEW_WEEKLY_WEIGHTED=`grep map <<< $${NEW} | head -1 | sed 's/_.*//;s/-//g;'`; \
179+
if [[ `wc -w <<< $${NEW}` -gt 0 ]] && [[ $$MIN_NEW_WEEKLY_WEIGHTED -ne $$EXPECTED_WEEKLY_WEIGHTED ]]; then \
180+
MSG="Expected new weekly weights files to start on: $$EXPECTED_WEEKLY_WEIGHTED; Actual new files starts on: $$MIN_NEW_WEEKLY_WEIGHTED"; \
181+
echo "WARNING: $${MSG}" >> $(MESSAGES); \
182+
fi; \
183+
MAX_WEEKLY_WEIGHTED=`$(MAX_WEEKLY_WEIGHTED)`; \
184+
if [[ `date --date='$(TODAY)' +'%Y%m%d'` -gt `date --date='$(TUESDAY)' +'%Y%m%d'` ]] && [[ $$MAX_WEEKLY_WEIGHTED -lt $$EXPECTED_WEEKLY_WEIGHTED ]]; then \
185+
MSG="Weekly weights are old; Expected most recent weekly weights file to start on: $$EXPECTED_WEEKLY_WEIGHTED; Actual most recent file starts on: $$MAX_WEEKLY_WEIGHTED"; \
186+
echo "WARNING: $${MSG}" >> $(MESSAGES); \
187+
fi; \
188+
if [[ ! -f tidy_weekly/$(LAST_SUNDAY)-weekly-weights.done ]] && [[ $$MAX_WEEKLY_WEIGHTED -eq $$EXPECTED_WEEKLY_WEIGHTED ]]; then \
189+
if [ -f params.json ]; then cp params.json params.daily.json; fi; \
190+
$(MAKE) weekly-weights-pipeline; \
191+
if [ -f params.daily.json ]; then \
192+
cp params.daily.json params.json; \
193+
rm -f params.daily.json; \
194+
fi; \
195+
fi
196+
136197
dev: delphiFacebook_1.0.tar.gz
137198
R CMD INSTALL delphiFacebook_1.0.tar.gz
138199

@@ -146,13 +207,18 @@ run-R: $(CIDS) $(CIDS_EXP)
146207
grep "scheduled core" tmp ; \
147208
[ "$$?" -eq 1 ]
148209

149-
pipeline: scratch init-qualtrics params.json $(WEIGHTS) run-R post-cids post-experimental-cids post-individual post-individual-raceeth post-done tidy
210+
pipeline: scratch init-qualtrics params.json $(WEIGHTS) run-R post-cids post-experimental-cids post-individual post-individual-raceeth post-done tidy_daily $(WEEKLY_WEIGHTS)
150211
grep $(TODAY) params.json
151212
[ -f $(YESTERDAY) ] && rm $(YESTERDAY) || true
152213
touch $@
153214
echo "SUCCESS: $(DRY_MESSAGE)pipeline complete" >> $(MESSAGES)
154215
chmod o+w $(MESSAGES)
155216

217+
weekly-weights-pipeline: scratch init-qualtrics params.weekly-weights.json run-R post-individual post-individual-raceeth tidy_weekly
218+
touch $@
219+
echo "SUCCESS: $(DRY_MESSAGE)completed weekly weights pipeline" >> $(MESSAGES)
220+
touch tidy_weekly/$(LAST_SUNDAY)-weekly-weights.done
221+
156222
coverage:
157223
Rscript -e 'covr::package_coverage("delphiFacebook")'
158224

facebook/delphiFacebook/NAMESPACE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Generated by roxygen2: do not edit by hand
22

3+
export(add_weights)
34
export(apply_privacy_censoring)
45
export(assert)
56
export(ceiling_epiweek)
@@ -21,7 +22,6 @@ export(get_range_prev_full_period)
2122
export(get_range_prev_full_week)
2223
export(get_sparse_filenames)
2324
export(jeffreys_se)
24-
export(join_weights)
2525
export(load_archive)
2626
export(load_response_one)
2727
export(load_responses_all)

facebook/delphiFacebook/R/contingency_run.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ run_contingency_tables_one_period <- function(params, aggregations)
144144
return()
145145
}
146146

147-
data_agg <- join_weights(data_agg, params, weights = "full")$df
147+
data_agg <- add_weights(data_agg, params, weights = "full")$df
148148
msg_df("response data to aggregate", data_agg)
149149

150150
produce_aggregates(data_agg, aggregations, cw_list, params)

facebook/delphiFacebook/R/responses.R

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -779,10 +779,10 @@ filter_complete_responses <- function(data_full, params)
779779
data_full <- select(data_full, -.data$zip5)
780780

781781
# 9 includes StartDatetime, EndDatetime, Date, token, wave, geo_id,
782-
# UserLanguage + two questions (ignore raceethnicity, module, and
783-
# w12_assignment fields which may or may not exist, depending on params and
782+
# UserLanguage + two questions (ignore raceethnicity, module,
783+
# w12_assignment, and weekly weights fields which may or may not exist, depending on params and
784784
# survey version)
785-
ignore_cols <- c("raceethnicity", "w12_assignment", "module")
785+
ignore_cols <- c("raceethnicity", "w12_assignment", "module", "weight_wf", "weight_wp")
786786
valid_row_filter <- rowSums( !is.na(data_full[, !(names(data_full) %in% ignore_cols)]) ) >= 9
787787
data_full <- data_full[valid_row_filter, ]
788788

facebook/delphiFacebook/R/run.R

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ run_facebook <- function(params)
2727
# create data that will be aggregated for covidcast
2828
data_agg <- create_data_for_aggregation(input_data)
2929
data_agg <- filter_data_for_aggregation(data_agg, params, lead_days = 12)
30-
weight_result <- join_weights(data_agg, params, weights = "step1")
30+
weight_result <- add_weights(data_agg, params, weights = "step1")
3131
data_agg <- weight_result$df
3232
latest_weight_date_step1 <- weight_result$weight_date
3333
msg_df("response data to aggregate", data_agg)
@@ -36,10 +36,10 @@ run_facebook <- function(params)
3636
is.na(latest_weight_date_step1), as.Date(params$end_date), latest_weight_date_step1
3737
)
3838

39-
# create "complete" data that will be shared with research partners
39+
# create "complete" data (microdata) that will be shared with research partners
4040
data_full <- create_complete_responses(input_data, cw_list$county, params)
4141
data_full <- filter_complete_responses(data_full, params)
42-
data_full <- join_weights(data_full, params, weights = "full")$df
42+
data_full <- add_weights(data_full, params, weights = "full", add_weekly_weights = TRUE)$df
4343
msg_df("full data to share with research partners", data_full)
4444

4545
# create module-complete data used to create CID lists separately for each module

facebook/delphiFacebook/R/weights.R

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -83,17 +83,15 @@ generate_cid_list_filename <- function(type_name, params, module_type) {
8383
#' exclusively for producing individual response files.
8484
#'
8585
#' @param data a data frame containing a column called "token"
86-
#' @param params a named list containing a value "weights_in_dir" indicating where the
87-
#' weights files are stored
86+
#' @param params a named list containing value "weights_in_dir", indicating where the
87+
#' weights files are stored, and "weekly_weights_in_dir", indicating
88+
#' where the weekly weights files are stored, if `add_weekly_weights` is TRUE
8889
#' @param weights Which weights to use -- step1 or full?
90+
#' @param add_weekly_weights boolean indicating whether to add weekly partial and full
91+
#' weights in addition to daily weights
8992
#'
90-
#' @importFrom dplyr bind_rows left_join
91-
#' @importFrom data.table fread
92-
#' @importFrom stringi stri_extract_first
93-
#' @importFrom utils tail
94-
#'
9593
#' @export
96-
join_weights <- function(data, params, weights = c("step1", "full"))
94+
add_weights <- function(data, params, weights = c("step1", "full"), add_weekly_weights = FALSE)
9795
{
9896
weights <- match.arg(weights)
9997

@@ -103,14 +101,47 @@ join_weights <- function(data, params, weights = c("step1", "full"))
103101
pattern <- "finish_full_survey_weights.csv$"
104102
}
105103

106-
weights_files <- dir(params$weights_in_dir, pattern = pattern, full.names = TRUE)
104+
weight_result <- load_and_join_weights(data, params$weights_in_dir, pattern)
105+
data <- weight_result$df
106+
latest_weight_date <- weight_result$weight_date
107+
108+
if (add_weekly_weights) {
109+
# Since each weight column is joined on as `weight`, we need to rename
110+
# each new weight column before performing the next join to avoid
111+
# overwriting any weights
112+
data <- rename(data, daily_weight = weight)
113+
data <- load_and_join_weights(
114+
data, params$weekly_weights_in_dir, pattern = "partial_weekly_weights.csv$"
115+
)$df %>%
116+
rename(weight_wp = weight)
117+
data <- load_and_join_weights(
118+
data, params$weekly_weights_in_dir, pattern = "full_weekly_weights.csv$"
119+
)$df %>%
120+
rename(weight_wf = weight, weight = daily_weight)
121+
}
122+
123+
return( list(df = data, weight_date = latest_weight_date) )
124+
}
125+
126+
#' Add a single type of weights to a dataset of responses as field `weight`
127+
#'
128+
#' @param data a data frame containing a column called "token"
129+
#' @param weights_dir directory to look for the weights in
130+
#' @param pattern regular expression matching desired weights files
131+
#'
132+
#' @importFrom dplyr bind_rows left_join
133+
#' @importFrom data.table fread
134+
#' @importFrom stringi stri_extract_first
135+
#' @importFrom utils tail
136+
load_and_join_weights <- function(data, weights_dir, pattern) {
137+
weights_files <- dir(weights_dir, pattern = pattern, full.names = TRUE)
107138
weights_files <- sort(weights_files)
108139

109140
latest_weight <- tail(weights_files, n = 1)
110141
latest_weight_date <- as.Date(
111142
stri_extract_first(basename(latest_weight), regex = "^[0-9]{4}-[0-9]{2}-[0-9]{2}")
112143
)
113-
144+
114145
col_types <- c("character", "double")
115146
col_names <- c("cid", "weight")
116147
agg_weights <- bind_rows(lapply(
@@ -122,6 +153,6 @@ join_weights <- function(data, params, weights = c("step1", "full"))
122153
)
123154
agg_weights <- agg_weights[!duplicated(cid),]
124155
data <- left_join(data, agg_weights, by = c("token" = "cid"))
125-
156+
126157
return( list(df = data, weight_date = latest_weight_date) )
127158
}

facebook/delphiFacebook/integration-tests/testthat/params-full.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
"archive_dir": "./archive",
1010
"individual_dir": "./individual_full",
1111
"weights_in_dir": "./weights_full",
12+
"weekly_weights_in_dir": "./weights_full_weekly",
1213
"weights_out_dir": "./weights_out",
1314
"experimental_weights_out_dir": "./exp_weights_out",
1415
"input_dir": "./input",

facebook/delphiFacebook/integration-tests/testthat/params-test.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
"archive_dir": "./archive",
1010
"individual_dir": "./individual",
1111
"weights_in_dir": "./weights_in",
12+
"weekly_weights_in_dir": "./weights_in_weekly",
1213
"weights_out_dir": "./weights_out",
1314
"experimental_weights_out_dir": "./exp_weights_out",
1415
"input_dir": "./input",

facebook/delphiFacebook/integration-tests/testthat/test-contingency-run.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ set.seed(0)
161161
rand_weights <- runif(2000)
162162
rand_weights <- rand_weights / sum(rand_weights)
163163

164-
mock_join_weights <- function(data, params, weights = c("step1", "full")) {
164+
mock_add_weights <- function(data, params, weights = c("step1", "full")) {
165165
data <- cbind(as.data.table(data), weight=rand_weights)
166166
return( list(df = data, weight_date = NA) )
167167
}
@@ -190,7 +190,7 @@ test_that("simple weighted dataset produces correct percents", {
190190
params <- get_params(tdir)
191191
create_dir_not_exist(params$export_dir)
192192

193-
local_mock("delphiFacebook::join_weights" = mock_join_weights)
193+
local_mock("delphiFacebook::add_weights" = mock_add_weights)
194194
local_mock("delphiFacebook::mix_weights" = mock_mix_weights)
195195
local_mock("delphiFacebook::load_archive" = mock_load_archive)
196196
local_mock("delphiFacebook::add_geo_vars" = mock_geo_vars)

facebook/delphiFacebook/integration-tests/testthat/test-gold-individual.R

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ test_that("individual output files exist", {
2020
test_that("contents are the same", {
2121
for (f in expected_files) {
2222
test_df <- read.csv(test_path("individual_full", f))
23-
gold_df <- read.csv(test_path("gold_individual", f))
23+
gold_df <- read.csv(test_path("gold_individual", f)) %>%
24+
mutate(weight_wp = weight, weight_wf = weight)
2425

2526
expect_equal(nrow(gold_df), nrow(test_df))
2627

facebook/delphiFacebook/integration-tests/testthat/test-integration.R

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ test_that("testing weighted community values files", {
152152
params <- relativize_params(read_params(test_path("params-test.json")))
153153

154154
input_data <- load_responses_all(params)
155-
input_data <- join_weights(input_data, params)$df
155+
input_data <- add_weights(input_data, params)$df
156156

157157
# there are 2 / 4 households in PA on 2020-05-11 for community
158158
these <- input_data[input_data$date == "2020-05-11" & input_data$zip5 == "15106",]
@@ -194,7 +194,7 @@ test_that("testing weighted smoothed community values files", {
194194
params <- relativize_params(read_params(test_path("params-test.json")))
195195

196196
input_data <- load_responses_all(params)
197-
input_data <- join_weights(input_data, params)$df
197+
input_data <- add_weights(input_data, params)$df
198198

199199
# there are 2 / 4 households in PA on 2020-05-11 for community
200200
these <- input_data[
@@ -328,7 +328,7 @@ test_that("testing weighted ili/cli values files", {
328328
params <- relativize_params(read_params(test_path("params-test.json")))
329329

330330
input_data <- load_responses_all(params)
331-
input_data <- join_weights(input_data, params)$df
331+
input_data <- add_weights(input_data, params)$df
332332
data_agg <- create_data_for_aggregation(input_data)
333333

334334
## There are 4 households in PA on 2020-05-11, one with ILI.

0 commit comments

Comments (0)