Skip to content

Commit ddadab2

Browse files
authored
Merge pull request #363 from sgsmob/aws_credentials
Gate pulling/syncing data from AWS in safegraph behind a flag in params
2 parents 2f8d4df + 7e3e516 commit ddadab2

File tree

16 files changed

+135
-56
lines changed

16 files changed

+135
-56
lines changed

safegraph/delphi_safegraph/process.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# Base file name for raw data CSVs.
1616
CSV_NAME = 'social-distancing.csv.gz'
1717

18+
1819
def validate(df):
1920
"""Confirms that a data frame has only one date."""
2021
timestamps = df['date_range_start'].apply(date_from_timestamp)
@@ -235,13 +236,13 @@ def process_window(df_list: List[pd.DataFrame],
235236
f'{signal}_se': 'se',
236237
f'{signal}_n': 'sample_size',
237238
}, axis=1)
238-
df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
239+
date_str = date.strftime('%Y%m%d')
240+
df_export.to_csv(f'{export_dir}/{date_str}_{geo_res}_{signal}.csv',
239241
na_rep='NA',
240242
index=False, )
241243

242244

243-
def process(current_filename: str,
244-
previous_filenames: List[str],
245+
def process(filenames: List[str],
245246
signal_names: List[str],
246247
wip_signal,
247248
geo_resolutions: List[str],
@@ -250,11 +251,11 @@ def process(current_filename: str,
250251
as averaged over the previous week.
251252
Parameters
252253
----------
253-
current_filename: str
254-
path to file holding the target date's data.
255-
previous_filenames: List[str]
256-
paths to files holding data from each day in the week preceding the
257-
target date.
254+
filenames: List[str]
255+
paths to files holding data.
256+
The first entry of the list should correspond to the target date while
257+
the remaining entries should correspond to the dates from each day in
258+
the week preceding the target date.
258259
signal_names: List[str]
259260
signal names to be processed for a single date.
260261
A second version of each such signal named {SIGNAL}_7d_avg will be
@@ -274,8 +275,8 @@ def process(current_filename: str,
274275
one for the data averaged over the previous week to
275276
{export_dir}/{date}_{resolution}_{signal}_7d_avg.csv.
276277
"""
277-
past_week = [pd.read_csv(current_filename)]
278-
for fname in previous_filenames:
278+
past_week = []
279+
for fname in filenames:
279280
if os.path.exists(fname):
280281
past_week.append(pd.read_csv(fname))
281282

@@ -286,8 +287,8 @@ def process(current_filename: str,
286287
export_dir)
287288
# ...then as part of the whole window.
288289
process_window(past_week,
289-
add_prefix(add_suffix(signal_names, '_7d_avg'),
290-
wip_signal,
291-
'wip_'),
292-
geo_resolutions,
293-
export_dir)
290+
add_prefix(add_suffix(signal_names, '_7d_avg'),
291+
wip_signal,
292+
'wip_'),
293+
geo_resolutions,
294+
export_dir)

safegraph/delphi_safegraph/run.py

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
when the module is run with `python -m MODULE_NAME`.
44
"""
55
import glob
6+
import functools
67
import multiprocessing as mp
78
import subprocess
89

@@ -15,50 +16,61 @@
1516
def run_module():
1617
"""Creates the Safegraph indicator."""
1718
params = read_params()
19+
20+
# Place to write output files.
1821
export_dir = params["export_dir"]
22+
# Location of input files.
1923
raw_data_dir = params["raw_data_dir"]
24+
25+
# Number of cores to use in multiprocessing.
2026
n_core = int(params["n_core"])
27+
28+
# AWS credentials
2129
aws_access_key_id = params["aws_access_key_id"]
2230
aws_secret_access_key = params["aws_secret_access_key"]
2331
aws_default_region = params["aws_default_region"]
2432
aws_endpoint = params["aws_endpoint"]
25-
wip_signal = params["wip_signal"]
33+
# Whether to sync `raw_data_dir` with an AWS backend.
34+
# Must be a bool in the JSON file (rather than the string "True" or "False").
35+
sync = params["sync"]
2636

27-
def process_file(current_filename):
28-
"""Wrapper around `process()` that only takes a single argument.
37+
# List of work-in-progress signal names.
38+
wip_signal = params["wip_signal"]
2939

30-
A single argument function is necessary to use `pool.map()` below.
31-
Because each call to `process()` has two arguments that are dependent
32-
on the input file name (`current_filename` and `previous_filenames`),
33-
we choose to use this wrapper rather than something like
34-
`functools.partial()`.
35-
"""
36-
return process(current_filename,
37-
files_in_past_week(current_filename),
38-
signal_names=SIGNALS,
39-
wip_signal=wip_signal,
40-
geo_resolutions=GEO_RESOLUTIONS,
41-
export_dir=export_dir,
42-
)
40+
# Convert `process()` to a single-argument function for use in `pool.map`.
41+
single_arg_process = functools.partial(
42+
process,
43+
signal_names=SIGNALS,
44+
wip_signal=wip_signal,
45+
geo_resolutions=GEO_RESOLUTIONS,
46+
export_dir=export_dir,
47+
)
4348

4449
# Update raw data
4550
# Why call subprocess rather than using a native Python client, e.g. boto3?
4651
# Because boto3 does not have a simple rsync-like call that can perform
4752
# the following behavior elegantly.
48-
subprocess.run(
49-
f'aws s3 sync s3://sg-c19-response/social-distancing/v2/ '
50-
f'{raw_data_dir}/social-distancing/ --endpoint {aws_endpoint}',
51-
env={
52-
'AWS_ACCESS_KEY_ID': aws_access_key_id,
53-
'AWS_SECRET_ACCESS_KEY': aws_secret_access_key,
54-
'AWS_DEFAULT_REGION': aws_default_region,
55-
},
56-
shell=True,
57-
check=True,
58-
)
53+
if sync:
54+
subprocess.run(
55+
f'aws s3 sync s3://sg-c19-response/social-distancing/v2/ '
56+
f'{raw_data_dir}/social-distancing/ --endpoint {aws_endpoint}',
57+
env={
58+
'AWS_ACCESS_KEY_ID': aws_access_key_id,
59+
'AWS_SECRET_ACCESS_KEY': aws_secret_access_key,
60+
'AWS_DEFAULT_REGION': aws_default_region,
61+
},
62+
shell=True,
63+
check=True,
64+
)
5965

6066
files = glob.glob(f'{raw_data_dir}/social-distancing/**/*.csv.gz',
6167
recursive=True)
6268

69+
files_with_previous_weeks = []
70+
for fname in files:
71+
previous_week = [fname]
72+
previous_week.extend(files_in_past_week(fname))
73+
files_with_previous_weeks.append(previous_week)
74+
6375
with mp.Pool(n_core) as pool:
64-
pool.map(process_file, files)
76+
pool.map(single_arg_process, files_with_previous_weeks)

safegraph/params.json.template

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,9 @@
88
"aws_secret_access_key": "",
99
"aws_default_region": "",
1010
"aws_endpoint": "",
11-
"wip_signal" : ""
11+
"sync": true,
12+
"wip_signal" : ["median_home_dwell_time_7d_avg",
13+
"completely_home_prop_7d_avg",
14+
"part_time_work_prop_7d_avg",
15+
"full_time_work_prop_7d_avg"]
1216
}

safegraph/tests/conftest.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# -*- coding: utf-8 -*-
2+
3+
import pytest
4+
5+
import os
6+
from os.path import join
7+
8+
from delphi_safegraph.run import run_module
9+
10+
11+
@pytest.fixture(scope="session")
12+
def run_as_module():
13+
# Clean receiving directory
14+
for fname in os.listdir("receiving"):
15+
if ".csv" in fname:
16+
os.remove(join("receiving", fname))
17+
run_module()

safegraph/tests/params.json.template

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"static_file_dir": "./static",
3-
"raw_data_dir": "/mnt/data/safegraph/",
3+
"raw_data_dir": "./raw_data",
44
"export_dir": "./receiving",
55
"cache_dir": "./cache",
66
"n_core": "12",
@@ -11,5 +11,6 @@
1111
"wip_signal" : ["median_home_dwell_time_7d_avg",
1212
"completely_home_prop_7d_avg",
1313
"part_time_work_prop_7d_avg",
14-
"full_time_work_prop_7d_avg"]
14+
"full_time_work_prop_7d_avg"],
15+
"sync": false
1516
}

safegraph/tests/receiving/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.csv

safegraph/tests/test_process.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def test_process_window(self, tmp_path):
128128
'sample_size': [2, 2]
129129
})
130130
actual = pd.read_csv(
131-
export_dir / '2020-02-14_county_completely_home_prop.csv')
131+
export_dir / '20200214_county_completely_home_prop.csv')
132132
pd.testing.assert_frame_equal(expected, actual)
133133

