Skip to content

Refactor usafacts to use geo utils #316

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 19 additions & 65 deletions usafacts/delphi_usafacts/geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,10 @@
"""Functions for converting geocodes."""
import pandas as pd

from delphi_utils import GeoMapper

INCIDENCE_BASE = 100000
# https://code.activestate.com/recipes/577775-state-fips-codes-dict/
STATE_TO_FIPS = {
"WA": "53",
"DE": "10",
"DC": "11",
"WI": "55",
"WV": "54",
"HI": "15",
"FL": "12",
"WY": "56",
"PR": "72",
"NJ": "34",
"NM": "35",
"TX": "48",
"LA": "22",
"NC": "37",
"ND": "38",
"NE": "31",
"TN": "47",
"NY": "36",
"PA": "42",
"AK": "02",
"NV": "32",
"NH": "33",
"VA": "51",
"CO": "08",
"CA": "06",
"AL": "01",
"AR": "05",
"VT": "50",
"IL": "17",
"GA": "13",
"IN": "18",
"IA": "19",
"MA": "25",
"AZ": "04",
"ID": "16",
"CT": "09",
"ME": "23",
"MD": "24",
"OK": "40",
"OH": "39",
"UT": "49",
"MO": "29",
"MN": "27",
"MI": "26",
"RI": "44",
"KS": "20",
"MT": "30",
"MS": "28",
"SC": "45",
"KY": "21",
"OR": "41",
"SD": "46",
}

SECONDARY_FIPS = [
("51620", ["51093", "51175"]),
("51685", ["51153"]),
Expand All @@ -77,6 +24,7 @@
("46102", "46113"),
]


FIPS_TO_STATE = {v: k.lower() for k, v in STATE_TO_FIPS.items()}

# Valid geographical resolutions output by this indicator.
Expand Down Expand Up @@ -109,7 +57,6 @@ def fips_to_state(fips: str) -> str:
"""
return FIPS_TO_STATE[fips[:2]]


def disburse(df: pd.DataFrame, pooled_fips: str, fips_list: list):
"""Disburse counts from POOLED_FIPS equally to the counties in FIPS_LIST.

Expand Down Expand Up @@ -148,7 +95,7 @@ def geo_map(df: pd.DataFrame, geo_res: str, map_df: pd.DataFrame, sensor: str):
Columns: fips, timestamp, new_counts, cumulative_counts, population ...
geo_res: str
Geographic resolution to which to aggregate. Valid options:
('county', 'state', 'msa', 'hrr').
("county", "state", "msa", "hrr").
map_df: pd.DataFrame
Loaded from static file "fips_prop_pop.csv".
sensor: str
Expand All @@ -164,23 +111,28 @@ def geo_map(df: pd.DataFrame, geo_res: str, map_df: pd.DataFrame, sensor: str):
if geo_res not in VALID_GEO_RES:
raise ValueError(f"geo_res must be one of {VALID_GEO_RES}")

df_mega = df[df['fips'].astype(int) % 1000 == 0].copy()
# State-level records unassigned to specific counties are coded as fake
# counties with fips XX000.
unassigned_counties = df[df["fips"].str.endswith("000")].copy()

df = df[df['fips'].astype(int) % 1000 != 0].copy()
df = df[df["fips"].astype(int) % 1000 != 0].copy()
# Disburse unallocated cases/deaths in NYC to NYC counties
df = disburse(df, NYC_FIPS[0][0], NYC_FIPS[0][1])
df = df[df['fips'] != NYC_FIPS[0][0]]
df = df[df["fips"] != NYC_FIPS[0][0]]

