
Commit 9e398ae

Update: added new use case in safegraph
1 parent 53aeb39 commit 9e398ae

4 files changed: +113 -82 lines changed

safegraph/delphi_safegraph/process.py

Lines changed: 71 additions & 46 deletions
@@ -1,60 +1,91 @@
-# -*- coding: utf-8 -*-
 import numpy as np
 import pandas as pd
 
 from .geo import FIPS_TO_STATE
 
 # Magic number for modular arithmetic; CBG -> FIPS
 MOD = 10000000
+from delphi_utils import read_params
+from delphi_epidata import Epidata
+
+
+# Add prefix to the signal name, if needed
+def signal_name(signal_names, wip_signal, prefix):
+    if wip_signal is not None:
+        if wip_signal and type(wip_signal) == bool:
+            new_signal_list = []
+            [new_signal_list.append(prefix + signal) if epidata_signal(signal) else new_signal_list.append(signal) for
+             signal in signal_names]
+            return new_signal_list
+        if type(wip_signal) == list:
+            for signal in wip_signal:
+                if epidata_signal(signal):
+                    new_list = [prefix + signal]
+                    signal_names.remove(signal)
+                    signal_names.extend(new_list)
+            return signal_names
+
+
+# Check if the signal name is public
+def epidata_signal(signal_):
+    epidata_df = Epidata.covidcast_meta()
+    for index in range(len(epidata_df['epidata'])):
+        for key in epidata_df['epidata'][index]:
+            if key == 'signal':
+                if epidata_df['epidata'][index][key] == signal_:
+                    return False
+    return True
 
-def construct_signals(cbg_df, signal_names):
-    '''Construct Census-block level signals.
 
+def construct_signals(cbg_df, signal_names):
+    """Construct Census-block level signals.
     In its current form, we prepare the following signals in addition to those
     already available in raw form from Safegraph:
-
     - completely_home_prop, defined as:
         completely_home_device_count / device_count
     - full_time_work_prop, defined as:
         full_time_work_behavior_devices / device_count
     - part_time_work_prop, defined as:
         part_time_work_behavior_devices / device_count
-
     Documentation for the social distancing metrics:
     https://docs.safegraph.com/docs/social-distancing-metrics
-
     Parameters
     ----------
     cbg_df: pd.DataFrame
         Census block group-level dataframe with raw social distancing
         indicators from Safegraph.
     signal_names: List[str]
        Names of signals to be exported.
-
     Returns
     -------
     pd.DataFrame
        Dataframe with columns: timestamp, county_fips, and
        {each signal described above}.
-    '''
+    """
+
+    COMPLETELY_HOME = signal_names[1]
+    FULL_TIME_WORK = signal_names[2]
+    PART_TIME_WORK = signal_names[3]
+
     # Preparation
     cbg_df['timestamp'] = cbg_df['date_range_start'].apply(
-        lambda x: str(x).split('T')[0])
+        lambda x: str(x).split('T')[0])
     cbg_df['county_fips'] = (cbg_df['origin_census_block_group'] // MOD).apply(
-        lambda x: f'{int(x):05d}')
+        lambda x: f'{int(x):05d}')
     # Transformation: create signal not available in raw data
-    cbg_df['completely_home_prop'] = (cbg_df['completely_home_device_count']
-                                      / cbg_df['device_count'])
-    cbg_df['full_time_work_prop'] = (cbg_df['full_time_work_behavior_devices']
-                                     / cbg_df['device_count'])
-    cbg_df['part_time_work_prop'] = (cbg_df['part_time_work_behavior_devices']
-                                     / cbg_df['device_count'])
+    cbg_df[COMPLETELY_HOME] = (cbg_df['completely_home_device_count']
+                               / cbg_df['device_count'])
+    cbg_df[FULL_TIME_WORK] = (cbg_df['full_time_work_behavior_devices']
+                              / cbg_df['device_count'])
+    cbg_df[PART_TIME_WORK] = (cbg_df['part_time_work_behavior_devices']
+                              / cbg_df['device_count'])
+
     # Subsetting
     return cbg_df[['timestamp', 'county_fips'] + signal_names]
 
+
 def aggregate(df, signal_names, geo_resolution='county'):
     '''Aggregate signals to appropriate resolution and produce standard errors.
-
     Parameters
     ----------
     df: pd.DataFrame
@@ -64,7 +95,6 @@ def aggregate(df, signal_names, geo_resolution='county'):
        Names of signals to be exported.
     geo_resolution: str
        One of ('county', 'state')
-
     Returns
     -------
     pd.DataFrame:
@@ -77,34 +107,34 @@ def aggregate(df, signal_names, geo_resolution='county'):
         df['geo_id'] = df['county_fips']
     elif geo_resolution == 'state':
         df['geo_id'] = df['county_fips'].apply(lambda x:
-                                               FIPS_TO_STATE[x[:2]])
+                                               FIPS_TO_STATE[x[:2]])
     else:
         raise ValueError(f'`geo_resolution` must be one of {GEO_RESOLUTION}.')
 
     # Aggregation and signal creation
     df_mean = df.groupby(['geo_id', 'timestamp'])[
-        signal_names
-    ].mean()
+        signal_names
+    ].mean()
     df_sd = df.groupby(['geo_id', 'timestamp'])[
-        signal_names
-    ].std()
+        signal_names
+    ].std()
     df_n = df.groupby(['geo_id', 'timestamp'])[
-        signal_names
-    ].count()
+        signal_names
+    ].count()
     agg_df = pd.DataFrame.join(df_mean, df_sd,
-                               lsuffix='_mean', rsuffix='_sd')
+                               lsuffix='_mean', rsuffix='_sd')
     agg_df = pd.DataFrame.join(agg_df, df_n.rename({
-        signal: signal+'_n' for signal in signal_names
-    }, axis=1))
+        signal: signal + '_n' for signal in signal_names
+    }, axis=1))
     for signal in signal_names:
         agg_df[f'{signal}_se'] = (agg_df[f'{signal}_sd']
-                                  /np.sqrt(agg_df[f'{signal}_n']))
+                                  / np.sqrt(agg_df[f'{signal}_n']))
     return agg_df.reset_index()
 
-def process(fname, signals, geo_resolutions, export_dir):
+
+def process(fname, signal_names, geo_resolutions, export_dir):
     '''Process an input census block group-level CSV and export it. Assumes
     that the input file has _only_ one date of data.
-
     Parameters
     ----------
     fname: str
@@ -113,32 +143,27 @@ def process(fname, signals, geo_resolutions, export_dir):
        List of (signal_name, wip).
     geo_resolutions: List[str]
        List of geo resolutions to export the data.
-
     Returns
     -------
     None
     '''
-    signal_names, wip = (list(x) for x in zip(*signals))
     cbg_df = construct_signals(pd.read_csv(fname), signal_names)
     unique_date = cbg_df['timestamp'].unique()
     if len(unique_date) != 1:
         raise ValueError(f'More than one timestamp found in input file {fname}.')
     date = unique_date[0].replace('-', '')
     for geo_res in geo_resolutions:
         df = aggregate(cbg_df, signal_names, geo_res)
-        for signal, wip in signals:
+        for signal in signal_names:
             df_export = df[
-                ['geo_id']
-                + [f'{signal}_{x}' for x in ('mean', 'se', 'n')]
-            ].rename({
-                f'{signal}_mean': 'val',
-                f'{signal}_se': 'se',
-                f'{signal}_n': 'sample_size',
-            }, axis=1)
-            if wip:
-                signal = 'wip_' + signal
+                ['geo_id']
+                + [f'{signal}_{x}' for x in ('mean', 'se', 'n')]
+            ].rename({
+                f'{signal}_mean': 'val',
+                f'{signal}_se': 'se',
+                f'{signal}_n': 'sample_size',
+            }, axis=1)
             df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
-                             na_rep='NA',
-                             index=False,)
+                             na_rep='NA',
+                             index=False, )
     return
-
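For readers following the new use case: `signal_name` decides which exported signals get the work-in-progress prefix, and `epidata_signal` treats a signal as public only if it already appears in `Epidata.covidcast_meta()`. A minimal usage sketch, not part of the commit; the concrete outputs assume none of the four signals are listed as public yet, which depends on the live metadata response:

from delphi_safegraph.process import signal_name

SIGNALS = [
    'median_home_dwell_time',
    'completely_home_prop',
    'full_time_work_prop',
    'part_time_work_prop',
]

# wip_signal=True: every signal not yet in covidcast_meta() gets the prefix.
signal_name(SIGNALS.copy(), wip_signal=True, prefix='wip_')
# ['wip_median_home_dwell_time', 'wip_completely_home_prop',
#  'wip_full_time_work_prop', 'wip_part_time_work_prop']

# wip_signal=['completely_home_prop']: only the listed signal is prefixed,
# and it is moved to the end of the (mutated) list.
signal_name(SIGNALS.copy(), wip_signal=['completely_home_prop'], prefix='wip_')
# ['median_home_dwell_time', 'full_time_work_prop',
#  'part_time_work_prop', 'wip_completely_home_prop']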

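The unchanged `aggregate` step collapses CBG rows to one row per (geo_id, timestamp) and reports the mean, a standard error computed as sd / sqrt(n), and the sample size. A self-contained toy sketch of the same pandas pattern, with illustrative column values and FIPS codes that are not from the commit:

import numpy as np
import pandas as pd

# Toy CBG-level data: two counties, one day, one signal.
df = pd.DataFrame({
    'geo_id':    ['42003', '42003', '42003', '42101'],
    'timestamp': ['2020-06-12'] * 4,
    'completely_home_prop': [0.20, 0.30, 0.25, 0.40],
})

grouped = df.groupby(['geo_id', 'timestamp'])['completely_home_prop']
agg = pd.DataFrame({
    'val': grouped.mean(),
    'sd': grouped.std(),
    'sample_size': grouped.count(),
})
# Standard error of the mean, as in aggregate(): sd / sqrt(n).
# A group with a single CBG yields a NaN sd/se, which is why the tests
# below mask NaNs before checking non-negativity.
agg['se'] = agg['sd'] / np.sqrt(agg['sample_size'])
print(agg.reset_index())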
safegraph/delphi_safegraph/run.py

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
# -*- coding: utf-8 -*-
21
"""Functions to call when running the function.
3-
42
This module should contain a function called `run_module`, that is executed
53
when the module is run with `python -m MODULE_NAME`.
64
"""
@@ -14,15 +12,16 @@
1412
import pandas as pd
1513
from delphi_utils import read_params
1614

17-
from .process import process
15+
from .process import process, signal_name
1816

1917
SIGNALS = [
20-
# signal_name wip
21-
('median_home_dwell_time', False),
22-
('completely_home_prop', False),
23-
('full_time_work_prop', False),
24-
('part_time_work_prop', False),
18+
'median_home_dwell_time',
19+
'completely_home_prop',
20+
'full_time_work_prop',
21+
'part_time_work_prop'
22+
2523
]
24+
2625
GEO_RESOLUTIONS = [
2726
'county',
2827
'state',
@@ -39,31 +38,31 @@ def run_module():
3938
aws_secret_access_key = params["aws_secret_access_key"]
4039
aws_default_region = params["aws_default_region"]
4140
aws_endpoint = params["aws_endpoint"]
41+
wip_signal = params["wip_signal"]
4242

4343
process_file = partial(process,
44-
signals=SIGNALS,
45-
geo_resolutions=GEO_RESOLUTIONS,
46-
export_dir=export_dir,
47-
)
44+
signal_names=signal_name(SIGNALS, wip_signal, prefix='wip_'),
45+
geo_resolutions=GEO_RESOLUTIONS,
46+
export_dir=export_dir,
47+
)
4848

4949
# Update raw data
5050
# Why call subprocess rather than using a native Python client, e.g. boto3?
5151
# Because boto3 does not have a simple rsync-like call that can perform
5252
# the following behavior elegantly.
5353
subprocess.run(
54-
f'aws s3 sync s3://sg-c19-response/social-distancing/v2/ '
55-
f'{raw_data_dir}/social-distancing/ --endpoint {aws_endpoint}',
56-
env={
57-
'AWS_ACCESS_KEY_ID': aws_access_key_id,
58-
'AWS_SECRET_ACCESS_KEY': aws_secret_access_key,
59-
'AWS_DEFAULT_REGION': aws_default_region,
60-
},
61-
shell=True,
62-
)
54+
f'aws s3 sync s3://sg-c19-response/social-distancing/v2/ '
55+
f'{raw_data_dir}/social-distancing/ --endpoint {aws_endpoint}',
56+
env={
57+
'AWS_ACCESS_KEY_ID': aws_access_key_id,
58+
'AWS_SECRET_ACCESS_KEY': aws_secret_access_key,
59+
'AWS_DEFAULT_REGION': aws_default_region,
60+
},
61+
shell=True,
62+
)
6363

6464
files = glob.glob(f'{raw_data_dir}/social-distancing/**/*.csv.gz',
65-
recursive=True)
65+
recursive=True)
6666

6767
with mp.Pool(n_core) as pool:
6868
pool.map(process_file, files)
69-
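`run_module` binds everything except the filename with `functools.partial`, so `mp.Pool.map` can fan the downloaded CSVs out to `process` across `n_core` workers. A minimal, self-contained sketch of that pattern; the stand-in worker, signal list, and file names are hypothetical, not from the commit:

import multiprocessing as mp
from functools import partial

def process(fname, signal_names, geo_resolutions, export_dir):
    # Stand-in for delphi_safegraph.process.process.
    print(f'would export {fname} for {geo_resolutions} into {export_dir}')

if __name__ == '__main__':
    # Fix all keyword arguments; only fname is left for pool.map to supply.
    process_file = partial(process,
                           signal_names=['completely_home_prop'],
                           geo_resolutions=['county', 'state'],
                           export_dir='./receiving')
    files = ['a.csv.gz', 'b.csv.gz']   # in run_module these come from glob.glob
    with mp.Pool(2) as pool:
        pool.map(process_file, files)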

safegraph/params.json.template

Lines changed: 2 additions & 1 deletion
@@ -7,5 +7,6 @@
     "aws_access_key_id": "",
     "aws_secret_access_key": "",
     "aws_default_region": "",
-    "aws_endpoint": ""
+    "aws_endpoint": "",
+    "wip_prefix" : []
 }
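One thing to watch: run.py reads `params["wip_signal"]`, while the template adds a `wip_prefix` key. A params.json that matches what the code and the new test expect would presumably end like the sketch below, with `wip_signal` set either to `true` or to a list of signal names; the value shown is a placeholder:

    "aws_access_key_id": "",
    "aws_secret_access_key": "",
    "aws_default_region": "",
    "aws_endpoint": "",
    "wip_signal": ["completely_home_prop"]
}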

safegraph/tests/test_process.py

Lines changed: 18 additions & 12 deletions
@@ -6,48 +6,54 @@
 import numpy as np
 import pandas as pd
 from delphi_safegraph.process import (
-    construct_signals,
-    aggregate,
-)
+    construct_signals,
+    aggregate,
+    signal_name
+)
 from delphi_safegraph.run import SIGNALS
+from delphi_utils import read_params
+signal_names = SIGNALS
 
-signal_names, _ = (list(x) for x in zip(*SIGNALS))
 
 class TestProcess:
     def test_construct_signals_present(self):
-
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
+                                   signal_names)
         assert 'completely_home_prop' in set(cbg_df.columns)
         assert 'full_time_work_prop' in set(cbg_df.columns)
         assert 'part_time_work_prop' in set(cbg_df.columns)
         assert 'median_home_dwell_time' in set(cbg_df.columns)
 
     def test_construct_signals_proportions(self):
-
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
+                                   signal_names)
         assert np.all(cbg_df['completely_home_prop'].values <= 1)
         assert np.all(cbg_df['full_time_work_prop'].values <= 1)
         assert np.all(cbg_df['part_time_work_prop'].values <= 1)
 
     def test_aggregate_county(self):
-
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
+                                   signal_names)
         df = aggregate(cbg_df, signal_names, 'county')
 
         assert np.all(df[f'{signal_names[0]}_n'].values > 0)
         x = df[f'{signal_names[0]}_se'].values
         assert np.all(x[~np.isnan(x)] >= 0)
 
     def test_aggregate_state(self):
-
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
+                                   signal_names)
         df = aggregate(cbg_df, signal_names, 'state')
 
         assert np.all(df[f'{signal_names[0]}_n'].values > 0)
         x = df[f'{signal_names[0]}_se'].values
         assert np.all(x[~np.isnan(x)] >= 0)
 
+    def test_signal_name(self):
+        assert read_params()["wip_signal"] is not None, "supply value in params"
+        assert type(read_params()["wip_signal"]) == list or type(read_params()["wip_signal"]) == bool, "Supply True|False|list()"
+        signals = signal_name(signal_names, wip_signal=read_params()['wip_signal'], prefix='wip_')
+        assert (len(signals) >= len(signal_names))
+
+
+
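Note that `test_signal_name` needs a readable params.json (via `read_params()`) and a live `Epidata.covidcast_meta()` call. A self-contained sketch of the same length-preserving check with the metadata lookup stubbed out; the test name and stub are hypothetical, not part of the commit:

import delphi_safegraph.process as proc

def test_signal_name_offline(monkeypatch):
    # Pretend no signal is public yet, so signal_name never hits the network.
    monkeypatch.setattr(proc, 'epidata_signal', lambda signal_: True)
    names = ['completely_home_prop', 'full_time_work_prop']
    out = proc.signal_name(names.copy(), wip_signal=True, prefix='wip_')
    assert len(out) == len(names)
    assert all(s.startswith('wip_') for s in out)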
