Skip to content

Commit 31d882b

Browse files
authored
Merge pull request #168 from cmu-delphi/rename_signals
safegraph: standardizing signal names
2 parents eed8564 + b972968 commit 31d882b

File tree

7 files changed

+179
-96
lines changed

7 files changed

+179
-96
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
2+
3+
# Canonical (non-prefixed) signal names for the Safegraph indicator.
HOME_DWELL = 'median_home_dwell_time'
COMPLETELY_HOME = 'completely_home_prop'
FULL_TIME_WORK = 'full_time_work_prop'
PART_TIME_WORK = 'part_time_work_prop'

# Every signal this indicator exports.
SIGNALS = [
    HOME_DWELL,
    COMPLETELY_HOME,
    FULL_TIME_WORK,
    PART_TIME_WORK,
]

# Geographic levels at which signals are aggregated.
GEO_RESOLUTIONS = [
    'county',
    'state',
]

safegraph/delphi_safegraph/process.py

Lines changed: 104 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,120 @@
1-
# -*- coding: utf-8 -*-
1+
import covidcast
22
import numpy as np
33
import pandas as pd
44

5+
from .constants import HOME_DWELL, COMPLETELY_HOME, FULL_TIME_WORK, PART_TIME_WORK
56
from .geo import FIPS_TO_STATE
67

78
# Magic number for modular arithmetic; CBG -> FIPS
89
MOD = 10000000
910