if geo_res == "county":
if sensor not in PROP_SENSORS:
df = df.append(df_mega)
# It is not clear how to calculate the proportion for unallocated
# cases/deaths, so we exclude them for those sensors.
df = df.append(unassigned_counties)
df["geo_id"] = df["fips"]
elif geo_res == "state":
# Grab first two digits of fips
# Map state fips to us postal code
# Add unallocated cases/deaths
df = df.append(df_mega)
df["geo_id"] = df["fips"].apply(fips_to_state)
df = df.append(unassigned_counties)
geo_mapper = GeoMapper()
df = geo_mapper.add_geocode(df, "fips", "state_id", new_col="geo_id")
elif geo_res in ("msa", "hrr"):
# Map "missing" secondary FIPS to those that are in our canonical set
for fips, fips_list in SECONDARY_FIPS:
Expand All @@ -192,12 +144,14 @@ def geo_map(df: pd.DataFrame, geo_res: str, map_df: pd.DataFrame, sensor: str):
map_df["geo_id"] = map_df[colname].astype(int)
df["fips"] = df["fips"].astype(int)
merged = df.merge(map_df, on="fips")
merged["cumulative_counts"] = merged["cumulative_counts"] * merged["pop_prop"]
merged["cumulative_counts"] =\
merged["cumulative_counts"] * merged["pop_prop"]
merged["new_counts"] = merged["new_counts"] * merged["pop_prop"]
merged["population"] = merged["population"] * merged["pop_prop"]
df = merged.drop(["zip", "pop_prop", "hrrnum", "cbsa_id"], axis=1)
df = df.drop("fips", axis=1)
df = df.groupby(["geo_id", "timestamp"]).sum().reset_index()
df["incidence"] = df["new_counts"] / df["population"] * INCIDENCE_BASE
df["cumulative_prop"] = df["cumulative_counts"] / df["population"] * INCIDENCE_BASE
df["cumulative_prop"] =\
df["cumulative_counts"] / df["population"] * INCIDENCE_BASE
return df
20 changes: 17 additions & 3 deletions usafacts/delphi_usafacts/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""Functions for pulling data from the USAFacts website."""
import numpy as np
import pandas as pd
from delphi_utils import GeoMapper

# Columns to drop from the data frame.
DROP_COLUMNS = [
Expand All @@ -12,7 +13,7 @@
]


def pull_usafacts_data(base_url: str, metric: str, pop_df: pd.DataFrame) -> pd.DataFrame:
def pull_usafacts_data(base_url: str, metric: str, geo_mapper: GeoMapper) -> pd.DataFrame:
"""Pulls the latest USA Facts data, and conforms it into a dataset

The output dataset has:
Expand Down Expand Up @@ -44,8 +45,8 @@ def pull_usafacts_data(base_url: str, metric: str, pop_df: pd.DataFrame) -> pd.D
Base URL for pulling the USA Facts data
metric: str
        One of 'confirmed' or 'deaths', corresponding to the keys of base_url.
pop_df: pd.DataFrame
Read from static file "fips_population.csv".
geo_mapper: GeoMapper
GeoMapper object with population info.

Returns
-------
Expand Down Expand Up @@ -82,6 +83,19 @@ def pull_usafacts_data(base_url: str, metric: str, pop_df: pd.DataFrame) -> pd.D

# Conform FIPS
df["fips"] = df["FIPS"].apply(lambda x: f"{int(x):05d}")

# The FIPS code 00001 is a dummy for unallocated NYC data. It doesn't have
# a corresponding population entry in the GeoMapper so it will be dropped
# in the call to `add_population_column()`. We pull it out here to
# reinsert it after the population data is added.
nyc_dummy_row = df[df["fips"] == "00001"]
assert len(nyc_dummy_row) == 1

    # Merge in population data (lowercase column names, consistent across
# Population for unassigned cases/deaths is NAN
df = geo_mapper.add_population_column(df, "fips")
df = df.append(nyc_dummy_row, ignore_index=True)

# Drop unnecessary columns (state is pre-encoded in fips)
try:
df.drop(DROP_COLUMNS, axis=1, inplace=True)
Expand Down
9 changes: 4 additions & 5 deletions usafacts/delphi_usafacts/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
read_params,
create_export_csv,
S3ArchiveDiffer,
GeoMapper
)

from .geo import geo_map
Expand Down Expand Up @@ -88,12 +89,10 @@ def run_module():
map_df = pd.read_csv(
join(static_file_dir, "fips_prop_pop.csv"), dtype={"fips": int}
)
pop_df = pd.read_csv(
join(static_file_dir, "fips_population.csv"),
dtype={"fips": float, "population": float},
).rename({"fips": "FIPS"}, axis=1)

dfs = {metric: pull_usafacts_data(base_url, metric, pop_df) for metric in METRICS}
geo_mapper = GeoMapper()

dfs = {metric: pull_usafacts_data(base_url, metric, geo_mapper) for metric in METRICS}
for metric, geo_res, sensor, smoother in product(
METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTHERS):
print(geo_res, metric, sensor, smoother)
Expand Down
42 changes: 17 additions & 25 deletions usafacts/tests/test_geo.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,22 @@
import pytest

from os.path import join

import pytest

import numpy as np
import pandas as pd
from delphi_usafacts.geo import fips_to_state, disburse, geo_map
from delphi_usafacts.geo import disburse, geo_map

MAP_DF = pd.read_csv(
join("..", "static", "fips_prop_pop.csv"),
dtype={"fips": int}
)

sensor = "new_counts"
class TestFipsToState:

def test_normal(self):

assert fips_to_state("53003") == "wa"
assert fips_to_state("48027") == "tx"
assert fips_to_state("12003") == "fl"
assert fips_to_state("50103") == "vt"
assert fips_to_state("15003") == "hi"

SENSOR = "new_counts"

class TestDisburse:
"""Tests for the `geo.disburse()` function."""
def test_even(self):

