Skip to content

Commit 09cd133

Browse files
authored
Merge pull request #1628 from cmu-delphi/release/indicators_v0.3.14_utils_v0.3.4
Release covidcast-indicators 0.3.14
2 parents 9060a48 + 769ee8e commit 09cd133

32 files changed

+30371
-128
lines changed

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.3.13
2+
current_version = 0.3.14
33
commit = True
44
message = chore: bump covidcast-indicators to {new_version}
55
tag = False

ansible/templates/facebook-params-prod.json.j2

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
"start_date": "2021-08-16",
5858
"static_dir": "./static",
5959
"weights_in_dir": "./fb-incoming",
60+
"weekly_weights_in_dir": "./fb-incoming-weekly",
6061
"weights_out_dir": "./fb-outgoing",
6162
"experimental_weights_out_dir": "./exp-fb-outgoing"
6263
}

dsew_community_profile/delphi_dsew_community_profile/pull.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ def as_cached_filename(params, config):
408408
def fetch_listing(params):
409409
"""Generate the list of report files to process."""
410410
export_start_date = params['indicator'].get(
411-
'export_start_date', datetime.datetime.fromtimestamp(0).date()
411+
'export_start_date', datetime.datetime.utcfromtimestamp(0).date()
412412
)
413413

414414
listing = requests.get(DOWNLOAD_LISTING).json()['metadata']['attachments']

dsew_community_profile/tests/test_pull.py

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
3939
df[k] = df[k].astype(v)
4040
return df
4141

