Gate pulling/syncing data from AWS in safegraph behind a flag in params #363

Merged: 13 commits, Oct 28, 2020
31 changes: 16 additions & 15 deletions safegraph/delphi_safegraph/process.py
@@ -15,6 +15,7 @@
 # Base file name for raw data CSVs.
 CSV_NAME = 'social-distancing.csv.gz'
 
+
 def validate(df):
     """Confirms that a data frame has only one date."""
     timestamps = df['date_range_start'].apply(date_from_timestamp)
@@ -235,13 +236,13 @@ def process_window(df_list: List[pd.DataFrame],
                            f'{signal}_se': 'se',
                            f'{signal}_n': 'sample_size',
                        }, axis=1)
-        df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
+        date_str = date.strftime('%Y%m%d')
+        df_export.to_csv(f'{export_dir}/{date_str}_{geo_res}_{signal}.csv',
                          na_rep='NA',
                          index=False, )
 
 
-def process(current_filename: str,
-            previous_filenames: List[str],
+def process(filenames: List[str],
             signal_names: List[str],
             wip_signal,
             geo_resolutions: List[str],
@@ -250,11 +251,11 @@ def process(current_filename: str,
     as averaged over the previous week.
     Parameters
     ----------
-    current_filename: str
-        path to file holding the target date's data.
-    previous_filenames: List[str]
-        paths to files holding data from each day in the week preceding the
-        target date.
+    filenames: List[str]
+        paths to files holding data.
+        The first entry of the list should correspond to the target date while
+        the remaining entries should correspond to the dates from each day in
+        the week preceding the target date.
     signal_names: List[str]
         signal names to be processed for a single date.
         A second version of each such signal named {SIGNAL}_7d_avg will be
@@ -274,8 +275,8 @@ def process(current_filename: str,
     one for the data averaged over the previous week to
     {export_dir}/{date}_{resolution}_{signal}_7d_avg.csv.
     """
-    past_week = [pd.read_csv(current_filename)]
-    for fname in previous_filenames:
+    past_week = []
+    for fname in filenames:
         if os.path.exists(fname):
             past_week.append(pd.read_csv(fname))
 
@@ -286,8 +287,8 @@ def process(current_filename: str,
                        export_dir)
     # ...then as part of the whole window.
     process_window(past_week,
-                       add_prefix(add_suffix(signal_names, '_7d_avg'),
-                                  wip_signal,
-                                  'wip_'),
-                       geo_resolutions,
-                       export_dir)
+                   add_prefix(add_suffix(signal_names, '_7d_avg'),
+                              wip_signal,
+                              'wip_'),
+                   geo_resolutions,
+                   export_dir)
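For reference, a minimal sketch of the reworked call contract (the file names
here are hypothetical; missing files are skipped by the `os.path.exists`
check, and `SIGNALS`/`GEO_RESOLUTIONS` come from `delphi_safegraph.constants`):

    from delphi_safegraph.process import process
    from delphi_safegraph.constants import SIGNALS, GEO_RESOLUTIONS

    process(
        ['raw_data/2020-06-12-social-distancing.csv.gz',  # target date first
         'raw_data/2020-06-11-social-distancing.csv.gz',  # then the files from
         'raw_data/2020-06-05-social-distancing.csv.gz'], # the preceding week
        signal_names=SIGNALS,
        wip_signal=[],
        geo_resolutions=GEO_RESOLUTIONS,
        export_dir='./receiving',
    )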
68 changes: 40 additions & 28 deletions safegraph/delphi_safegraph/run.py
@@ -3,6 +3,7 @@
 when the module is run with `python -m MODULE_NAME`.
 """
 import glob
+import functools
 import multiprocessing as mp
 import subprocess
 
@@ -15,50 +16,61 @@
 def run_module():
     """Creates the Safegraph indicator."""
     params = read_params()
+
+    # Place to write output files.
     export_dir = params["export_dir"]
+    # Location of input files.
     raw_data_dir = params["raw_data_dir"]
+
+    # Number of cores to use in multiprocessing.
     n_core = int(params["n_core"])
+
+    # AWS credentials
     aws_access_key_id = params["aws_access_key_id"]
     aws_secret_access_key = params["aws_secret_access_key"]
     aws_default_region = params["aws_default_region"]
     aws_endpoint = params["aws_endpoint"]
-    wip_signal = params["wip_signal"]
 
-    def process_file(current_filename):
-        """Wrapper around `process()` that only takes a single argument.
-        A single argument function is necessary to use `pool.map()` below.
-        Because each call to `process()` has two arguments that are dependent
-        on the input file name (`current_filename` and `previous_filenames`),
-        we choose to use this wrapper rather than something like
-        `functools.partial()`.
-        """
-        return process(current_filename,
-                       files_in_past_week(current_filename),
-                       signal_names=SIGNALS,
-                       wip_signal=wip_signal,
-                       geo_resolutions=GEO_RESOLUTIONS,
-                       export_dir=export_dir,
-                       )
+    # Whether to sync `raw_data_dir` with an AWS backend.
+    # Must be a bool in the JSON file (rather than the string "True" or "False").
+    sync = params["sync"]
+
+    # List of work-in-progress signal names.
+    wip_signal = params["wip_signal"]
+
+    # Convert `process()` to a single-argument function for use in `pool.map`.
+    single_arg_process = functools.partial(
+        process,
+        signal_names=SIGNALS,
+        wip_signal=wip_signal,
+        geo_resolutions=GEO_RESOLUTIONS,
+        export_dir=export_dir,
+    )
 
     # Update raw data
     # Why call subprocess rather than using a native Python client, e.g. boto3?
     # Because boto3 does not have a simple rsync-like call that can perform
     # the following behavior elegantly.
-    subprocess.run(
-        f'aws s3 sync s3://sg-c19-response/social-distancing/v2/ '
-        f'{raw_data_dir}/social-distancing/ --endpoint {aws_endpoint}',
-        env={
-            'AWS_ACCESS_KEY_ID': aws_access_key_id,
-            'AWS_SECRET_ACCESS_KEY': aws_secret_access_key,
-            'AWS_DEFAULT_REGION': aws_default_region,
-        },
-        shell=True,
-        check=True,
-    )
+    if sync:
+        subprocess.run(
+            f'aws s3 sync s3://sg-c19-response/social-distancing/v2/ '
+            f'{raw_data_dir}/social-distancing/ --endpoint {aws_endpoint}',
+            env={
+                'AWS_ACCESS_KEY_ID': aws_access_key_id,
+                'AWS_SECRET_ACCESS_KEY': aws_secret_access_key,
+                'AWS_DEFAULT_REGION': aws_default_region,
+            },
+            shell=True,
+            check=True,
+        )
 
     files = glob.glob(f'{raw_data_dir}/social-distancing/**/*.csv.gz',
                       recursive=True)
 
+    files_with_previous_weeks = []
+    for fname in files:
+        previous_week = [fname]
+        previous_week.extend(files_in_past_week(fname))
+        files_with_previous_weeks.append(previous_week)
+
     with mp.Pool(n_core) as pool:
-        pool.map(process_file, files)
+        pool.map(single_arg_process, files_with_previous_weeks)
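The wrapper-to-`functools.partial` change is the standard way to feed a
multi-parameter function to `pool.map`, which passes exactly one argument per
work item. A self-contained sketch of the pattern (names hypothetical):

    import functools

    def process(filenames, signal_names, export_dir):
        print(filenames, signal_names, export_dir)

    # Pin every argument except the one that varies per work item.
    single_arg_process = functools.partial(
        process, signal_names=['sig_a'], export_dir='./receiving')

    # Each work item is now a single list, as `pool.map` requires.
    for batch in [['day1.csv', 'day0.csv'], ['day2.csv', 'day1.csv']]:
        single_arg_process(batch)

A side benefit: a nested closure like the old `process_file` cannot be
pickled, so it only worked with fork-based multiprocessing, whereas a
`functools.partial` over a module-level function pickles cleanly.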
6 changes: 5 additions & 1 deletion safegraph/params.json.template
@@ -8,5 +8,9 @@
     "aws_secret_access_key": "",
     "aws_default_region": "",
     "aws_endpoint": "",
-    "wip_signal" : ""
+    "sync": true,
+    "wip_signal" : ["median_home_dwell_time_7d_avg",
+                    "completely_home_prop_7d_avg",
+                    "part_time_work_prop_7d_avg",
+                    "full_time_work_prop_7d_avg"]
 }
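The new `sync` flag is a JSON boolean, so it surfaces as a Python bool with no
string parsing (assuming `read_params` loads the file with `json.load`, as the
run.py comment above implies). A quick illustration of the distinction:

    import json

    json.loads('{"sync": true}')["sync"]     # True (bool)
    json.loads('{"sync": "True"}')["sync"]   # 'True' (str) -- always truthy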
17 changes: 17 additions & 0 deletions safegraph/tests/conftest.py
@@ -0,0 +1,17 @@
+# -*- coding: utf-8 -*-
+
+import pytest
+
+import os
+from os.path import join
+
+from delphi_safegraph.run import run_module
+
+
+@pytest.fixture(scope="session")
+def run_as_module():
+    # Clean receiving directory
+    for fname in os.listdir("receiving"):
+        if ".csv" in fname:
+            os.remove(join("receiving", fname))
+    run_module()
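A note on the fixture above: `scope="session"` means the pipeline runs once
per test session, and every test that names `run_as_module` as a parameter
shares that single run. A minimal sketch of the pattern (test names
hypothetical):

    import pytest

    @pytest.fixture(scope="session")
    def run_as_module():
        ...  # expensive setup, executed once per session

    def test_one(run_as_module):
        ...  # setup already done

    def test_two(run_as_module):
        ...  # reuses the same session-level setup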
5 changes: 3 additions & 2 deletions safegraph/tests/params.json.template
@@ -1,6 +1,6 @@
 {
     "static_file_dir": "./static",
-    "raw_data_dir": "/mnt/data/safegraph/",
+    "raw_data_dir": "./raw_data",
     "export_dir": "./receiving",
     "cache_dir": "./cache",
     "n_core": "12",
@@ -11,5 +11,6 @@
     "wip_signal" : ["median_home_dwell_time_7d_avg",
                     "completely_home_prop_7d_avg",
                     "part_time_work_prop_7d_avg",
-                    "full_time_work_prop_7d_avg"]
+                    "full_time_work_prop_7d_avg"],
+    "sync": false
 }
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions safegraph/tests/receiving/.gitignore
@@ -0,0 +1 @@
+*.csv
12 changes: 6 additions & 6 deletions safegraph/tests/test_process.py
@@ -128,7 +128,7 @@ def test_process_window(self, tmp_path):
             'sample_size': [2, 2]
         })
         actual = pd.read_csv(
-            export_dir / '2020-02-14_county_completely_home_prop.csv')
+            export_dir / '20200214_county_completely_home_prop.csv')
         pd.testing.assert_frame_equal(expected, actual)
 
     def test_process(self, tmp_path):
@@ -137,11 +137,11 @@ def test_process(self, tmp_path):
         export_dir = tmp_path / 'export'
         export_dir.mkdir()
 
-        process('raw_data/small_raw_data_0.csv',
-                # File 2 does not exist.
-                ['raw_data/small_raw_data_1.csv',
+        process(['raw_data/small_raw_data_0.csv',
+                 'raw_data/small_raw_data_1.csv',
+                 # File 2 does not exist.
                  'raw_data/small_raw_data_2.csv',
-                 'raw_data/small_raw_data_3.csv', ],
+                 'raw_data/small_raw_data_3.csv'],
                 SIGNALS,
                 ['median_home_dwell_time',
                  'completely_home_prop_7d_avg'],
@@ -199,7 +199,7 @@ def test_process(self, tmp_path):
             })
         }
         actual = {signal: pd.read_csv(
-            export_dir / f'2020-06-12_state_{signal}.csv')
+            export_dir / f'20200612_state_{signal}.csv')
             for signal in expected}
         for signal in expected:
            pd.testing.assert_frame_equal(expected[signal], actual[signal])
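The expected file names in these tests track the `strftime('%Y%m%d')` change
made in `process_window` above. For example:

    from datetime import date

    d = date(2020, 6, 12)
    d.strftime('%Y%m%d')   # '20200612' -- the new export name format
    str(d)                 # '2020-06-12' -- the old format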
41 changes: 41 additions & 0 deletions safegraph/tests/test_run.py
@@ -0,0 +1,41 @@
+"""Tests for the `run_module()` function."""
+import os
+
+import pandas as pd
+
+from delphi_safegraph.constants import (SIGNALS,
+                                        GEO_RESOLUTIONS)
+
+
+class TestRun:
+    """Tests for the `run_module()` function."""
+
+    def test_output_files_exist(self, run_as_module):
+        """Tests that the outputs of `run_module` exist."""
+        csv_files = set(
+            x for x in os.listdir("receiving") if x.endswith(".csv"))
+        expected_files = set()
+        for date in ("20200612", "20200611", "20200610"):
+            for geo in GEO_RESOLUTIONS:
+                for signal in SIGNALS:
+                    print(date, geo, signal)
+                    single_date_signal = "_".join([date, geo, signal]) + ".csv"
+                    expected_files.add(single_date_signal)
+                    single_date_signal = "_".join(
+                        [date, geo, "wip", signal, "7d_avg"]) + ".csv"
+                    expected_files.add(single_date_signal)
+
+        assert expected_files == csv_files
+
+    def test_output_files_format(self, run_as_module):
+        """Tests that output files are in the correct format."""
+        csv_files = os.listdir("receiving")
+        for filename in csv_files:
+            if not filename.endswith(".csv"):
+                continue
+            # Print the file name so that we can tell which file (if any)
+            # triggered the error.
+            print(filename)
+            df = pd.read_csv(os.path.join("receiving", filename))
+            assert (df.columns.values ==
+                    ["geo_id", "val", "se", "sample_size"]).all()
3 changes: 2 additions & 1 deletion safegraph_patterns/delphi_safegraph_patterns/run.py
@@ -58,12 +58,13 @@ def run_module():
         # Why call subprocess rather than using a native Python client, e.g. boto3?
         # Because boto3 does not have a simple rsync-like call that can perform
         # the following behavior elegantly.
-        if not bool(params["test"]):
+        if params["sync"]:
             subprocess.run(
                 f'aws s3 sync s3://sg-c19-response/{ver[1]}/ '
                 f'{raw_data_dir}/{ver[1]}/ --endpoint {aws_endpoint}',
                 env=env_vars,
                 shell=True,
                 check=True
             )
 
     brand_df = pd.read_csv(
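The replaced guard is also a small correctness fix: `bool()` of any non-empty
string is `True`, so the old string-valued `"test"` flag behaved the same
whether it was set to "True" or "False". A quick demonstration:

    bool("False")   # True -- non-empty strings are always truthy
    bool("True")    # True
    bool("")        # False

    # With a real JSON boolean, the guard reads as intended:
    params = {"sync": False}
    if params["sync"]:
        print("would sync from S3")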
2 changes: 1 addition & 1 deletion safegraph_patterns/params.json.template
@@ -8,5 +8,5 @@
     "aws_secret_access_key": "",
     "aws_default_region": "",
     "aws_endpoint": "",
-    "test": "False"
+    "sync": true
 }
2 changes: 1 addition & 1 deletion safegraph_patterns/tests/params.json.template
@@ -8,5 +8,5 @@
     "aws_secret_access_key": "",
     "aws_default_region": "",
     "aws_endpoint": "",
-    "test": "True"
+    "sync": false
 }
1 change: 1 addition & 0 deletions safegraph_patterns/tests/receiving/.gitignore
@@ -0,0 +1 @@
+*.csv
2 changes: 1 addition & 1 deletion safegraph_patterns/tests/test_run.py
@@ -5,7 +5,7 @@
 
 from delphi_safegraph_patterns.run import (run_module, METRICS,
                                            SENSORS, GEO_RESOLUTIONS)
-
+
 
 class TestRun:
     def test_output_files(self, run_as_module):