"""Tests that values are disbursed evenly across recipients."""
df = pd.DataFrame(
{
"fips": ["51093", "51175", "51620"],
Expand All @@ -43,8 +34,9 @@ def test_even(self):


class TestGeoMap:
"""Tests for `geo.geo_map()`."""
def test_incorrect_geo(self):

"""Tests that an invalid resolution raises an error."""
df = pd.DataFrame(
{
"fips": ["53003", "48027", "50103"],
Expand All @@ -56,10 +48,10 @@ def test_incorrect_geo(self):
)

with pytest.raises(ValueError):
geo_map(df, "département", MAP_DF, sensor)
geo_map(df, "département", MAP_DF, SENSOR)

def test_county(self):

"""Tests that values are correctly aggregated at the county level."""
df = pd.DataFrame(
{
"fips": ["53003", "48027", "50103"],
Expand All @@ -70,7 +62,7 @@ def test_county(self):
}
)

new_df = geo_map(df, "county", MAP_DF, sensor)
new_df = geo_map(df, "county", MAP_DF, SENSOR)

exp_incidence = df["new_counts"] / df["population"] * 100000
exp_cprop = df["cumulative_counts"] / df["population"] * 100000
Expand All @@ -81,7 +73,7 @@ def test_county(self):
assert set(new_df["cumulative_prop"].values) == set(exp_cprop.values)

def test_state(self):

"""Tests that values are correctly aggregated at the state level."""
df = pd.DataFrame(
{
"fips": ["04001", "04003", "04009", "25023"],
Expand All @@ -92,7 +84,7 @@ def test_state(self):
}
)

new_df = geo_map(df, "state", MAP_DF, sensor)
new_df = geo_map(df, "state", MAP_DF, SENSOR)

exp_incidence = np.array([27, 13]) / np.array([2500, 25]) * 100000
exp_cprop = np.array([165, 60]) / np.array([2500, 25]) * 100000
Expand All @@ -106,7 +98,7 @@ def test_state(self):
assert (new_df["cumulative_prop"].values == exp_cprop).all()

def test_hrr(self):

"""Tests that values are correctly aggregated at the HRR level."""
df = pd.DataFrame(
{
"fips": ["13009", "13017", "13021", "09015"],
Expand All @@ -117,7 +109,7 @@ def test_hrr(self):
}
)

new_df = geo_map(df, "hrr", MAP_DF, sensor)
new_df = geo_map(df, "hrr", MAP_DF, SENSOR)

exp_incidence = np.array([13, 27]) / np.array([25, 2500]) * 100000
exp_cprop = np.array([60, 165]) / np.array([25, 2500]) * 100000
Expand All @@ -131,7 +123,7 @@ def test_hrr(self):
assert new_df["cumulative_prop"].values == pytest.approx(exp_cprop)

def test_msa(self):

"""Tests that values are correctly aggregated at the MSA level."""
df = pd.DataFrame(
{
"fips": ["13009", "13017", "13021", "09015"],
Expand All @@ -142,7 +134,7 @@ def test_msa(self):
}
)

new_df = geo_map(df, "msa", MAP_DF, sensor)
new_df = geo_map(df, "msa", MAP_DF, SENSOR)

exp_incidence = np.array([2, 13]) / np.array([300, 25]) * 100000
exp_cprop = np.array([45, 60]) / np.array([300, 25]) * 100000
Expand Down
16 changes: 7 additions & 9 deletions usafacts/tests/test_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,9 @@
from os.path import join

import pandas as pd
from delphi_utils import GeoMapper
from delphi_usafacts.pull import pull_usafacts_data

pop_df = pd.read_csv(
join("..", "static", "fips_population.csv"),
dtype={"fips": float, "population": float}
).rename({"fips": "FIPS"}, axis=1)

base_url_good = "test_data/small_{metric}.csv"

base_url_bad = {
Expand All @@ -18,11 +14,13 @@
"extra_cols": "test_data/bad_{metric}_extra_cols.csv"
}

geo_mapper = GeoMapper()


class TestPullUSAFacts:
def test_good_file(self):
metric = "deaths"
df = pull_usafacts_data(base_url_good, metric, pop_df)
df = pull_usafacts_data(base_url_good, metric, geo_mapper)

assert (
df.columns.values
Expand All @@ -34,21 +32,21 @@ def test_missing_days(self):
metric = "confirmed"
with pytest.raises(ValueError):
df = pull_usafacts_data(
base_url_bad["missing_days"], metric, pop_df
base_url_bad["missing_days"], metric, geo_mapper
)

def test_missing_cols(self):

metric = "confirmed"
with pytest.raises(ValueError):
df = pull_usafacts_data(
base_url_bad["missing_cols"], metric, pop_df
base_url_bad["missing_cols"], metric, geo_mapper
)

def test_extra_cols(self):

metric = "confirmed"
with pytest.raises(ValueError):
df = pull_usafacts_data(
base_url_bad["extra_cols"], metric, pop_df
base_url_bad["extra_cols"], metric, geo_mapper
)