diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py index f51b1faff..5baf6266b 100644 --- a/safegraph/delphi_safegraph/process.py +++ b/safegraph/delphi_safegraph/process.py @@ -15,6 +15,7 @@ # Base file name for raw data CSVs. CSV_NAME = 'social-distancing.csv.gz' + def validate(df): """Confirms that a data frame has only one date.""" timestamps = df['date_range_start'].apply(date_from_timestamp) @@ -235,13 +236,13 @@ def process_window(df_list: List[pd.DataFrame], f'{signal}_se': 'se', f'{signal}_n': 'sample_size', }, axis=1) - df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv', + date_str = date.strftime('%Y%m%d') + df_export.to_csv(f'{export_dir}/{date_str}_{geo_res}_{signal}.csv', na_rep='NA', index=False, ) -def process(current_filename: str, - previous_filenames: List[str], +def process(filenames: List[str], signal_names: List[str], wip_signal, geo_resolutions: List[str], @@ -250,11 +251,11 @@ def process(current_filename: str, as averaged over the previous week. Parameters ---------- - current_filename: str - path to file holding the target date's data. - previous_filenames: List[str] - paths to files holding data from each day in the week preceding the - target date. + current_filename: List[str] + paths to files holding data. + The first entry of the list should correspond to the target date while + the remaining entries should correspond to the dates from each day in + the week preceding the target date. signal_names: List[str] signal names to be processed for a single date. A second version of each such signal named {SIGNAL}_7d_avg will be @@ -274,8 +275,8 @@ def process(current_filename: str, one for the data averaged over the previous week to {export_dir}/{date}_{resolution}_{signal}_7d_avg.csv. """ - past_week = [pd.read_csv(current_filename)] - for fname in previous_filenames: + past_week = [] + for fname in filenames: if os.path.exists(fname): past_week.append(pd.read_csv(fname)) @@ -286,8 +287,8 @@ def process(current_filename: str, export_dir) # ...then as part of the whole window. process_window(past_week, - add_prefix(add_suffix(signal_names, '_7d_avg'), - wip_signal, - 'wip_'), - geo_resolutions, - export_dir) + add_prefix(add_suffix(signal_names, '_7d_avg'), + wip_signal, + 'wip_'), + geo_resolutions, + export_dir) diff --git a/safegraph/delphi_safegraph/run.py b/safegraph/delphi_safegraph/run.py index 3fa4105af..305a9772f 100644 --- a/safegraph/delphi_safegraph/run.py +++ b/safegraph/delphi_safegraph/run.py @@ -3,6 +3,7 @@ when the module is run with `python -m MODULE_NAME`. """ import glob +import functools import multiprocessing as mp import subprocess @@ -15,50 +16,61 @@ def run_module(): """Creates the Safegraph indicator.""" params = read_params() + + # Place to write output files. export_dir = params["export_dir"] + # Location of input files. raw_data_dir = params["raw_data_dir"] + + # Number of cores to use in multiprocessing. n_core = int(params["n_core"]) + + # AWS credentials aws_access_key_id = params["aws_access_key_id"] aws_secret_access_key = params["aws_secret_access_key"] aws_default_region = params["aws_default_region"] aws_endpoint = params["aws_endpoint"] - wip_signal = params["wip_signal"] + # Whether to sync `raw_data_dir` with an AWS backend. + # Must be a bool in the JSON file (rather than the string "True" or "False") + sync = params["sync"] - def process_file(current_filename): - """Wrapper around `process()` that only takes a single argument. + # List of work-in-progress signal names. + wip_signal = params["wip_signal"] - A single argument function is necessary to use `pool.map()` below. - Because each call to `process()` has two arguments that are dependent - on the input file name (`current_filename` and `previous_filenames`), - we choose to use this wrapper rather than something like - `functools.partial()`. - """ - return process(current_filename, - files_in_past_week(current_filename), - signal_names=SIGNALS, - wip_signal=wip_signal, - geo_resolutions=GEO_RESOLUTIONS, - export_dir=export_dir, - ) + # Convert `process()` to a single-argument function for use in `pool.map`. + single_arg_process = functools.partial( + process, + signal_names=SIGNALS, + wip_signal=wip_signal, + geo_resolutions=GEO_RESOLUTIONS, + export_dir=export_dir, + ) # Update raw data # Why call subprocess rather than using a native Python client, e.g. boto3? # Because boto3 does not have a simple rsync-like call that can perform # the following behavior elegantly. - subprocess.run( - f'aws s3 sync s3://sg-c19-response/social-distancing/v2/ ' - f'{raw_data_dir}/social-distancing/ --endpoint {aws_endpoint}', - env={ - 'AWS_ACCESS_KEY_ID': aws_access_key_id, - 'AWS_SECRET_ACCESS_KEY': aws_secret_access_key, - 'AWS_DEFAULT_REGION': aws_default_region, - }, - shell=True, - check=True, - ) + if sync: + subprocess.run( + f'aws s3 sync s3://sg-c19-response/social-distancing/v2/ ' + f'{raw_data_dir}/social-distancing/ --endpoint {aws_endpoint}', + env={ + 'AWS_ACCESS_KEY_ID': aws_access_key_id, + 'AWS_SECRET_ACCESS_KEY': aws_secret_access_key, + 'AWS_DEFAULT_REGION': aws_default_region, + }, + shell=True, + check=True, + ) files = glob.glob(f'{raw_data_dir}/social-distancing/**/*.csv.gz', recursive=True) + files_with_previous_weeks = [] + for fname in files: + previous_week = [fname] + previous_week.extend(files_in_past_week(fname)) + files_with_previous_weeks.append(previous_week) + with mp.Pool(n_core) as pool: - pool.map(process_file, files) + pool.map(single_arg_process, files_with_previous_weeks) diff --git a/safegraph/params.json.template b/safegraph/params.json.template index 7e6096821..c0f200031 100644 --- a/safegraph/params.json.template +++ b/safegraph/params.json.template @@ -8,5 +8,9 @@ "aws_secret_access_key": "", "aws_default_region": "", "aws_endpoint": "", - "wip_signal" : "" + "sync": true, + "wip_signal" : ["median_home_dwell_time_7d_avg", + "completely_home_prop_7d_avg", + "part_time_work_prop_7d_avg", + "full_time_work_prop_7d_avg"] } diff --git a/safegraph/tests/conftest.py b/safegraph/tests/conftest.py new file mode 100644 index 000000000..ca172928b --- /dev/null +++ b/safegraph/tests/conftest.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +import pytest + +import os +from os.path import join + +from delphi_safegraph.run import run_module + + +@pytest.fixture(scope="session") +def run_as_module(): + # Clean receiving directory + for fname in os.listdir("receiving"): + if ".csv" in fname: + os.remove(join("receiving", fname)) + run_module() diff --git a/safegraph/tests/params.json.template b/safegraph/tests/params.json.template index 6839ac4e6..1b6e18f99 100644 --- a/safegraph/tests/params.json.template +++ b/safegraph/tests/params.json.template @@ -1,6 +1,6 @@ { "static_file_dir": "./static", - "raw_data_dir": "/mnt/data/safegraph/", + "raw_data_dir": "./raw_data", "export_dir": "./receiving", "cache_dir": "./cache", "n_core": "12", @@ -11,5 +11,6 @@ "wip_signal" : ["median_home_dwell_time_7d_avg", "completely_home_prop_7d_avg", "part_time_work_prop_7d_avg", - "full_time_work_prop_7d_avg"] + "full_time_work_prop_7d_avg"], + "sync": false } diff --git a/safegraph/tests/raw_data/social-distancing/2020/06/10/2020-06-10-social-distancing.csv.gz b/safegraph/tests/raw_data/social-distancing/2020/06/10/2020-06-10-social-distancing.csv.gz new file mode 100644 index 000000000..81f737597 Binary files /dev/null and b/safegraph/tests/raw_data/social-distancing/2020/06/10/2020-06-10-social-distancing.csv.gz differ diff --git a/safegraph/tests/raw_data/social-distancing/2020/06/11/2020-06-11-social-distancing.csv.gz b/safegraph/tests/raw_data/social-distancing/2020/06/11/2020-06-11-social-distancing.csv.gz new file mode 100644 index 000000000..195d12ca6 Binary files /dev/null and b/safegraph/tests/raw_data/social-distancing/2020/06/11/2020-06-11-social-distancing.csv.gz differ diff --git a/safegraph/tests/raw_data/social-distancing/2020/06/12/2020-06-12-social-distancing.csv.gz b/safegraph/tests/raw_data/social-distancing/2020/06/12/2020-06-12-social-distancing.csv.gz new file mode 100644 index 000000000..56fa6ac41 Binary files /dev/null and b/safegraph/tests/raw_data/social-distancing/2020/06/12/2020-06-12-social-distancing.csv.gz differ diff --git a/safegraph/tests/receiving/.gitignore b/safegraph/tests/receiving/.gitignore new file mode 100644 index 000000000..afed0735d --- /dev/null +++ b/safegraph/tests/receiving/.gitignore @@ -0,0 +1 @@ +*.csv diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py index e6d6ddc0e..4c4640ca6 100644 --- a/safegraph/tests/test_process.py +++ b/safegraph/tests/test_process.py @@ -128,7 +128,7 @@ def test_process_window(self, tmp_path): 'sample_size': [2, 2] }) actual = pd.read_csv( - export_dir / '2020-02-14_county_completely_home_prop.csv') + export_dir / '20200214_county_completely_home_prop.csv') pd.testing.assert_frame_equal(expected, actual) def test_process(self, tmp_path): @@ -137,11 +137,11 @@ def test_process(self, tmp_path): export_dir = tmp_path / 'export' export_dir.mkdir() - process('raw_data/small_raw_data_0.csv', - # File 2 does not exist. - ['raw_data/small_raw_data_1.csv', + process(['raw_data/small_raw_data_0.csv', + 'raw_data/small_raw_data_1.csv', + # File 2 does not exist. 'raw_data/small_raw_data_2.csv', - 'raw_data/small_raw_data_3.csv', ], + 'raw_data/small_raw_data_3.csv'], SIGNALS, ['median_home_dwell_time', 'completely_home_prop_7d_avg'], @@ -199,7 +199,7 @@ def test_process(self, tmp_path): }) } actual = {signal: pd.read_csv( - export_dir / f'2020-06-12_state_{signal}.csv') + export_dir / f'20200612_state_{signal}.csv') for signal in expected} for signal in expected: pd.testing.assert_frame_equal(expected[signal], actual[signal]) diff --git a/safegraph/tests/test_run.py b/safegraph/tests/test_run.py new file mode 100644 index 000000000..689122b46 --- /dev/null +++ b/safegraph/tests/test_run.py @@ -0,0 +1,41 @@ +"""Tests for the `run_module()` function.""" +import os + +import pandas as pd + +from delphi_safegraph.constants import (SIGNALS, + GEO_RESOLUTIONS) + + +class TestRun: + """Tests for the `run_module()` function.""" + + def test_output_files_exist(self, run_as_module): + """Tests that the outputs of `run_module` exist.""" + csv_files = set( + x for x in os.listdir("receiving") if x.endswith(".csv")) + expected_files = set() + for date in ("20200612", "20200611", "20200610"): + for geo in GEO_RESOLUTIONS: + for signal in SIGNALS: + print(date, geo, signal) + single_date_signal = "_".join([date, geo, signal]) + ".csv" + expected_files.add(single_date_signal) + single_date_signal = "_".join( + [date, geo, "wip", signal, "7d_avg"]) + ".csv" + expected_files.add(single_date_signal) + + assert expected_files == csv_files + + def test_output_files_format(self, run_as_module): + """Tests that output files are in the correct format.""" + csv_files = os.listdir("receiving") + for filename in csv_files: + if not filename.endswith(".csv"): + continue + # Print the file name so that we can tell which file (if any) + # triggered the error. + print(filename) + df = pd.read_csv(os.path.join("receiving", filename)) + assert (df.columns.values == + ["geo_id", "val", "se", "sample_size"]).all() diff --git a/safegraph_patterns/delphi_safegraph_patterns/run.py b/safegraph_patterns/delphi_safegraph_patterns/run.py index be2618f2d..ce615f787 100644 --- a/safegraph_patterns/delphi_safegraph_patterns/run.py +++ b/safegraph_patterns/delphi_safegraph_patterns/run.py @@ -58,12 +58,13 @@ def run_module(): # Why call subprocess rather than using a native Python client, e.g. boto3? # Because boto3 does not have a simple rsync-like call that can perform # the following behavior elegantly. - if not bool(params["test"]): + if params["sync"]: subprocess.run( f'aws s3 sync s3://sg-c19-response/{ver[1]}/ ' f'{raw_data_dir}/{ver[1]}/ --endpoint {aws_endpoint}', env=env_vars, shell=True, + check=True ) brand_df = pd.read_csv( diff --git a/safegraph_patterns/params.json.template b/safegraph_patterns/params.json.template index 317c1693f..fe94f416b 100644 --- a/safegraph_patterns/params.json.template +++ b/safegraph_patterns/params.json.template @@ -8,5 +8,5 @@ "aws_secret_access_key": "", "aws_default_region": "", "aws_endpoint": "", - "test": "False" + "sync": true, } diff --git a/safegraph_patterns/tests/params.json.template b/safegraph_patterns/tests/params.json.template index b4eb1d471..78fd73fe5 100644 --- a/safegraph_patterns/tests/params.json.template +++ b/safegraph_patterns/tests/params.json.template @@ -8,5 +8,5 @@ "aws_secret_access_key": "", "aws_default_region": "", "aws_endpoint": "", - "test": "True" + "sync": false } diff --git a/safegraph_patterns/tests/receiving/.gitignore b/safegraph_patterns/tests/receiving/.gitignore index e69de29bb..afed0735d 100644 --- a/safegraph_patterns/tests/receiving/.gitignore +++ b/safegraph_patterns/tests/receiving/.gitignore @@ -0,0 +1 @@ +*.csv diff --git a/safegraph_patterns/tests/test_run.py b/safegraph_patterns/tests/test_run.py index b4bd90ceb..8644eb6c4 100644 --- a/safegraph_patterns/tests/test_run.py +++ b/safegraph_patterns/tests/test_run.py @@ -5,7 +5,7 @@ from delphi_safegraph_patterns.run import (run_module, METRICS, SENSORS, GEO_RESOLUTIONS) - + class TestRun: def test_output_files(self, run_as_module):