10-
def construct_signals(cbg_df, signal_names):
11-
'''Construct Census-block level signals.
11+
def add_prefix(signal_names, wip_signal, prefix: str):
    """Prepend `prefix` to the names of work-in-progress signals.

    Parameters
    ----------
    signal_names: List[str]
        Names of signals to be exported.
    wip_signal: List[str] or bool
        A list of wip signal names -> prefix only the listed signals;
        True -> prefix every signal in the registry;
        False or "" -> prefix only signals never published before.
    prefix: str
        Prefix for new/non-public signals (normally 'wip_').

    Returns
    -------
    List[str]
        Signal names, prefixed where appropriate, ready for computation.

    Raises
    ------
    ValueError
        If `wip_signal` is none of True, False, "", or a list.
    """
    if wip_signal is True:
        # Everything is treated as work-in-progress.
        return [prefix + name for name in signal_names]
    if isinstance(wip_signal, list):
        wip_set = set(wip_signal)
        renamed = []
        for name in signal_names:
            renamed.append(prefix + name if name in wip_set else name)
        return renamed
    if wip_signal in {False, ""}:
        # Prefix only the signals COVIDcast has never published.
        return [name if public_signal(name) else prefix + name
                for name in signal_names]
    raise ValueError("Supply True | False or '' or [] | list()")
44+
45+
# Check if the signal name is public
def public_signal(signal_):
    """Check whether `signal_` has already been published to COVIDcast.

    Parameters
    ----------
    signal_: str
        Name of the signal.

    Returns
    -------
    bool
        True if the signal is present in the COVIDcast metadata,
        False if it is not.
    """
    # NOTE(review): this fetches the full metadata table on every call;
    # callers looping over many signals may want to cache it.
    epidata_df = covidcast.metadata()
    # Vectorized membership test. The previous positional loop
    # (epidata_df['signal'][index] for index in range(len(...))) assumed
    # a default RangeIndex on the metadata frame and iterated in Python;
    # this form is index-agnostic and runs at C speed.
    return bool((epidata_df['signal'] == signal_).any())
1263

64+
65+
def construct_signals(cbg_df, signal_names):
    """Construct Census block group-level signals.

    In addition to the signals already available in raw form from
    Safegraph (e.g. median_home_dwell_time), derives the following:

    - completely_home_prop, defined as:
        completely_home_device_count / device_count
    - full_time_work_prop, defined as:
        full_time_work_behavior_devices / device_count
    - part_time_work_prop, defined as:
        part_time_work_behavior_devices / device_count

    Documentation for the social distancing metrics:
    https://docs.safegraph.com/docs/social-distancing-metrics

    Parameters
    ----------
    cbg_df: pd.DataFrame
        Census block group-level dataframe with raw social distancing
        indicators from Safegraph.
    signal_names: List[str]
        Names of signals to be exported (possibly 'wip_'-prefixed).

    Returns
    -------
    pd.DataFrame
        Dataframe with columns: timestamp, county_fips, and one column
        per entry of `signal_names`.

    Raises
    ------
    ValueError
        If a name in `signal_names` matches no known signal.
    """
    # Preparation
    cbg_df['timestamp'] = cbg_df['date_range_start'].apply(
        lambda x: str(x).split('T')[0])
    # A CBG id is the 5-digit county FIPS followed by 7 more digits;
    # integer division by MOD recovers the county code, zero-padded.
    cbg_df['county_fips'] = (cbg_df['origin_census_block_group'] // MOD).apply(
        lambda x: f'{int(x):05d}')

    # Transformation: create signals not available in raw data.
    # endswith() is used because names may carry a 'wip_' prefix.
    for signal in signal_names:
        if signal.endswith(FULL_TIME_WORK):
            cbg_df[signal] = (cbg_df['full_time_work_behavior_devices']
                              / cbg_df['device_count'])
        elif signal.endswith(COMPLETELY_HOME):
            cbg_df[signal] = (cbg_df['completely_home_device_count']
                              / cbg_df['device_count'])
        elif signal.endswith(PART_TIME_WORK):
            cbg_df[signal] = (cbg_df['part_time_work_behavior_devices']
                              / cbg_df['device_count'])
        elif signal.endswith(HOME_DWELL):
            cbg_df[signal] = cbg_df['median_home_dwell_time']
        else:
            # Previously an unrecognized name surfaced as an opaque
            # KeyError in the subsetting step below; fail fast instead.
            raise ValueError(f'Unknown signal name: {signal}')

    # Subsetting
    return cbg_df[['timestamp', 'county_fips'] + signal_names]
54114

115+
55116
def aggregate(df, signal_names, geo_resolution='county'):
    '''Aggregate signals to appropriate resolution and produce standard errors.

    Parameters
    ----------
    df: pd.DataFrame
        Census block group-level data with prepared signals (output of
        construct_signals()); must carry 'county_fips' and 'timestamp'.
    signal_names: List[str]
        Names of signals to be exported.
    geo_resolution: str
        One of ('county', 'state').

    Returns
    -------
    pd.DataFrame:
        One row per (geo_id, timestamp), with mean, standard error, and
        sample-size columns for each signal.

    Raises
    ------
    ValueError
        If `geo_resolution` is not one of the supported values.
    '''
    # Prepare geo resolution
    if geo_resolution == 'county':
        df['geo_id'] = df['county_fips']
    elif geo_resolution == 'state':
        # Map the 2-digit state FIPS prefix to its state abbreviation.
        df['geo_id'] = df['county_fips'].apply(lambda x:
                                               FIPS_TO_STATE[x[:2]])
    else:
        # Bug fix: the message previously interpolated the undefined name
        # GEO_RESOLUTION, so invalid input raised NameError, not the
        # intended ValueError.
        raise ValueError(
            "`geo_resolution` must be one of ('county', 'state').")

    # Aggregation and signal creation: reuse one groupby for all three
    # statistics instead of re-grouping per aggregation.
    grouped = df.groupby(['geo_id', 'timestamp'])[signal_names]
    df_mean = grouped.mean()
    df_sd = grouped.std()
    df_n = grouped.count()
    agg_df = pd.DataFrame.join(df_mean, df_sd,
                               lsuffix='_mean', rsuffix='_sd')
    agg_df = pd.DataFrame.join(agg_df, df_n.rename({
        signal: signal + '_n' for signal in signal_names
    }, axis=1))
    for signal in signal_names:
        # Standard error of the mean: sd / sqrt(n).
        agg_df[f'{signal}_se'] = (agg_df[f'{signal}_sd']
                                  / np.sqrt(agg_df[f'{signal}_n']))
    return agg_df.reset_index()
103162

104-
def process(fname, signal_names, geo_resolutions, export_dir):
    '''Process an input census block group-level CSV and export it.

    Assumes that the input file has _only_ one date of data.

    Parameters
    ----------
    fname: str
        Input filename.
    signal_names: List[str]
        Signal names to be processed.
    geo_resolutions: List[str]
        List of geo resolutions to export the data.
    export_dir
        Path where the output files are saved.

    Returns
    -------
    None
    '''
    cbg_df = construct_signals(pd.read_csv(fname), signal_names)
    unique_date = cbg_df['timestamp'].unique()
    if len(unique_date) != 1:
        raise ValueError(f'More than one timestamp found in input file {fname}.')
    date = unique_date[0].replace('-', '')
    for geo_res in geo_resolutions:
        df = aggregate(cbg_df, signal_names, geo_res)
        for signal in signal_names:
            # Export mean/se/count for this signal under the standard
            # val/se/sample_size column names.
            stat_cols = [f'{signal}_{stat}' for stat in ('mean', 'se', 'n')]
            df_export = df[['geo_id'] + stat_cols].rename(
                columns={
                    f'{signal}_mean': 'val',
                    f'{signal}_se': 'se',
                    f'{signal}_n': 'sample_size',
                })
            df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
                             na_rep='NA',
                             index=False)

safegraph/delphi_safegraph/run.py

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,16 @@
1-
# -*- coding: utf-8 -*-
21
"""Functions to call when running the function.
3-
42
This module should contain a function called `run_module`, that is executed
53
when the module is run with `python -m MODULE_NAME`.
64
"""
75
import glob
86
import multiprocessing as mp
97
import subprocess
10-
from datetime import datetime
118
from functools import partial
129

13-
import numpy as np
14-
import pandas as pd
1510
from delphi_utils import read_params
1611

17-
from .process import process
18-
19-
SIGNALS = [
20-
# signal_name wip
21-
('median_home_dwell_time', False),
22-
('completely_home_prop', False),
23-
('full_time_work_prop', False),
24-
('part_time_work_prop', False),
25-
]
26-
GEO_RESOLUTIONS = [
27-
'county',
28-
'state',
29-
]
30-
12+
from .constants import SIGNALS, GEO_RESOLUTIONS
13+
from .process import process, add_prefix
3114

3215
def run_module():
3316

@@ -39,31 +22,31 @@ def run_module():
3922
aws_secret_access_key = params["aws_secret_access_key"]
4023
aws_default_region = params["aws_default_region"]
4124
aws_endpoint = params["aws_endpoint"]
25+
wip_signal = params["wip_signal"]
4226

4327
process_file = partial(process,
44-
signals=SIGNALS,
45-
geo_resolutions=GEO_RESOLUTIONS,
46-
export_dir=export_dir,
47-
)
28+
signal_names=add_prefix(SIGNALS, wip_signal, prefix='wip_'),
29+
geo_resolutions=GEO_RESOLUTIONS,
30+
export_dir=export_dir,
31+
)
4832

4933
# Update raw data
5034
# Why call subprocess rather than using a native Python client, e.g. boto3?
5135
# Because boto3 does not have a simple rsync-like call that can perform
5236
# the following behavior elegantly.
5337
subprocess.run(
54-
f'aws s3 sync s3://sg-c19-response/social-distancing/v2/ '
55-
f'{raw_data_dir}/social-distancing/ --endpoint {aws_endpoint}',
56-
env={
57-
'AWS_ACCESS_KEY_ID': aws_access_key_id,
58-
'AWS_SECRET_ACCESS_KEY': aws_secret_access_key,
59-
'AWS_DEFAULT_REGION': aws_default_region,
60-
},
61-
shell=True,
62-
)
38+
f'aws s3 sync s3://sg-c19-response/social-distancing/v2/ '
39+
f'{raw_data_dir}/social-distancing/ --endpoint {aws_endpoint}',
40+
env={
41+
'AWS_ACCESS_KEY_ID': aws_access_key_id,
42+
'AWS_SECRET_ACCESS_KEY': aws_secret_access_key,
43+
'AWS_DEFAULT_REGION': aws_default_region,
44+
},
45+
shell=True,
46+
)
6347

6448
files = glob.glob(f'{raw_data_dir}/social-distancing/**/*.csv.gz',
65-
recursive=True)
49+
recursive=True)
6650

6751
with mp.Pool(n_core) as pool:
6852
pool.map(process_file, files)
69-

safegraph/params.json.template

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@
77
"aws_access_key_id": "",
88
"aws_secret_access_key": "",
99
"aws_default_region": "",
10-
"aws_endpoint": ""
10+
"aws_endpoint": "",
11+
"wip_signal": ""
1112
}

safegraph/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from setuptools import find_packages
33

44
required = [
5+
"covidcast",
56
"numpy",
67
"pandas",
78
"pytest",

safegraph/tests/params.json.template

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"static_file_dir": "./static",
3+
"raw_data_dir": "/mnt/data/safegraph/",
4+
"export_dir": "./receiving",
5+
"cache_dir": "./cache",
6+
"n_core": "12",
7+
"aws_access_key_id": "",
8+
"aws_secret_access_key": "",
9+
"aws_default_region": "",
10+
"aws_endpoint": "",
11+
"wip_signal": ""
12+
}

0 commit comments

Comments
 (0)