Switch to geo utils for combined Quidel pipeline #308

Merged: 6 commits, Oct 22, 2020
4 changes: 1 addition & 3 deletions quidel/delphi_quidel/export.py
@@ -23,10 +23,8 @@ def export_csv(df, geo_name, sensor, receiving_dir, start_date, end_date):
     end_date: datetime.datetime
         The last date to report
     """
-
-    df = df.copy()
     df = df[np.logical_and(df["timestamp"] >= start_date,
-                           df["timestamp"] <= end_date)]
+                           df["timestamp"] <= end_date)].copy()
 
     for date in df["timestamp"].unique():
        t = pd.to_datetime(str(date))
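A note on the change above: filtering first and copying the result duplicates only the kept rows instead of the whole frame, while the explicit .copy() still protects later in-place edits. A minimal sketch of the pattern, with a toy frame and hypothetical column values:

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({
        "timestamp": pd.to_datetime(["2020-06-14", "2020-06-15", "2020-06-16"]),
        "totalTest": [10, 20, 30],
    })
    start_date, end_date = pd.Timestamp("2020-06-15"), pd.Timestamp("2020-06-16")

    # Copy only the filtered rows; .copy() makes the subset own its data, so
    # later column assignments don't hit pandas' SettingWithCopyWarning on a view.
    df = df[np.logical_and(df["timestamp"] >= start_date,
                           df["timestamp"] <= end_date)].copy()
    df["geo_id"] = "ma"  # safe in-place edit on the owned copy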
124 changes: 44 additions & 80 deletions quidel/delphi_quidel/geo_maps.py
@@ -1,85 +1,49 @@
 """Contains geographic mapping tools."""
-def geo_map(geo_res, data, map_df):
-    if geo_res == "county":
-        return zip_to_county(data, map_df)
-    if geo_res == "msa":
-        return zip_to_msa(data, map_df)
-    if geo_res == "hrr":
-        return zip_to_hrr(data, map_df)
-    return zip_to_state(data, map_df)
-
-def zip_to_msa(data, map_df):
-    """Map from zipcode to MSA (along with parent state).
-    Args:
-        data: dataframe at the day-zip resolution.
-    Returns:
-        tuple, a dataframe at day-msa, with parent state column, and their string keys
-    """
-    # zip -> msa
-    zip_map = map_df[["zip", "cbsa_id"]].dropna().drop_duplicates()
-    # forget about the rest of the zips that aren't in MSA
-    data = data.merge(zip_map, how="left", on="zip").dropna().drop(columns=["zip"], axis=1)
-
-    # msa + parent state
-    # msa_map has mapping from msa to state, going by the state with the largest
-    # population (since a msa may span multiple states)
-    msa_map = map_df[["cbsa_id", "state_id", "population"]]
-    msa_map = msa_map.groupby(["cbsa_id"]).max().reset_index()
-    data = data.merge(msa_map, how="left", on="cbsa_id").drop(
-        columns=["population"]).dropna()
-    data = data.groupby(["timestamp", "cbsa_id", "state_id"]).sum().reset_index()
-    data["cbsa_id"] = data["cbsa_id"].apply(lambda x: str(int(x)).zfill(5))
-
-    return data, "cbsa_id"
-
-def zip_to_hrr(data, map_df):
-    """Map from zipcode to HRR (along with parent state).
-    Args:
-        data: dataframe at the day-zip resolution.
-    Returns:
-        tuple, a dataframe at day-msa, with parent state column, and their string keys
-    """
-    # zip -> msa
-    zip_map = map_df[["zip", "hrrnum"]].dropna().drop_duplicates()
-    # forget about the rest of the zips that aren't in MSA
-    data = data.merge(zip_map, how="left", on="zip").dropna().drop(columns=["zip"], axis=1)
-
-    # msa + parent state
-    # msa_map has mapping from msa to state, going by the state with the largest
-    # population (since a msa may span multiple states)
-    msa_map = map_df[["hrrnum", "state_id", "population"]]
-    msa_map = msa_map.groupby(["hrrnum"]).max().reset_index()
-    data = data.merge(msa_map, how="left", on="hrrnum").drop(
-        columns=["population"]).dropna()
-    data = data.groupby(["timestamp", "hrrnum", "state_id"]).sum().reset_index()
-    data["hrrnum"] = data["hrrnum"].astype(int)
-
-    return data, "hrrnum"
-
-def zip_to_county(data, map_df):
-    """Aggregate zip codes to the county resolution, along with its parent state.
-    Args:
-        data: dataframe aggregated to the day-zip resolution
-    Returns:
-        dataframe at the day-county resolution and parent state, with their string keys
-    """
-    # zip -> county + parent state (county has unique state)
-    zip_map = map_df[["fips", "zip", "state_id"]].dropna().drop_duplicates()
-    data = data.merge(zip_map, how="left", on="zip").drop(columns=["zip"]).dropna()
-    data = data.groupby(["timestamp", "fips", "state_id"]).sum().reset_index()
-    data["fips"] = data["fips"].apply(lambda x: str(int(x)).zfill(5))
-
-    return data, "fips"
-
-def zip_to_state(data, map_df):
-    """Aggregate zip codes to the state resolution.
-    Args:
-        data: dataframe aggregated to the day-zip resolution
-    Returns:
-        dataframe at the day-state resolution, with the state key
-    """
-    zip_map = map_df[["zip", "state_id"]].dropna().drop_duplicates()
-    data = data.merge(zip_map, how="left", on="zip").drop(
-        columns=["zip"]).dropna()
-    data = data.groupby(["timestamp", "state_id"]).sum().reset_index()
+from delphi_utils import GeoMapper
+
+date_col = "timestamp"
+data_cols = ['totalTest', 'numUniqueDevices', 'positiveTest', "population"]
+gmpr = GeoMapper()  # Use geo utils
+GEO_KEY_DICT = {
+    "county": "fips",
+    "msa": "msa",
+    "hrr": "hrr",
+    "state": "state_id"
+}
+def geo_map(geo_res, df):
+    data = df.copy()
+    geo_key = GEO_KEY_DICT[geo_res]
+    # Add population for each zipcode
+    data = gmpr.add_population_column(data, "zip")
+    # zip -> geo_res
+    data = gmpr.replace_geocode(data, "zip", geo_key,
+                                date_col=date_col, data_cols=data_cols)
+    if geo_res == "state":
+        return data
+    # Add parent state
+    data = add_parent_state(data, geo_res, geo_key)
+    return data, geo_key
+
+def add_parent_state(data, geo_res, geo_key):
+    """
+    - map from msa/hrr to state, going by the state with the largest
+      population (since a msa/hrr may span multiple states)
+    - map from county to the corresponding state
+    """
+    fips_to_state = gmpr._load_crosswalk(from_code="fips", to_code="state")
+    if geo_res == "county":
+        mix_map = fips_to_state[["fips", "state_id"]]
+    else:
+        fips_to_geo_res = gmpr._load_crosswalk(from_code="fips", to_code=geo_res)
+        mix_map = fips_to_geo_res[["fips", geo_res]].merge(
+            fips_to_state[["fips", "state_id"]],
+            on="fips",
+            how="inner")
+        mix_map = gmpr.add_population_column(mix_map, "fips").groupby(
+            geo_res).max().reset_index().drop(
+            ["fips", "population"], axis = 1)
+    # Merge the info of parent state to the data
+    data = data.merge(mix_map, how="left", on=geo_key).drop(
+        columns=["population"]).dropna()
+    data = data.groupby(["timestamp", geo_key, "state_id"]).sum().reset_index()
     return data
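For orientation, a minimal sketch of how the rewritten geo_map is called (toy day-zip frame; the state level returns a plain dataframe, every other level a (dataframe, geo key) tuple, as in the code above and the updated tests below):

    import pandas as pd
    from delphi_quidel.geo_maps import geo_map

    # Toy day-zip frame with the columns listed in data_cols (minus population,
    # which geo_map adds itself via add_population_column).
    df = pd.DataFrame({
        "zip": [1607, 73716],
        "timestamp": ["2020-06-15", "2020-06-15"],
        "totalTest": [100, 50],
        "positiveTest": [10, 8],
        "numUniqueDevices": [2, 1],
    })

    state_df = geo_map("state", df)   # state: just the aggregated dataframe
    msa_df, key = geo_map("msa", df)  # county/msa/hrr: (dataframe, geo key)
    assert key == "msa"               # keys come from GEO_KEY_DICT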
3 changes: 1 addition & 2 deletions quidel/delphi_quidel/pull.py
@@ -53,8 +53,7 @@ def read_historical_data():
     Read historical flu antigen test data stored in
     midas /common/quidel-historical-raw
     """
-    # pull_dir = "/common/quidel-historical-raw"
-    pull_dir = "../../tempt_files/quidel_raw/quidel-historical-raw"
+    pull_dir = "/common/quidel-historical-raw"
     columns = ['SofiaSerNum', 'TestDate', 'Facility', 'ZipCode',
                'FluA', 'FluB', 'StorageDate']
     df = pd.DataFrame(columns=columns)
55 changes: 26 additions & 29 deletions quidel/delphi_quidel/run.py
@@ -4,9 +4,6 @@
 This module should contain a function called `run_module`, that is executed
 when the module is run with `python -m MODULE_NAME`.
 """
-from os.path import join
-
-import pandas as pd
 from delphi_utils import read_params
 
 from .geo_maps import geo_map

Contributor review comment on the docstring: Fill in MODULE_NAME

@@ -25,12 +22,8 @@ def run_module():
     params = read_params()
     cache_dir = params["cache_dir"]
     export_dir = params["export_dir"]
-    static_file_dir = params["static_file_dir"]
     export_start_dates = params["export_start_date"]
     export_end_dates = params["export_end_date"]
-    map_df = pd.read_csv(
-        join(static_file_dir, "fips_prop_pop.csv"), dtype={"fips": int}
-    )
 
     # Pull data and update export date
     dfs, _end_date = pull_quidel_data(params)
@@ -48,36 +41,40 @@ def run_module():
                          wip_signal=params["wip_signal"],
                          prefix="wip_")
 
-    for sensor in sensors:
-        # Check either covid_ag or flu_ag
-        test_type = "covid_ag" if "covid_ag" in sensor else "flu_ag"
-        print("state", sensor)
+    for test_type in ["covid_ag", "flu_ag"]:
+        print("For %s:"%test_type)
         data = dfs[test_type].copy()
-        state_groups = geo_map("state", data, map_df).groupby("state_id")
-        first_date, last_date = data["timestamp"].min(), data["timestamp"].max()
-
-        # For State Level
-        state_df = generate_sensor_for_states(
-            state_groups, smooth=SENSORS[sensor][1],
-            device=SENSORS[sensor][0], first_date=first_date,
-            last_date=last_date)
-        export_csv(state_df, "state", sensor, receiving_dir=export_dir,
-                   start_date=export_start_dates[test_type],
-                   end_date=export_end_dates[test_type])
-
-        # County/HRR/MSA level
-        for geo_res in GEO_RESOLUTIONS:
-            print(geo_res, sensor)
-            data = dfs[test_type].copy()
-            data, res_key = geo_map(geo_res, data, map_df)
-            res_df = generate_sensor_for_other_geores(
-                state_groups, data, res_key, smooth=SENSORS[sensor][1],
-                device=SENSORS[sensor][0], first_date=first_date,
-                last_date=last_date)
-            export_csv(res_df, geo_res, sensor, receiving_dir=export_dir,
-                       start_date=export_start_dates[test_type],
-                       end_date=export_end_dates[test_type])
+        state_groups = geo_map("state", data).groupby("state_id")
+        first_date, last_date = data["timestamp"].min(), data["timestamp"].max()
+        for sensor in sensors:
+            if test_type not in sensor:
+                continue
+            print("state", sensor)
+            state_df = generate_sensor_for_states(
+                state_groups, smooth=SENSORS[sensor][1],
+                device=SENSORS[sensor][0], first_date=first_date,
+                last_date=last_date)
+            export_csv(state_df, "state", sensor, receiving_dir=export_dir,
+                       start_date=export_start_dates[test_type],
+                       end_date=export_end_dates[test_type])
 
+        # County/HRR/MSA level
+        for geo_res in GEO_RESOLUTIONS:
+            geo_data, res_key = geo_map(geo_res, data)
+            for sensor in sensors:
+                if test_type not in sensor:
+                    continue
+                print(geo_res, sensor)
+                res_df = generate_sensor_for_other_geores(
+                    state_groups, geo_data, res_key, smooth=SENSORS[sensor][1],
+                    device=SENSORS[sensor][0], first_date=first_date,
+                    last_date=last_date)
+                export_csv(res_df, geo_res, sensor, receiving_dir=export_dir,
+                           start_date=export_start_dates[test_type],
+                           end_date=export_end_dates[test_type])
 
     # Export the cache file if the pipeline runs successfully.
     # Otherwise, don't update the cache file
     update_cache_file(dfs, _end_date, cache_dir)
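The reshaped loops hoist the geo mapping out of the per-sensor iteration: each (test type, resolution) pair is mapped once and the result is reused by every sensor of that test type, instead of re-mapping once per sensor as before. A schematic sketch of the call-count difference (stand-in names, not the pipeline's real functions):

    test_types = ["covid_ag", "flu_ag"]
    resolutions = ["county", "msa", "hrr"]
    sensors = ["covid_ag_raw_pct_positive", "covid_ag_smoothed_pct_positive",
               "flu_ag_raw_pct_positive", "flu_ag_smoothed_pct_positive"]

    calls = 0
    for test_type in test_types:
        for geo_res in resolutions:
            calls += 1  # one stand-in geo_map call, shared below
            for sensor in (s for s in sensors if test_type in s):
                pass    # each matching sensor reuses the mapped frame
    # 6 mappings, versus len(sensors) * len(resolutions) = 12 in the old layout
    print(calls)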
Binary file modified quidel/tests/test_data/covid_ag_test_data.xlsx
24 changes: 12 additions & 12 deletions quidel/tests/test_geo_maps.py
@@ -9,10 +9,6 @@
 from delphi_quidel.geo_maps import geo_map
 
 
-map_df = pd.read_csv(
-    join("../static", "fips_prop_pop.csv"), dtype={"fips": int}
-)
-
 class TestGeoMap:
     def test_county(self):
 
@@ -23,10 +19,11 @@ def test_county(self):
                               "2020-06-15", "2020-06-15", "2020-06-15"],
                 "totalTest": [100, 50, 200, 200, 250, 500],
                 "positiveTest": [10, 8, 15, 5, 20, 50],
+                "numUniqueDevices": [2, 1, 1, 1, 1, 1]
             }
         )
 
-        new_df, res_key = geo_map("county", df, map_df)
+        new_df, res_key = geo_map("county", df)
 
         assert res_key == 'fips'
         assert set(new_df["fips"].values) == set(['25027', '53011', '48439'])
@@ -43,10 +40,11 @@ def test_state(self):
                               "2020-06-15", "2020-06-15", "2020-06-15"],
                 "totalTest": [100, 50, 200, 200, 250, 500],
                 "positiveTest": [10, 8, 15, 5, 20, 50],
+                "numUniqueDevices": [2, 1, 1, 1, 1, 1]
             }
         )
 
-        new_df = geo_map("state", df, map_df)
+        new_df = geo_map("state", df)
 
         assert set(new_df["state_id"].values) == set(['ma', 'tx', 'wa'])
         assert set(new_df["timestamp"].values) == set(df["timestamp"].values)
@@ -62,12 +60,13 @@ def test_hrr(self):
                               "2020-06-15", "2020-06-15", "2020-06-15"],
                 "totalTest": [100, 50, 200, 200, 250, 500],
                 "positiveTest": [10, 8, 15, 5, 20, 50],
+                "numUniqueDevices": [2, 1, 1, 1, 1, 1]
            }
         )
 
-        new_df, res_key = geo_map("hrr", df, map_df)
+        new_df, res_key = geo_map("hrr", df)
 
-        assert set(new_df["hrrnum"].values) == set([16, 231, 340, 344, 394])
+        assert set(new_df["hrr"].values) == set(["16", "231", "340", "344", "394"])
         assert set(new_df["timestamp"].values) == set(df["timestamp"].values)
         assert set(new_df["totalTest"].values) == set([500, 100, 250, 50, 400])
         assert set(new_df["positiveTest"].values) == set([50, 10, 20, 8, 20])
@@ -76,18 +75,19 @@ def test_msa(self):
 
         df = pd.DataFrame(
             {
-                "zip": [1607, 73716, 73719, 76010, 74435, 74936],
+                "zip": [1607, 73716, 73719, 76010, 74945, 74936],
                 "timestamp": ["2020-06-15", "2020-06-15", "2020-06-15",
                               "2020-06-15", "2020-06-15", "2020-06-15"],
                 "totalTest": [100, 50, 200, 200, 250, 500],
                 "positiveTest": [10, 8, 15, 5, 20, 50],
+                "numUniqueDevices": [2, 1, 1, 1, 1, 1]
            }
         )
 
-        new_df, res_key = geo_map("msa", df, map_df)
+        new_df, res_key = geo_map("msa", df)
 
-        assert res_key == 'cbsa_id'
-        assert set(new_df["cbsa_id"].values) == set(['19100', '22900', '49340'])
+        assert res_key == 'msa'
+        assert set(new_df["msa"].values) == set(['19100', '22900', '49340'])
         assert set(new_df["timestamp"].values) == set(df["timestamp"].values)
         assert set(new_df["totalTest"].values) == set([200, 750, 100])
         assert set(new_df["positiveTest"].values) == set([5, 70, 10])
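One detail the updated assertions encode: after the switch, geo codes come back as strings under the GEO_KEY_DICT column names (HRR "16" instead of the old int 16 in "hrrnum", and the MSA key renamed from "cbsa_id" to "msa"). A tiny illustration of the convention, using the values from the assertions above:

    # Old pipeline: HRR numbers as ints; new geo-utils output: string codes.
    old_hrr = {16, 231, 340, 344, 394}
    new_hrr = {"16", "231", "340", "344", "394"}
    assert new_hrr == {str(h) for h in old_hrr}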
2 changes: 1 addition & 1 deletion quidel/tests/test_run.py
@@ -1,4 +1,4 @@
-from os import listdir, remove
+from os import listdir
 from os.path import join
 
 import pandas as pd