Commit a9c5e63

JHU prop signal fix and refactor
* drop XX000 FIPS when aggregating to state
* refactor pull.py: encapsulate into functions, clarify the diffing code with pandas built-ins, use the geomapper for population
* remove the unused static_file_dir param
* improve the tests all around; add a subset of real JHU data as a test file
* tests: check for infinities, and check that the prop signals' denominator matches the county sum total
1 parent 34a3585 commit a9c5e63

File tree

7 files changed (+117910, -219 lines)


jhu/delphi_jhu/geo.py

Lines changed: 6 additions & 0 deletions
@@ -35,6 +35,12 @@ def geo_map(df: pd.DataFrame, geo_res: str):
     if geo_res == "county":
         df.rename(columns={'fips': 'geo_id'}, inplace=True)
     elif geo_res == "state":
+        df = df.set_index("fips")
+        # Zero out the state FIPS population to avoid double counting.
+        state_fips_codes = {str(x).zfill(2) + "000" for x in range(1,73)}
+        subset_state_fips_codes = set(df.index.values) & state_fips_codes
+        df.loc[subset_state_fips_codes, "population"] = 0
+        df = df.reset_index()
         df = gmpr.replace_geocode(df, "fips", "state_id", new_col="geo_id", date_col="timestamp")
     else:
         df = gmpr.replace_geocode(df, "fips", geo_res, new_col="geo_id", date_col="timestamp")
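Why the zero-fill matters: the UID-to-FIPS mapping yields state-level XX000 rows (e.g., state-unassigned counts), and replace_geocode aggregates to states by summing over FIPS. If those rows kept a population value, the state denominator would count the state total on top of its counties, skewing the prop signals. A minimal sketch of the effect, with invented population figures:

import pandas as pd

# Two Massachusetts counties plus the synthetic "25000" state-level row
# (populations are illustrative, not real census values).
df = pd.DataFrame({
    "fips": ["25001", "25003", "25000"],
    "population": [212000, 129000, 6893000],
})

naive = df["population"].sum()  # 7234000: the state total is double counted

# The commit's fix: zero the XX000 populations before aggregating to state.
state_fips_codes = {str(x).zfill(2) + "000" for x in range(1, 73)}
df.loc[df["fips"].isin(state_fips_codes), "population"] = 0
fixed = df["population"].sum()  # 341000: the county sum only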

jhu/delphi_jhu/pull.py

Lines changed: 72 additions & 72 deletions
@@ -1,18 +1,66 @@
 # -*- coding: utf-8 -*-
 
-import re
 import pandas as pd
 import numpy as np
 from delphi_utils import GeoMapper
 
-def detect_date_col(col_name: str):
-    """determine if column name is a date"""
-    date_match = re.match(r'\d{1,2}\/\d{1,2}\/\d{1,2}', col_name)
-    if date_match:
-        return True
-    return False
 
-def pull_jhu_data(base_url: str, metric: str, pop_df: pd.DataFrame) -> pd.DataFrame:
+def download_data(base_url: str, metric: str) -> pd.DataFrame:
+    """
+    Downloads the data from the JHU repo, extracts the UID and the date columns, and
+    enforces the date datatype on the time column.
+    """
+    # Read data
+    df = pd.read_csv(base_url.format(metric=metric))
+    # Keep the UID and the time series columns only
+    # The regex filters for columns with the date format MM-DD-YY or M-D-YY
+    df = df.filter(regex="\d{1,2}\/\d{1,2}\/\d{2}|UID").melt(
+        id_vars=["UID"], var_name="timestamp", value_name="cumulative_counts"
+    )
+    df["timestamp"] = pd.to_datetime(df["timestamp"])
+    return df
+
+
+def create_diffs_column(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Using the cumulative_counts column from the dataframe, partitions the dataframe
+    into separate time-series based on fips, and then computes pairwise differences
+    of the cumulative values to get the incidence values. Boundary cases are handled
+    by zero-filling the day prior.
+    """
+    # Take time-diffs in each geo_code partition
+    df = df.set_index(["fips", "timestamp"])
+    df["new_counts"] = df.groupby(level=0)["cumulative_counts"].diff()
+    # Fill the NA value for the first date of each partition with the cumulative value that day
+    # (i.e. pretend the cumulative count the day before was 0)
+    na_value_mask = df["new_counts"].isna()
+    df.loc[na_value_mask, "new_counts"] = df.loc[na_value_mask, "cumulative_counts"]
+    df = df.reset_index()
+    return df
+
+
+def sanity_check_data(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Perform a final set of sanity checks on the data.
+    """
+    days_by_fips = df.groupby("fips").count()["cumulative_counts"].unique()
+    unique_days = df["timestamp"].unique()
+
+    # each FIPS has same number of rows
+    if (len(days_by_fips) > 1) or (days_by_fips[0] != len(unique_days)):
+        raise ValueError("Differing number of days by fips")
+
+    min_timestamp = min(unique_days)
+    max_timestamp = max(unique_days)
+    n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, "D") + 1
+    if n_days != len(unique_days):
+        raise ValueError(
+            f"Not every day between {min_timestamp} and "
+            f"{max_timestamp} is represented."
+        )
+
+
+def pull_jhu_data(base_url: str, metric: str, gmpr: GeoMapper) -> pd.DataFrame:
     """Pulls the latest Johns Hopkins CSSE data, and conforms it into a dataset
 
     The output dataset has:
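The new create_diffs_column is the "pandas built-ins" rewrite called out in the commit message: groupby(level=0).diff() never differences across FIPS partitions, so the old dummy-row and cross-fips masking tricks (removed in the next hunk) go away. A quick sketch of the behavior on toy data, with invented counts:

import pandas as pd

# Two FIPS partitions, three days of cumulative counts each.
df = pd.DataFrame({
    "fips": ["01001"] * 3 + ["01003"] * 3,
    "timestamp": pd.to_datetime(["2020-04-01", "2020-04-02", "2020-04-03"] * 2),
    "cumulative_counts": [3, 5, 5, 1, 1, 4],
})

df = df.set_index(["fips", "timestamp"])
# Differences stay within each fips partition; the first day of each
# partition comes out NaN instead of diffing against the previous fips.
df["new_counts"] = df.groupby(level=0)["cumulative_counts"].diff()
# Zero-fill the day prior: the first day's incidence is its cumulative count.
na_mask = df["new_counts"].isna()
df.loc[na_mask, "new_counts"] = df.loc[na_mask, "cumulative_counts"]
print(df["new_counts"].tolist())  # [3.0, 2.0, 0.0, 1.0, 0.0, 3.0]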
@@ -28,87 +76,39 @@ def pull_jhu_data(base_url: str, metric: str, pop_df: pd.DataFr
     may be negative. This is wholly dependent on the quality of the raw
     dataset.
 
-    We filter the data such that we only keep rows with valid FIPS, or "FIPS"
+    We filter the data such that we only keep rows with valid FIPS or "FIPS"
     codes defined under the exceptions of the README. The current exceptions
     include:
 
-    - 70002: Dukes County and Nantucket County in Massachusetts, which are
-      reported together
-    - 70003: Kansas City, Missouri, which reports counts separately from the
-      four counties it intesects (Platte, Cass, Clay, Jackson Counties)
-
     Parameters
     ----------
     base_url: str
-        Base URL for pulling the JHU CSSE data
+        Base URL for pulling the JHU CSSE data.
     metric: str
         One of 'confirmed' or 'deaths'.
-    pop_df: pd.DataFrame
-        Read from static file "fips_population.csv".
+    gmpr: GeoMapper
+        An instance of the geomapping utility.
 
     Returns
     -------
     pd.DataFrame
         Dataframe as described above.
     """
+    df = download_data(base_url, metric)
 
-    # Read data
-    df = pd.read_csv(base_url.format(metric=metric))
-
-    # FIPS are missing for some nonstandard FIPS
-    date_cols = [col_name for col_name in df.columns if detect_date_col(col_name)]
-    keep_cols = date_cols + ['UID']
-    df = df[keep_cols]
-
-    df = df.melt(
-        id_vars=["UID"],
-        var_name="timestamp",
-        value_name="cumulative_counts",
+    gmpr = GeoMapper()
+    df = gmpr.replace_geocode(
+        df, "jhu_uid", "fips", from_col="UID", date_col="timestamp"
     )
-    df["timestamp"] = pd.to_datetime(df["timestamp"])
 
-    gmpr = GeoMapper()
-    df = gmpr.replace_geocode(df, "jhu_uid", "fips", from_col="UID", date_col="timestamp")
     # Merge in population, set population as NAN for fake fips
-    df = pd.merge(df, pop_df, on="fips", how='left')
-
-    # Add a dummy first row here on day before first day
-    # code below could be cleaned with groupby.diff
-
-    min_ts = min(df["timestamp"])
-    df_dummy = df.loc[df["timestamp"] == min_ts].copy()
-    df_dummy.loc[:, "timestamp"] = min_ts - pd.Timedelta(days=1)
-    df_dummy.loc[:, "cumulative_counts"] = 0
-    df = pd.concat([df_dummy, df])
-    # Obtain new_counts
-    df.sort_values(["fips", "timestamp"], inplace=True)
-    df["new_counts"] = df["cumulative_counts"].diff()  # 1st discrete difference
-    # Handle edge cases where we diffed across fips
-    mask = df["fips"] != df["fips"].shift(1)
-    df.loc[mask, "new_counts"] = np.nan
-    df.reset_index(inplace=True, drop=True)
+    df = gmpr.add_population_column("fips", df)
+
+    df = create_diffs_column(df)
 
     # Final sanity checks
-    days_by_fips = df.groupby("fips").count()["cumulative_counts"].unique()
-    unique_days = df["timestamp"].unique()
-    # each FIPS has same number of rows
-    if (len(days_by_fips) > 1) or (days_by_fips[0] != len(unique_days)):
-        raise ValueError("Differing number of days by fips")
-    min_timestamp = min(unique_days)
-    max_timestamp = max(unique_days)
-    n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, "D") + 1
-    if n_days != len(unique_days):
-        raise ValueError(
-            f"Not every day between {min_timestamp} and "
-            "{max_timestamp} is represented."
-        )
-    return df.loc[
-        df["timestamp"] >= min_ts,
-        [  # Reorder
-            "fips",
-            "timestamp",
-            "population",
-            "new_counts",
-            "cumulative_counts",
-        ],
-    ]
+    sanity_check_data(df)
+
+    # Reorder columns
+    df = df[["fips", "timestamp", "population", "new_counts", "cumulative_counts"]]
+    return df
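Taken together, pull_jhu_data now composes download → geocode → population merge → diff → sanity check. A usage sketch; the URL template below is an assumption (the real value comes from params.json):

from delphi_utils import GeoMapper
from delphi_jhu.pull import pull_jhu_data

# Assumed JHU CSSE template; {metric} is filled with "confirmed" or "deaths".
BASE_URL = (
    "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/"
    "csse_covid_19_data/csse_covid_19_time_series/"
    "time_series_covid19_{metric}_US.csv"
)

df = pull_jhu_data(BASE_URL, "confirmed", GeoMapper())
# Columns: fips, timestamp, population, new_counts, cumulative_counts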

jhu/delphi_jhu/run.py

Lines changed: 2 additions & 4 deletions
@@ -73,7 +73,6 @@ def run_module():
     export_start_date = params["export_start_date"]
     export_dir = params["export_dir"]
     base_url = params["base_url"]
-    static_file_dir = params["static_file_dir"]
    cache_dir = params["cache_dir"]
 
     if len(params["bucket_name"]) > 0:
@@ -85,9 +84,8 @@ def run_module():
     else:
         arch_diff = None
 
-    pop_df = GeoMapper().add_population_column("fips")
-
-    dfs = {metric: pull_jhu_data(base_url, metric, pop_df) for metric in METRICS}
+    gmpr = GeoMapper()
+    dfs = {metric: pull_jhu_data(base_url, metric, gmpr) for metric in METRICS}
     for metric, geo_res, sensor, smoother in product(
             METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTHERS):
         print(metric, geo_res, sensor, smoother)

jhu/tests/conftest.py

Lines changed: 7 additions & 0 deletions
@@ -6,6 +6,7 @@
 
 from os import listdir, remove
 from os.path import join
+import pandas as pd
 
 from delphi_utils import read_params
 from delphi_jhu.run import run_module
@@ -25,3 +26,9 @@ def run_as_module():
         s3_client.create_bucket(Bucket=params["bucket_name"])
 
         run_module()
+
+@pytest.fixture
+def jhu_confirmed_test_data():
+    df = pd.read_csv("test_data/jhu_confirmed.csv", dtype={"fips": str})
+    df["timestamp"] = pd.to_datetime(df["timestamp"])
+    return df
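The commit message's new checks (no infinities; the prop-signal denominator equals the county sum) can be exercised through this fixture. A hypothetical sketch: the test names and the per-100k prop formula are mine, and it assumes geo_map takes just the frame and resolution (per the hunk header above) and returns the mapped frame:

import numpy as np
from delphi_jhu.geo import geo_map

def test_no_infinities(jhu_confirmed_test_data):
    df = geo_map(jhu_confirmed_test_data.copy(), "state")
    prop = df["new_counts"] / df["population"] * 100000
    assert not np.isinf(prop).any()

def test_state_denominator_is_county_sum(jhu_confirmed_test_data):
    df = jhu_confirmed_test_data.copy()
    # Expected denominator: county populations only, synthetic XX000 rows dropped.
    counties = df[~df["fips"].str.endswith("000")].drop_duplicates("fips")
    # Population is constant over time, so one timestamp per state suffices.
    mapped = geo_map(df, "state").drop_duplicates("geo_id")
    assert mapped["population"].sum() == counties["population"].sum()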