42+
4243
class TestPull:
4344
def test_DatasetTimes(self):
4445
examples = [
@@ -158,31 +159,35 @@ def test_Dataset_parse_sheet(self):
158159
# TODO
159160
pass
160161

161-
@patch('requests.get')
162-
@patch('os.path.exists')
163-
def test_fetch_listing(self, mock_listing, mock_exists):
162+
def test_fetch_listing(self):
164163
inst = namedtuple("attachment", "assetId filename publish cache")
165164
instances = list(chain(*[
166165
[
167-
inst(f"{i}", f"2021010{i}.xlsx", date(2021, 1, i), f"{i}---2021010{i}.xlsx"),
168-
inst(f"p{i}", f"2021010{i}.pdf", date(2021, 1, i), f"p{i}---2021010{i}.pdf"),
166+
inst(f"{i}", f"2021010{i}.xlsx", date(2021, 1, i), f"2021010{i}--{i}.xlsx"),
167+
inst(f"p{i}", f"2021010{i}.pdf", date(2021, 1, i), f"2021010{i}--p{i}.pdf"),
169168
]
170169
for i in [1, 2, 3, 4, 5]
171170
]))
172171

173-
mock_listing.return_value = Mock()
174-
mock_listing.return_value.json = Mock(
175-
return_value = {
176-
'metadata': {
177-
'attachments': [
178-
{"assetId": i.assetId, "filename": i.filename}
179-
for i in instances
180-
]
181-
}
182-
}
183-
)
184-
185-
mock_exists.reset_mock(return_value=False)
172+
# Solution from https://stackoverflow.com/questions/15753390/
173+
#how-can-i-mock-requests-and-the-response
174+
def mocked_requests_get(*args, **kwargs):
175+
class MockResponse:
176+
def __init__(self, json_data):
177+
self.json_data = json_data
178+
179+
def json(self):
180+
return self.json_data
181+
182+
return MockResponse({
183+
'metadata': {
184+
'attachments': [
185+
{"assetId": i.assetId, "filename": i.filename}
186+
for i in instances
187+
]
188+
}
189+
}
190+
)
186191

187192
def as_listing(instance):
188193
return {
@@ -192,15 +197,20 @@ def as_listing(instance):
192197
"publish_date": instance.publish
193198
}
194199
ex = example(
195-
{'indicator':{'reports':'new'}},
200+
{'indicator':{'reports':'new', 'input_cache':''}},
196201
[
197202
as_listing(instance)
198203
for i, instance in filter(lambda x: x[0]%2 == 0, enumerate(instances))
199204
]
200205
)
201206

202-
for actual, expected in zip(fetch_listing(ex.given), ex.expected):
203-
assert actual == expected
207+
with patch('requests.get', side_effect=mocked_requests_get):
208+
with patch('os.path.exists', return_value=False):
209+
for actual, expected in zip(fetch_listing(ex.given), ex.expected):
210+
assert actual == expected
211+
212+
with patch('os.path.exists', return_value=True):
213+
assert fetch_listing(ex.given) == []
204214

205215
def test_nation_from_state(self):
206216
geomapper = GeoMapper()

facebook/Makefile

Lines changed: 78 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@ TODAY:=$(shell date +"%Y-%m-%d")
55
YESTERDAY:=$(shell date --date "$(TODAY) -1 day" +"%Y-%m-%d")
66
ONEWEEK:=$(shell date --date "$(TODAY) -7 day" +"%Y-%m-%d")
77
THREEWEEK:=$(shell date --date "$(TODAY) -21 day" +"%Y-%m-%d")
8+
LAST_SATURDAY:=$(shell date -d "last Saturday" +"%Y-%m-%d")
9+
LAST_SUNDAY:=$(shell date -d "$(LAST_SATURDAY) -6 day" +"%Y-%m-%d")
10+
TUESDAY:=$(shell date -d "$(LAST_SATURDAY) +3 day" +"%Y-%m-%d")
811

912
MESSAGES:="messages/$(TODAY).messages"
1013

1114
PYTHON:=env/bin/python
1215
QUALTRICS=$(shell $(PYTHON) -m delphi_utils get input_dir)
1316
WEIGHTS=$(shell $(PYTHON) -m delphi_utils get weights_in_dir)
17+
WEEKLY_WEIGHTS=$(shell $(PYTHON) -m delphi_utils get weekly_weights_in_dir)
1418
CIDS=$(shell $(PYTHON) -m delphi_utils get weights_out_dir)
1519
CIDS_EXP=$(shell $(PYTHON) -m delphi_utils get experimental_weights_out_dir)
1620
INDIVIDUAL=$(shell $(PYTHON) -m delphi_utils get individual_dir)
@@ -24,6 +28,7 @@ DELPHI_SURVEY_EMAIL_USER=$(shell $(PYTHON) -m delphi_utils get delphi_survey_ema
2428
SFTP_OPTIONS=$(shell $(PYTHON) -m delphi_utils get sftp_options)
2529

2630
MAX_WEIGHTED=ls -1 $(WEIGHTS) | grep dap | tail -1 | sed 's/_.*//;s/-//g;'
31+
MAX_WEEKLY_WEIGHTED=ls -1 $(WEEKLY_WEIGHTS) | grep map | tail -1 | sed 's/_.*//;s/-//g;'
2732

2833
ANTIJOIN:="antijoin.cids.sorted.txt"
2934
ANTIJOIN_EXP:="antijoin.experimental.cids.sorted.txt"
@@ -42,24 +47,35 @@ else
4247
SFTP_POST:=sshpass -p $(DELPHI_SURVEY_SFTP_PASSWORD) sftp $(SFTP_OPTIONS) -b <(echo -e "$${BATCH}") -P 2222 $(DELPHI_SURVEY_SFTP_USER)
4348
endif
4449

50+
ifneq ("$(wildcard params.json)","")
51+
ifeq ($(WEIGHTS),$(WEEKLY_WEIGHTS))
52+
$(error "'weights_in_dir' and 'weekly_weights_in_dir' must be different.")
53+
endif
54+
endif
55+
4556
default:
4657
@echo No default implemented yet
4758

4859
scratch:
4960
mkdir scratch
5061
rm -rf scratch/*
5162

52-
tidy: receiving
53-
rm -rf tidy/$(RECEIVING)
54-
rm -rf tidy/$(INDIVIDUAL)
55-
rm -f tidy/params.json
56-
mkdir -p tidy tidy/$(RECEIVING) tidy/$(INDIVIDUAL)
57-
cp params.json tidy/
58-
mv $(RECEIVING)/*.csv tidy/$(RECEIVING)
59-
mv $(INDIVIDUAL)/*.csv* tidy/$(INDIVIDUAL)
60-
mv $(INDIVIDUAL_RACEETH)/*.csv* tidy/$(INDIVIDUAL_RACEETH)
61-
tar -czf scratch/tidy-`date +"%Y-%m-%d-%H%M%S"`.tgz --exclude='tidy-*.tgz' tidy
62-
mv scratch/*.tgz tidy/
63+
$(INDIVIDUAL) $(INDIVIDUAL_RACEETH):
64+
mkdir $@
65+
66+
tidy_%: receiving
67+
rm -rf $@/$(RECEIVING)
68+
rm -rf $@/$(INDIVIDUAL)
69+
rm -rf $@/$(INDIVIDUAL_RACEETH)
70+
rm -f $@/params.json
71+
mkdir -p $@ $@/$(RECEIVING) $@/$(INDIVIDUAL) $@/$(INDIVIDUAL_RACEETH)
72+
cp params.json $@/
73+
# Check for _any_ matching files using https://stackoverflow.com/a/6364244/14401472
74+
if compgen -G "$(RECEIVING)/*.csv" > /dev/null; then mv $(RECEIVING)/*.csv $@/$(RECEIVING); fi
75+
mv $(INDIVIDUAL)/*.csv* $@/$(INDIVIDUAL)
76+
mv $(INDIVIDUAL_RACEETH)/*.csv* $@/$(INDIVIDUAL_RACEETH)
77+
tar -czf scratch/$@-`date +"%Y-%m-%d-%H%M%S"`.tgz --exclude='tidy*-*.tgz' --exclude='*.done' $@
78+
mv scratch/*.tgz $@/
6379

6480
clean:
6581
rm -f $(RECEIVING)/*.csv $(INDIVIDUAL)/*.csv $(INDIVIDUAL_RACEETH)/*.csv $(CIDS)/*.csv $(CIDS_EXP)/*.csv
@@ -114,6 +130,17 @@ params.json: $(TODAY)
114130
output cids,individual,covidalert,archive,community \
115131
start_date $(YESTERDAY)
116132

133+
params.weekly-weights.json: $(TODAY)
134+
PAT=`grep fb-survey params.json | awk 'BEGIN{FS="\""}{print $$2}' | sed 's/ /_/g;s/^/-e /'`; \
135+
$(PYTHON) -m delphi_utils set \
136+
debug false \
137+
produce_individual_raceeth true \
138+
end_date $(LAST_SATURDAY) \
139+
input <(find $(QUALTRICS) -maxdepth 1 -newer $< -type f -name "*.csv" | sort | grep $${PAT} | tr '\n' ',' | sed 's_$(QUALTRICS)/__g;s/,$$//' ) \
140+
parallel true \
141+
output individual \
142+
start_date $(LAST_SUNDAY)
143+
117144
$(WEIGHTS): $(TODAY)
118145
[ -f $(WEIGHTS) ] || mkdir -p $(WEIGHTS)
119146
cd "$(WEIGHTS)"; \
@@ -133,6 +160,40 @@ $(WEIGHTS): $(TODAY)
133160
echo "WARNING: $${MSG}" >> $(MESSAGES); \
134161
fi
135162

163+
$(WEEKLY_WEIGHTS): $(TODAY)
164+
# This runs every day as a dependency of `pipeline`. A pipeline run is triggered when new weekly weights files are available.
165+
[ -f $(WEEKLY_WEIGHTS) ] || mkdir -p $(WEEKLY_WEIGHTS)
166+
cd "$(WEEKLY_WEIGHTS)"; \
167+
BATCH="cd fb-interchange/cmu_respondent_ww_weights\nls -1"; \
168+
NEW=`LC_ALL=C comm -23 <(sshpass -p $(DELPHI_SURVEY_SFTP_PASSWORD) sftp $(SFTP_OPTIONS) -b <(echo -e "$${BATCH}") -P 2222 $(DELPHI_SURVEY_SFTP_USER) | grep "^202" | LC_ALL=C sort) <(ls -1 | LC_ALL=C sort)`; \
169+
echo "New weekly weights files:"; \
170+
echo $${NEW}; \
171+
for f in $${NEW}; do \
172+
BATCH="$${BATCH}\nget $$f"; \
173+
done; \
174+
sshpass -p $(DELPHI_SURVEY_SFTP_PASSWORD) sftp $(SFTP_OPTIONS) -b <(echo -e "$${BATCH}") -P 2222 $(DELPHI_SURVEY_SFTP_USER) || exit 90; \
175+
cd -; \
176+
touch -d $(YESTERDAY) $(WEEKLY_WEIGHTS); \
177+
EXPECTED_WEEKLY_WEIGHTED=`date --date='$(LAST_SUNDAY)' +'%Y%m%d'`; \
178+
MIN_NEW_WEEKLY_WEIGHTED=`grep map <<< $${NEW} | head -1 | sed 's/_.*//;s/-//g;'`; \
179+
if [[ `wc -w <<< $${NEW}` -gt 0 ]] && [[ $$MIN_NEW_WEEKLY_WEIGHTED -ne $$EXPECTED_WEEKLY_WEIGHTED ]]; then \
180+
MSG="Expected new weekly weights files to start on: $$EXPECTED_WEEKLY_WEIGHTED; Actual new files starts on: $$MIN_NEW_WEEKLY_WEIGHTED"; \
181+
echo "WARNING: $${MSG}" >> $(MESSAGES); \
182+
fi; \
183+
MAX_WEEKLY_WEIGHTED=`$(MAX_WEEKLY_WEIGHTED)`; \
184+
if [[ `date --date='$(TODAY)' +'%Y%m%d'` -gt `date --date='$(TUESDAY)' +'%Y%m%d'` ]] && [[ $$MAX_WEEKLY_WEIGHTED -lt $$EXPECTED_WEEKLY_WEIGHTED ]]; then \
185+
MSG="Weekly weights are old; Expected most recent weekly weights file to start on: $$EXPECTED_WEEKLY_WEIGHTED; Actual most recent file starts on: $$MAX_WEEKLY_WEIGHTED"; \
186+
echo "WARNING: $${MSG}" >> $(MESSAGES); \
187+
fi; \
188+
if [[ ! -f tidy_weekly/$(LAST_SUNDAY)-weekly-weights.done ]] && [[ $$MAX_WEEKLY_WEIGHTED -eq $$EXPECTED_WEEKLY_WEIGHTED ]]; then \
189+
if [ -f params.json ]; then cp params.json params.daily.json; fi; \
190+
$(MAKE) weekly-weights-pipeline; \
191+
if [ -f params.daily.json ]; then \
192+
cp params.daily.json params.json; \
193+
rm -f params.daily.json; \
194+
fi; \
195+
fi
196+
136197
dev: delphiFacebook_1.0.tar.gz
137198
R CMD INSTALL delphiFacebook_1.0.tar.gz
138199

@@ -146,13 +207,18 @@ run-R: $(CIDS) $(CIDS_EXP)
146207
grep "scheduled core" tmp ; \
147208
[ "$$?" -eq 1 ]
148209

149-
pipeline: scratch init-qualtrics params.json $(WEIGHTS) run-R post-cids post-experimental-cids post-individual post-individual-raceeth post-done tidy
210+
pipeline: scratch init-qualtrics params.json $(WEIGHTS) run-R post-cids post-experimental-cids post-individual post-individual-raceeth post-done tidy_daily $(WEEKLY_WEIGHTS)
150211
grep $(TODAY) params.json
151212
[ -f $(YESTERDAY) ] && rm $(YESTERDAY) || true
152213
touch $@
153214
echo "SUCCESS: $(DRY_MESSAGE)pipeline complete" >> $(MESSAGES)
154215
chmod o+w $(MESSAGES)
155216

217+
weekly-weights-pipeline: scratch init-qualtrics params.weekly-weights.json run-R post-individual post-individual-raceeth tidy_weekly
218+
touch $@
219+
echo "SUCCESS: $(DRY_MESSAGE)completed weekly weights pipeline" >> $(MESSAGES)
220+
touch tidy_weekly/$(LAST_SUNDAY)-weekly-weights.done
221+
156222
coverage:
157223
Rscript -e 'covr::package_coverage("delphiFacebook")'
158224

facebook/delphiFacebook/NAMESPACE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Generated by roxygen2: do not edit by hand
22

3+
export(add_weights)
34
export(apply_privacy_censoring)
45
export(assert)
56
export(ceiling_epiweek)
@@ -21,7 +22,6 @@ export(get_range_prev_full_period)
2122
export(get_range_prev_full_week)
2223
export(get_sparse_filenames)
2324
export(jeffreys_se)
24-
export(join_weights)
2525
export(load_archive)
2626
export(load_response_one)
2727
export(load_responses_all)

facebook/delphiFacebook/R/contingency_run.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ run_contingency_tables_one_period <- function(params, aggregations)
144144
return()
145145
}
146146

147-
data_agg <- join_weights(data_agg, params, weights = "full")$df
147+
data_agg <- add_weights(data_agg, params, weights = "full")$df
148148
msg_df("response data to aggregate", data_agg)
149149

150150
produce_aggregates(data_agg, aggregations, cw_list, params)

facebook/delphiFacebook/R/responses.R

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -779,10 +779,10 @@ filter_complete_responses <- function(data_full, params)
779779
data_full <- select(data_full, -.data$zip5)
780780

781781
# 9 includes StartDatetime, EndDatetime, Date, token, wave, geo_id,
782-
# UserLanguage + two questions (ignore raceethnicity, module, and
783-
# w12_assignment fields which may or may not exist, depending on params and
782+
# UserLanguage + two questions (ignore raceethnicity, module,
783+
# w12_assignment, and weekly weights fields which may or may not exist, depending on params and
784784
# survey version)
785-
ignore_cols <- c("raceethnicity", "w12_assignment", "module")
785+
ignore_cols <- c("raceethnicity", "w12_assignment", "module", "weight_wf", "weight_wp")
786786
valid_row_filter <- rowSums( !is.na(data_full[, !(names(data_full) %in% ignore_cols)]) ) >= 9
787787
data_full <- data_full[valid_row_filter, ]
788788

facebook/delphiFacebook/R/run.R

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ run_facebook <- function(params)
2727
# create data that will be aggregated for covidcast
2828
data_agg <- create_data_for_aggregation(input_data)
2929
data_agg <- filter_data_for_aggregation(data_agg, params, lead_days = 12)
30-
weight_result <- join_weights(data_agg, params, weights = "step1")
30+
weight_result <- add_weights(data_agg, params, weights = "step1")
3131
data_agg <- weight_result$df
3232
latest_weight_date_step1 <- weight_result$weight_date
3333
msg_df("response data to aggregate", data_agg)
@@ -36,10 +36,10 @@ run_facebook <- function(params)
3636
is.na(latest_weight_date_step1), as.Date(params$end_date), latest_weight_date_step1
3737
)
3838

39-
# create "complete" data that will be shared with research partners
39+
# create "complete" data (microdata) that will be shared with research partners
4040
data_full <- create_complete_responses(input_data, cw_list$county, params)
4141
data_full <- filter_complete_responses(data_full, params)
42-
data_full <- join_weights(data_full, params, weights = "full")$df
42+
data_full <- add_weights(data_full, params, weights = "full", add_weekly_weights = TRUE)$df
4343
msg_df("full data to share with research partners", data_full)
4444

4545
# create module-complete data used to create CID lists separately for each module

0 commit comments

Comments (0)