134134
def test_process(self, tmp_path):
@@ -137,11 +137,11 @@ def test_process(self, tmp_path):
137137
export_dir = tmp_path / 'export'
138138
export_dir.mkdir()
139139

140-
process('raw_data/small_raw_data_0.csv',
141-
# File 2 does not exist.
142-
['raw_data/small_raw_data_1.csv',
140+
process(['raw_data/small_raw_data_0.csv',
141+
'raw_data/small_raw_data_1.csv',
142+
# File 2 does not exist.
143143
'raw_data/small_raw_data_2.csv',
144-
'raw_data/small_raw_data_3.csv', ],
144+
'raw_data/small_raw_data_3.csv'],
145145
SIGNALS,
146146
['median_home_dwell_time',
147147
'completely_home_prop_7d_avg'],
@@ -199,7 +199,7 @@ def test_process(self, tmp_path):
199199
})
200200
}
201201
actual = {signal: pd.read_csv(
202-
export_dir / f'2020-06-12_state_{signal}.csv')
202+
export_dir / f'20200612_state_{signal}.csv')
203203
for signal in expected}
204204
for signal in expected:
205205
pd.testing.assert_frame_equal(expected[signal], actual[signal])

safegraph/tests/test_run.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
"""Tests for the `run_module()` function."""
2+
import os
3+
4+
import pandas as pd
5+
6+
from delphi_safegraph.constants import (SIGNALS,
7+
GEO_RESOLUTIONS)
8+
9+
10+
class TestRun:
11+
"""Tests for the `run_module()` function."""
12+
13+
def test_output_files_exist(self, run_as_module):
14+
"""Tests that the outputs of `run_module` exist."""
15+
csv_files = set(
16+
x for x in os.listdir("receiving") if x.endswith(".csv"))
17+
expected_files = set()
18+
for date in ("20200612", "20200611", "20200610"):
19+
for geo in GEO_RESOLUTIONS:
20+
for signal in SIGNALS:
21+
print(date, geo, signal)
22+
single_date_signal = "_".join([date, geo, signal]) + ".csv"
23+
expected_files.add(single_date_signal)
24+
single_date_signal = "_".join(
25+
[date, geo, "wip", signal, "7d_avg"]) + ".csv"
26+
expected_files.add(single_date_signal)
27+
28+
assert expected_files == csv_files
29+
30+
def test_output_files_format(self, run_as_module):
31+
"""Tests that output files are in the correct format."""
32+
csv_files = os.listdir("receiving")
33+
for filename in csv_files:
34+
if not filename.endswith(".csv"):
35+
continue
36+
# Print the file name so that we can tell which file (if any)
37+
# triggered the error.
38+
print(filename)
39+
df = pd.read_csv(os.path.join("receiving", filename))
40+
assert (df.columns.values ==
41+
["geo_id", "val", "se", "sample_size"]).all()

