Skip to content

Commit 511aca6

Browse files
authored
Merge pull request #308 from cmu-delphi/run-quidel-geo-rf
Switch to geo utils for combined Quidel pipeline
2 parents 05a8349 + 5986293 commit 511aca6

File tree

8 files changed

+97
-132
lines changed

8 files changed

+97
-132
lines changed

quidel/.pylintrc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[DESIGN]
2+
3+
min-public-methods=1
4+
5+
6+
[MESSAGES CONTROL]
7+
8+
disable=R0801, C0330, E1101, E0611, C0114, C0116, C0103, R0913, R0914, W0702, W0212, E1136

quidel/delphi_quidel/export.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ def export_csv(df, geo_name, sensor, receiving_dir, start_date, end_date):
2323
end_date: datetime.datetime
2424
The last date to report
2525
"""
26-
27-
df = df.copy()
2826
df = df[np.logical_and(df["timestamp"] >= start_date,
29-
df["timestamp"] <= end_date)]
27+
df["timestamp"] <= end_date)].copy()
3028

3129
for date in df["timestamp"].unique():
3230
t = pd.to_datetime(str(date))

quidel/delphi_quidel/geo_maps.py

Lines changed: 44 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,85 +1,49 @@
11
"""Contains geographic mapping tools."""
2-
def geo_map(geo_res, data, map_df):
3-
if geo_res == "county":
4-
return zip_to_county(data, map_df)
5-
if geo_res == "msa":
6-
return zip_to_msa(data, map_df)
7-
if geo_res == "hrr":
8-
return zip_to_hrr(data, map_df)
9-
return zip_to_state(data, map_df)
10-
11-
def zip_to_msa(data, map_df):
12-
"""Map from zipcode to MSA (along with parent state).
13-
Args:
14-
data: dataframe at the day-zip resolution.
15-
Returns:
16-
tuple, a dataframe at day-msa, with parent state column, and their string keys
2+
from delphi_utils import GeoMapper
3+
4+
date_col = "timestamp"
5+
data_cols = ['totalTest', 'numUniqueDevices', 'positiveTest', "population"]
6+
gmpr = GeoMapper() # Use geo utils
7+
GEO_KEY_DICT = {
8+
"county": "fips",
9+
"msa": "msa",
10+
"hrr": "hrr",
11+
"state": "state_id"
12+
}
13+
def geo_map(geo_res, df):
14+
data = df.copy()
15+
geo_key = GEO_KEY_DICT[geo_res]
16+
# Add population for each zipcode
17+
data = gmpr.add_population_column(data, "zip")
18+
# zip -> geo_res
19+
data = gmpr.replace_geocode(data, "zip", geo_key,
20+
date_col=date_col, data_cols=data_cols)
21+
if geo_res == "state":
22+
return data
23+
# Add parent state
24+
data = add_parent_state(data, geo_res, geo_key)
25+
return data, geo_key
26+
27+
def add_parent_state(data, geo_res, geo_key):
1728
"""
18-
# zip -> msa
19-
zip_map = map_df[["zip", "cbsa_id"]].dropna().drop_duplicates()
20-
# forget about the rest of the zips that aren't in MSA
21-
data = data.merge(zip_map, how="left", on="zip").dropna().drop(columns=["zip"], axis=1)
22-
23-
# msa + parent state
24-
# msa_map has mapping from msa to state, going by the state with the largest
25-
# population (since a msa may span multiple states)
26-
msa_map = map_df[["cbsa_id", "state_id", "population"]]
27-
msa_map = msa_map.groupby(["cbsa_id"]).max().reset_index()
28-
data = data.merge(msa_map, how="left", on="cbsa_id").drop(
29-
columns=["population"]).dropna()
30-
data = data.groupby(["timestamp", "cbsa_id", "state_id"]).sum().reset_index()
31-
data["cbsa_id"] = data["cbsa_id"].apply(lambda x: str(int(x)).zfill(5))
32-
33-
return data, "cbsa_id"
34-
35-
def zip_to_hrr(data, map_df):
36-
"""Map from zipcode to HRR (along with parent state).
37-
Args:
38-
data: dataframe at the day-zip resolution.
39-
Returns:
40-
tuple, a dataframe at day-msa, with parent state column, and their string keys
29+
- map from msa/hrr to state, going by the state with the largest
30+
population (since a msa/hrr may span multiple states)
31+
- map from county to the corresponding state
4132
"""
42-
# zip -> msa
43-
zip_map = map_df[["zip", "hrrnum"]].dropna().drop_duplicates()
44-
# forget about the rest of the zips that aren't in MSA
45-
data = data.merge(zip_map, how="left", on="zip").dropna().drop(columns=["zip"], axis=1)
46-
47-
# msa + parent state
48-
# msa_map has mapping from msa to state, going by the state with the largest
49-
# population (since a msa may span multiple states)
50-
msa_map = map_df[["hrrnum", "state_id", "population"]]
51-
msa_map = msa_map.groupby(["hrrnum"]).max().reset_index()
52-
data = data.merge(msa_map, how="left", on="hrrnum").drop(
33+
fips_to_state = gmpr._load_crosswalk(from_code="fips", to_code="state")
34+
if geo_res == "county":
35+
mix_map = fips_to_state[["fips", "state_id"]]
36+
else:
37+
fips_to_geo_res = gmpr._load_crosswalk(from_code="fips", to_code=geo_res)
38+
mix_map = fips_to_geo_res[["fips", geo_res]].merge(
39+
fips_to_state[["fips", "state_id"]],
40+
on="fips",
41+
how="inner")
42+
mix_map = gmpr.add_population_column(mix_map, "fips").groupby(
43+
geo_res).max().reset_index().drop(
44+
["fips", "population"], axis = 1)
45+
# Merge the info of parent state to the data
46+
data = data.merge(mix_map, how="left", on=geo_key).drop(
5347
columns=["population"]).dropna()
54-
data = data.groupby(["timestamp", "hrrnum", "state_id"]).sum().reset_index()
55-
data["hrrnum"] = data["hrrnum"].astype(int)
56-
57-
return data, "hrrnum"
58-
59-
def zip_to_county(data, map_df):
60-
"""Aggregate zip codes to the county resolution, along with its parent state.
61-
Args:
62-
data: dataframe aggregated to the day-zip resolution
63-
Returns:
64-
dataframe at the day-county resolution and parent state, with their string keys
65-
"""
66-
# zip -> county + parent state (county has unique state)
67-
zip_map = map_df[["fips", "zip", "state_id"]].dropna().drop_duplicates()
68-
data = data.merge(zip_map, how="left", on="zip").drop(columns=["zip"]).dropna()
69-
data = data.groupby(["timestamp", "fips", "state_id"]).sum().reset_index()
70-
data["fips"] = data["fips"].apply(lambda x: str(int(x)).zfill(5))
71-
72-
return data, "fips"
73-
74-
def zip_to_state(data, map_df):
75-
"""Aggregate zip codes to the state resolution.
76-
Args:
77-
data: dataframe aggregated to the day-zip resolution
78-
Returns:
79-
dataframe at the day-state resolution, with the state key
80-
"""
81-
zip_map = map_df[["zip", "state_id"]].dropna().drop_duplicates()
82-
data = data.merge(zip_map, how="left", on="zip").drop(
83-
columns=["zip"]).dropna()
84-
data = data.groupby(["timestamp", "state_id"]).sum().reset_index()
48+
data = data.groupby(["timestamp", geo_key, "state_id"]).sum().reset_index()
8549
return data

quidel/delphi_quidel/pull.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@ def compare_dates(date1, date2, flag):
3333
if flag == "l":
3434
return date1
3535
return date2
36-
else:
37-
if flag == "l":
38-
return date2
39-
return date1
36+
if flag == "l":
37+
return date2
38+
return date1
4039

4140
def check_whether_date_in_range(search_date, start_date, end_date):
4241
"""
@@ -53,8 +52,7 @@ def read_historical_data():
5352
Read historical flu antigen test data stored in
5453
midas /common/quidel-historical-raw
5554
"""
56-
# pull_dir = "/common/quidel-historical-raw"
57-
pull_dir = "../../tempt_files/quidel_raw/quidel-historical-raw"
55+
pull_dir = "/common/quidel-historical-raw"
5856
columns = ['SofiaSerNum', 'TestDate', 'Facility', 'ZipCode',
5957
'FluA', 'FluB', 'StorageDate']
6058
df = pd.DataFrame(columns=columns)

quidel/delphi_quidel/run.py

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,8 @@
22
"""Functions to call when running the function.
33
44
This module should contain a function called `run_module`, that is executed
5-
when the module is run with `python -m MODULE_NAME`.
5+
when the module is run with `python -m delphi_quidel`.
66
"""
7-
from os.path import join
8-
9-
import pandas as pd
107
from delphi_utils import read_params
118

129
from .geo_maps import geo_map
@@ -25,12 +22,8 @@ def run_module():
2522
params = read_params()
2623
cache_dir = params["cache_dir"]
2724
export_dir = params["export_dir"]
28-
static_file_dir = params["static_file_dir"]
2925
export_start_dates = params["export_start_date"]
3026
export_end_dates = params["export_end_date"]
31-
map_df = pd.read_csv(
32-
join(static_file_dir, "fips_prop_pop.csv"), dtype={"fips": int}
33-
)
3427

3528
# Pull data and update export date
3629
dfs, _end_date = pull_quidel_data(params)
@@ -48,36 +41,40 @@ def run_module():
4841
wip_signal=params["wip_signal"],
4942
prefix="wip_")
5043

51-
for sensor in sensors:
52-
# Check either covid_ag or flu_ag
53-
test_type = "covid_ag" if "covid_ag" in sensor else "flu_ag"
54-
print("state", sensor)
44+
for test_type in ["covid_ag", "flu_ag"]:
45+
print("For %s:"%test_type)
5546
data = dfs[test_type].copy()
56-
state_groups = geo_map("state", data, map_df).groupby("state_id")
57-
first_date, last_date = data["timestamp"].min(), data["timestamp"].max()
5847

5948
# For State Level
60-
state_df = generate_sensor_for_states(
61-
state_groups, smooth=SENSORS[sensor][1],
62-
device=SENSORS[sensor][0], first_date=first_date,
63-
last_date=last_date)
64-
export_csv(state_df, "state", sensor, receiving_dir=export_dir,
65-
start_date=export_start_dates[test_type],
66-
end_date=export_end_dates[test_type])
67-
68-
# County/HRR/MSA level
69-
for geo_res in GEO_RESOLUTIONS:
70-
print(geo_res, sensor)
71-
data = dfs[test_type].copy()
72-
data, res_key = geo_map(geo_res, data, map_df)
73-
res_df = generate_sensor_for_other_geores(
74-
state_groups, data, res_key, smooth=SENSORS[sensor][1],
49+
state_groups = geo_map("state", data).groupby("state_id")
50+
first_date, last_date = data["timestamp"].min(), data["timestamp"].max()
51+
for sensor in sensors:
52+
if test_type not in sensor:
53+
continue
54+
print("state", sensor)
55+
state_df = generate_sensor_for_states(
56+
state_groups, smooth=SENSORS[sensor][1],
7557
device=SENSORS[sensor][0], first_date=first_date,
7658
last_date=last_date)
77-
export_csv(res_df, geo_res, sensor, receiving_dir=export_dir,
59+
export_csv(state_df, "state", sensor, receiving_dir=export_dir,
7860
start_date=export_start_dates[test_type],
7961
end_date=export_end_dates[test_type])
8062

63+
# County/HRR/MSA level
64+
for geo_res in GEO_RESOLUTIONS:
65+
geo_data, res_key = geo_map(geo_res, data)
66+
for sensor in sensors:
67+
if test_type not in sensor:
68+
continue
69+
print(geo_res, sensor)
70+
res_df = generate_sensor_for_other_geores(
71+
state_groups, geo_data, res_key, smooth=SENSORS[sensor][1],
72+
device=SENSORS[sensor][0], first_date=first_date,
73+
last_date=last_date)
74+
export_csv(res_df, geo_res, sensor, receiving_dir=export_dir,
75+
start_date=export_start_dates[test_type],
76+
end_date=export_end_dates[test_type])
77+
8178
# Export the cache file if the pipeline runs successfully.
8279
# Otherwise, don't update the cache file
8380
update_cache_file(dfs, _end_date, cache_dir)
2.57 KB
Binary file not shown.

quidel/tests/test_geo_maps.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,6 @@
99
from delphi_quidel.geo_maps import geo_map
1010

1111

12-
map_df = pd.read_csv(
13-
join("../static", "fips_prop_pop.csv"), dtype={"fips": int}
14-
)
15-
1612
class TestGeoMap:
1713
def test_county(self):
1814

@@ -23,10 +19,11 @@ def test_county(self):
2319
"2020-06-15", "2020-06-15", "2020-06-15"],
2420
"totalTest": [100, 50, 200, 200, 250, 500],
2521
"positiveTest": [10, 8, 15, 5, 20, 50],
22+
"numUniqueDevices": [2, 1, 1, 1, 1, 1]
2623
}
2724
)
2825

29-
new_df, res_key = geo_map("county", df, map_df)
26+
new_df, res_key = geo_map("county", df)
3027

3128
assert res_key == 'fips'
3229
assert set(new_df["fips"].values) == set(['25027', '53011', '48439'])
@@ -43,10 +40,11 @@ def test_state(self):
4340
"2020-06-15", "2020-06-15", "2020-06-15"],
4441
"totalTest": [100, 50, 200, 200, 250, 500],
4542
"positiveTest": [10, 8, 15, 5, 20, 50],
43+
"numUniqueDevices": [2, 1, 1, 1, 1, 1]
4644
}
4745
)
4846

49-
new_df = geo_map("state", df, map_df)
47+
new_df = geo_map("state", df)
5048

5149
assert set(new_df["state_id"].values) == set(['ma', 'tx', 'wa'])
5250
assert set(new_df["timestamp"].values) == set(df["timestamp"].values)
@@ -62,12 +60,13 @@ def test_hrr(self):
6260
"2020-06-15", "2020-06-15", "2020-06-15"],
6361
"totalTest": [100, 50, 200, 200, 250, 500],
6462
"positiveTest": [10, 8, 15, 5, 20, 50],
63+
"numUniqueDevices": [2, 1, 1, 1, 1, 1]
6564
}
6665
)
6766

68-
new_df, res_key = geo_map("hrr", df, map_df)
67+
new_df, res_key = geo_map("hrr", df)
6968

70-
assert set(new_df["hrrnum"].values) == set([16, 231, 340, 344, 394])
69+
assert set(new_df["hrr"].values) == set(["16", "231", "340", "344", "394"])
7170
assert set(new_df["timestamp"].values) == set(df["timestamp"].values)
7271
assert set(new_df["totalTest"].values) == set([500, 100, 250, 50, 400])
7372
assert set(new_df["positiveTest"].values) == set([50, 10, 20, 8, 20])
@@ -76,18 +75,19 @@ def test_msa(self):
7675

7776
df = pd.DataFrame(
7877
{
79-
"zip": [1607, 73716, 73719, 76010, 74435, 74936],
78+
"zip": [1607, 73716, 73719, 76010, 74945, 74936],
8079
"timestamp": ["2020-06-15", "2020-06-15", "2020-06-15",
8180
"2020-06-15", "2020-06-15", "2020-06-15"],
8281
"totalTest": [100, 50, 200, 200, 250, 500],
8382
"positiveTest": [10, 8, 15, 5, 20, 50],
83+
"numUniqueDevices": [2, 1, 1, 1, 1, 1]
8484
}
8585
)
8686

87-
new_df, res_key = geo_map("msa", df, map_df)
87+
new_df, res_key = geo_map("msa", df)
8888

89-
assert res_key == 'cbsa_id'
90-
assert set(new_df["cbsa_id"].values) == set(['19100', '22900', '49340'])
89+
assert res_key == 'msa'
90+
assert set(new_df["msa"].values) == set(['19100', '22900', '49340'])
9191
assert set(new_df["timestamp"].values) == set(df["timestamp"].values)
9292
assert set(new_df["totalTest"].values) == set([200, 750, 100])
9393
assert set(new_df["positiveTest"].values) == set([5, 70, 10])

quidel/tests/test_run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from os import listdir, remove
1+
from os import listdir
22
from os.path import join
33

44
import pandas as pd

0 commit comments

Comments
 (0)