
Commit d266d21

sgsmob authored and krivard committed
migrate safegraph.run back onto using functools.partial
1 parent d243e3f commit d266d21

3 files changed (+32 −34 lines)


safegraph/delphi_safegraph/process.py

Lines changed: 14 additions & 14 deletions
@@ -15,6 +15,7 @@
 # Base file name for raw data CSVs.
 CSV_NAME = 'social-distancing.csv.gz'
 
+
 def validate(df):
     """Confirms that a data frame has only one date."""
     timestamps = df['date_range_start'].apply(date_from_timestamp)
@@ -240,8 +241,7 @@ def process_window(df_list: List[pd.DataFrame],
                       index=False, )
 
 
-def process(current_filename: str,
-            previous_filenames: List[str],
+def process(filenames: List[str],
             signal_names: List[str],
             wip_signal,
             geo_resolutions: List[str],
@@ -250,11 +250,11 @@ def process(current_filename: str,
     as averaged over the previous week.
     Parameters
     ----------
-    current_filename: str
-        path to file holding the target date's data.
-    previous_filenames: List[str]
-        paths to files holding data from each day in the week preceding the
-        target date.
+    filenames: List[str]
+        paths to files holding data.
+        The first entry of the list should correspond to the target date while
+        the remaining entries should correspond to the dates from each day in
+        the week preceding the target date.
     signal_names: List[str]
         signal names to be processed for a single date.
         A second version of each such signal named {SIGNAL}_7d_avg will be
@@ -274,8 +274,8 @@ def process(current_filename: str,
     one for the data averaged over the previous week to
     {export_dir}/{date}_{resolution}_{signal}_7d_avg.csv.
     """
-    past_week = [pd.read_csv(current_filename)]
-    for fname in previous_filenames:
+    past_week = []
+    for fname in filenames:
         if os.path.exists(fname):
             past_week.append(pd.read_csv(fname))
 
@@ -286,8 +286,8 @@ def process(current_filename: str,
                    export_dir)
     # ...then as part of the whole window.
     process_window(past_week,
-                   add_prefix(add_suffix(signal_names, '_7d_avg'),
-                              wip_signal,
-                              'wip_'),
-                   geo_resolutions,
-                   export_dir)
+                   add_prefix(add_suffix(signal_names, '_7d_avg'),
+                              wip_signal,
+                              'wip_'),
+                   geo_resolutions,
+                   export_dir)
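
The behavioral change in the last two hunks above: the target date's file is no longer read unconditionally, so every entry in `filenames` is optional and a missing day simply drops out of the 7-day window. A minimal sketch of that pattern, assuming a hypothetical helper name `read_existing` that is not part of the package:

import os
import pandas as pd

def read_existing(filenames):
    """Read each listed CSV that exists on disk, silently skipping absent files."""
    return [pd.read_csv(fname) for fname in filenames if os.path.exists(fname)]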

safegraph/delphi_safegraph/run.py

Lines changed: 15 additions & 17 deletions
@@ -3,6 +3,7 @@
 when the module is run with `python -m MODULE_NAME`.
 """
 import glob
+import functools
 import multiprocessing as mp
 import subprocess
 
@@ -24,22 +25,13 @@ def run_module():
     aws_endpoint = params["aws_endpoint"]
     wip_signal = params["wip_signal"]
 
-    def process_file(current_filename):
-        """Wrapper around `process()` that only takes a single argument.
-
-        A single argument function is necessary to use `pool.map()` below.
-        Because each call to `process()` has two arguments that are dependent
-        on the input file name (`current_filename` and `previous_filenames`),
-        we choose to use this wrapper rather than something like
-        `functools.partial()`.
-        """
-        return process(current_filename,
-                       files_in_past_week(current_filename),
-                       signal_names=SIGNALS,
-                       wip_signal=wip_signal,
-                       geo_resolutions=GEO_RESOLUTIONS,
-                       export_dir=export_dir,
-                       )
+    single_arg_process = functools.partial(
+        process,
+        signal_names=SIGNALS,
+        wip_signal=wip_signal,
+        geo_resolutions=GEO_RESOLUTIONS,
+        export_dir=export_dir,
+    )
 
     # Update raw data
     # Why call subprocess rather than using a native Python client, e.g. boto3?
@@ -60,5 +52,11 @@ def process_file(current_filename):
     files = glob.glob(f'{raw_data_dir}/social-distancing/**/*.csv.gz',
                       recursive=True)
 
+    files_with_previous_weeks = []
+    for fname in files:
+        previous_week = [fname]
+        previous_week.extend(files_in_past_week(fname))
+        files_with_previous_weeks.append(previous_week)
+
     with mp.Pool(n_core) as pool:
-        pool.map(process_file, files)
+        pool.map(single_arg_process, files_with_previous_weeks)
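
The hand-written `process_file()` wrapper is dropped because `functools.partial` already produces the single-argument callable that `pool.map()` needs once `process()`'s keyword arguments are pinned, and the per-file week of inputs is now precomputed into `files_with_previous_weeks` rather than derived inside the worker. A self-contained sketch of that pattern, using stand-in names (`work`, `scale`, `offset`) rather than the real signal parameters:

import functools
import multiprocessing as mp

def work(item, scale=1, offset=0):
    """Stand-in for process(): one positional work item plus fixed keyword options."""
    return item * scale + offset

if __name__ == '__main__':
    # Pin the keyword arguments once; the result is a one-argument callable.
    single_arg_work = functools.partial(work, scale=10, offset=3)
    with mp.Pool(2) as pool:
        print(pool.map(single_arg_work, [1, 2, 3]))  # -> [13, 23, 33]

Because `functools.partial` objects pickle cleanly as long as the wrapped function is importable (unlike a function defined inside `run_module()`), this shape also travels well across worker processes.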

safegraph/tests/test_process.py

Lines changed: 3 additions & 3 deletions
@@ -137,9 +137,9 @@ def test_process(self, tmp_path):
         export_dir = tmp_path / 'export'
         export_dir.mkdir()
 
-        process('raw_data/small_raw_data_0.csv',
-                # File 2 does not exist.
-                ['raw_data/small_raw_data_1.csv',
+        process(['raw_data/small_raw_data_0.csv',
+                 'raw_data/small_raw_data_1.csv',
+                 # File 2 does not exist.
                  'raw_data/small_raw_data_2.csv',
                  'raw_data/small_raw_data_3.csv', ],
                 SIGNALS,