safegraph_patterns/delphi_safegraph_patterns/run.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,13 @@ def run_module():
5858
# Why call subprocess rather than using a native Python client, e.g. boto3?
5959
# Because boto3 does not have a simple rsync-like call that can perform
6060
# the following behavior elegantly.
61-
if not bool(params["test"]):
61+
if params["sync"]:
6262
subprocess.run(
6363
f'aws s3 sync s3://sg-c19-response/{ver[1]}/ '
6464
f'{raw_data_dir}/{ver[1]}/ --endpoint {aws_endpoint}',
6565
env=env_vars,
6666
shell=True,
67+
check=True
6768
)
6869

6970
brand_df = pd.read_csv(

safegraph_patterns/params.json.template

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@
88
"aws_secret_access_key": "",
99
"aws_default_region": "",
1010
"aws_endpoint": "",
11-
"test": "False"
11+
"sync": true,
1212
}

safegraph_patterns/tests/params.json.template

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@
88
"aws_secret_access_key": "",
99
"aws_default_region": "",
1010
"aws_endpoint": "",
11-
"test": "True"
11+
"sync": false
1212
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.csv

safegraph_patterns/tests/test_run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from delphi_safegraph_patterns.run import (run_module, METRICS,
77
SENSORS, GEO_RESOLUTIONS)
8-
8+
99

1010
class TestRun:
1111
def test_output_files(self, run_as_module):

0 commit comments

Comments
 (0)