Commit 2884745

Merge pull request #398 from cmu-delphi/deploy-jhu
Propagate 328 (JHU population fix and function refactor) to main
2 parents: dcaf97d + 7d84fe6

14 files changed: +118,297 additions, -4,587 deletions


.gitignore

Lines changed: 4 additions & 0 deletions
@@ -126,3 +126,7 @@ venv.bak/
 .retry
 .indicators-ansible-vault-pass
 indicators-ansible-vault-pass
+
+# testing_utils
+testing_utils/cache
+testing_utils/*.csv

_delphi_utils_python/data_proc/geomap/geo_data_proc.py

Lines changed: 1 addition & 0 deletions
@@ -391,6 +391,7 @@ def create_fips_population_table():
     df_pr = df_pr.groupby("fips").sum().reset_index()
     df_pr = df_pr[~df_pr["fips"].isin(census_pop["fips"])]
     census_pop_pr = pd.concat([census_pop, df_pr])
+
     census_pop_pr.to_csv(join(OUTPUT_DIR, FIPS_POPULATION_OUT_FILENAME), index=False)
_delphi_utils_python/delphi_utils/geomap.py

Lines changed: 10 additions & 7 deletions
@@ -413,9 +413,10 @@ def replace_geocode(
         df = df.groupby([new_col]).sum().reset_index()
         return df
 
-    def add_population_column(self, data, geocode_type, geocode_col=None):
+    def add_population_column(self, data, geocode_type, geocode_col=None, dropna=True):
         """
-        Appends a population column to a dateframe, based on the FIPS or ZIP code.
+        Appends a population column to a dataframe, based on the FIPS or ZIP code. If no
+        dataframe is provided, the full crosswalk from geocode to population is returned.
 
         Parameters
         ---------
@@ -433,24 +434,26 @@ def add_population_column(self, data, geocode_type, geocode_col=None):
             A dataframe with a population column appended.
         """
         geocode_col = geocode_type if geocode_col is None else geocode_col
+        data = data.copy()
 
         if geocode_type not in ["fips", "zip"]:
             raise ValueError(
                 "Only fips and zip geocodes supported. \
                 For other codes, aggregate those."
             )
 
+        pop_df = self._load_crosswalk(from_code=geocode_type, to_code="pop")
+
         if not is_string_dtype(data[geocode_col]):
             data[geocode_col] = data[geocode_col].astype(str).str.zfill(5)
 
-        pop_df = self._load_crosswalk(from_code=geocode_type, to_code="pop")
-
+        merge_type = "inner" if dropna else "left"
         data_with_pop = (
-            data.copy()
-            .merge(pop_df, left_on=geocode_col, right_on=geocode_type, how="inner")
+            data
+            .merge(pop_df, left_on=geocode_col, right_on=geocode_type, how=merge_type)
             .rename(columns={"pop": "population"})
         )
-        data_with_pop["population"] = data_with_pop["population"].astype(int)
+
         return data_with_pop
 
     @staticmethod
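For illustration, a minimal sketch (plain pandas with toy, made-up values, not the GeoMapper class itself) of the merge behavior the new dropna flag selects: how="inner" drops geocodes with no population entry, while how="left" keeps them with a NaN population, which is why the unconditional .astype(int) cast was removed.

import pandas as pd

# Toy input: one real county FIPS and one "fake" FIPS with no population entry.
data = pd.DataFrame({"fips": ["01001", "70002"], "new_counts": [5, 3]})
# Made-up stand-in for the fips -> population crosswalk the GeoMapper loads.
pop_df = pd.DataFrame({"fips": ["01001"], "pop": [55869]})

for dropna in (True, False):
    merge_type = "inner" if dropna else "left"   # same rule as in add_population_column
    merged = (
        data.merge(pop_df, left_on="fips", right_on="fips", how=merge_type)
        .rename(columns={"pop": "population"})
    )
    print(f"dropna={dropna}")
    print(merged, "\n")

# dropna=True  -> only 01001 survives the inner join.
# dropna=False -> 70002 is kept with population = NaN (a float), so the
#                 population column can no longer be cast to int unconditionally.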

jhu/delphi_jhu/geo.py

Lines changed: 6 additions & 0 deletions
@@ -35,6 +35,12 @@ def geo_map(df: pd.DataFrame, geo_res: str):
     if geo_res == "county":
         df.rename(columns={'fips': 'geo_id'}, inplace=True)
     elif geo_res == "state":
+        df = df.set_index("fips")
+        # Zero out the state FIPS population to avoid double counting.
+        state_fips_codes = {str(x).zfill(2) + "000" for x in range(1,73)}
+        subset_state_fips_codes = set(df.index.values) & state_fips_codes
+        df.loc[subset_state_fips_codes, "population"] = 0
+        df = df.reset_index()
         df = gmpr.replace_geocode(df, "fips", "state_id", new_col="geo_id", date_col="timestamp")
     else:
         df = gmpr.replace_geocode(df, "fips", geo_res, new_col="geo_id", date_col="timestamp")

jhu/delphi_jhu/pull.py

Lines changed: 73 additions & 80 deletions
@@ -1,18 +1,66 @@
 # -*- coding: utf-8 -*-
 
-import re
 import pandas as pd
 import numpy as np
 from delphi_utils import GeoMapper
 
-def detect_date_col(col_name: str):
-    """determine if column name is a date"""
-    date_match = re.match(r'\d{1,2}\/\d{1,2}\/\d{1,2}', col_name)
-    if date_match:
-        return True
-    return False
 
-def pull_jhu_data(base_url: str, metric: str, pop_df: pd.DataFrame) -> pd.DataFrame:
+def download_data(base_url: str, metric: str) -> pd.DataFrame:
+    """
+    Downloads the data from the JHU repo, extracts the UID and the date columns, and
+    enforces the date datatype on the the time column.
+    """
+    # Read data
+    df = pd.read_csv(base_url.format(metric=metric))
+    # Keep the UID and the time series columns only
+    # The regex filters for columns with the date format MM-DD-YY or M-D-YY
+    df = df.filter(regex="\d{1,2}\/\d{1,2}\/\d{2}|UID").melt(
+        id_vars=["UID"], var_name="timestamp", value_name="cumulative_counts"
+    )
+    df["timestamp"] = pd.to_datetime(df["timestamp"])
+    return df
+
+
+def create_diffs_column(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Using the cumulative_counts column from the dataframe, partitions the dataframe
+    into separate time-series based on fips, and then computes pairwise differences
+    of the cumulative values to get the incidence values. Boundary cases are handled
+    by zero-filling the day prior.
+    """
+    # Take time-diffs in each geo_code partition
+    df = df.set_index(["fips", "timestamp"])
+    df["new_counts"] = df.groupby(level=0)["cumulative_counts"].diff()
+    # Fill the NA value for the first date of each partition with the cumulative value that day
+    # (i.e. pretend the cumulative count the day before was 0)
+    na_value_mask = df["new_counts"].isna()
+    df.loc[na_value_mask, "new_counts"] = df.loc[na_value_mask, "cumulative_counts"]
+    df = df.reset_index()
+    return df
+
+
+def sanity_check_data(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Perform a final set of sanity checks on the data.
+    """
+    days_by_fips = df.groupby("fips").count()["cumulative_counts"].unique()
+    unique_days = df["timestamp"].unique()
+
+    # each FIPS has same number of rows
+    if (len(days_by_fips) > 1) or (days_by_fips[0] != len(unique_days)):
+        raise ValueError("Differing number of days by fips")
+
+    min_timestamp = min(unique_days)
+    max_timestamp = max(unique_days)
+    n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, "D") + 1
+    if n_days != len(unique_days):
+        raise ValueError(
+            f"Not every day between {min_timestamp} and "
+            "{max_timestamp} is represented."
+        )
+
+
+def pull_jhu_data(base_url: str, metric: str, gmpr: GeoMapper) -> pd.DataFrame:
     """Pulls the latest Johns Hopkins CSSE data, and conforms it into a dataset
 
     The output dataset has:
@@ -28,92 +76,37 @@ def pull_jhu_data(base_url: str, metric: str, pop_df: pd.DataFr
     may be negative. This is wholly dependent on the quality of the raw
     dataset.
 
-    We filter the data such that we only keep rows with valid FIPS, or "FIPS"
-    codes defined under the exceptions of the README. The current exceptions
-    include:
-
-    - 70002: Dukes County and Nantucket County in Massachusetts, which are
-      reported together
-    - 70003: Kansas City, Missouri, which reports counts separately from the
-      four counties it intesects (Platte, Cass, Clay, Jackson Counties)
+    We filter the data such that we only keep rows with valid FIPS or "FIPS"
+    codes defined under the exceptions of the README.
 
     Parameters
     ----------
     base_url: str
-        Base URL for pulling the JHU CSSE data
+        Base URL for pulling the JHU CSSE data.
     metric: str
         One of 'confirmed' or 'deaths'.
-    pop_df: pd.DataFrame
-        Read from static file "fips_population.csv".
+    gmpr: GeoMapper
+        An instance of the geomapping utility.
 
     Returns
     -------
     pd.DataFrame
         Dataframe as described above.
     """
+    df = download_data(base_url, metric)
 
-    # Read data
-    df = pd.read_csv(base_url.format(metric=metric))
-
-    # FIPS are missing for some nonstandard FIPS
-    date_cols = [col_name for col_name in df.columns if detect_date_col(col_name)]
-    keep_cols = date_cols + ['UID']
-    df = df[keep_cols]
-
-    df = df.melt(
-        id_vars=["UID"],
-        var_name="timestamp",
-        value_name="cumulative_counts",
+    gmpr = GeoMapper()
+    df = gmpr.replace_geocode(
+        df, "jhu_uid", "fips", from_col="UID", date_col="timestamp"
     )
-    df["timestamp"] = pd.to_datetime(df["timestamp"])
 
-    gmpr = GeoMapper()
-    df = gmpr.replace_geocode(df, "jhu_uid", "fips", from_col="UID", date_col="timestamp")
-
-    # Merge in population LOWERCASE, consistent across confirmed and deaths
-    # Set population as NAN for fake fips
-    pop_df.rename(columns={'FIPS':'fips'}, inplace=True)
-    pop_df['fips'] = pop_df['fips'].astype(int).\
-        astype(str).str.zfill(5)
-    df = df.merge(pop_df, on="fips", how='left')
-
-    # Add a dummy first row here on day before first day
-    # code below could be cleaned with groupby.diff
-
-    min_ts = min(df["timestamp"])
-    df_dummy = df.loc[df["timestamp"] == min_ts].copy()
-    df_dummy.loc[:, "timestamp"] = min_ts - pd.Timedelta(days=1)
-    df_dummy.loc[:, "cumulative_counts"] = 0
-    df = pd.concat([df_dummy, df])
-    # Obtain new_counts
-    df.sort_values(["fips", "timestamp"], inplace=True)
-    df["new_counts"] = df["cumulative_counts"].diff() # 1st discrete difference
-    # Handle edge cases where we diffed across fips
-    mask = df["fips"] != df["fips"].shift(1)
-    df.loc[mask, "new_counts"] = np.nan
-    df.reset_index(inplace=True, drop=True)
+    # Merge in population, set population as NAN for fake fips
+    df = gmpr.add_population_column(df, "fips")
+    df = create_diffs_column(df)
 
     # Final sanity checks
-    days_by_fips = df.groupby("fips").count()["cumulative_counts"].unique()
-    unique_days = df["timestamp"].unique()
-    # each FIPS has same number of rows
-    if (len(days_by_fips) > 1) or (days_by_fips[0] != len(unique_days)):
-        raise ValueError("Differing number of days by fips")
-    min_timestamp = min(unique_days)
-    max_timestamp = max(unique_days)
-    n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, "D") + 1
-    if n_days != len(unique_days):
-        raise ValueError(
-            f"Not every day between {min_timestamp} and "
-            "{max_timestamp} is represented."
-        )
-    return df.loc[
-        df["timestamp"] >= min_ts,
-        [ # Reorder
-            "fips",
-            "timestamp",
-            "population",
-            "new_counts",
-            "cumulative_counts",
-        ],
-    ]
+    sanity_check_data(df)
+
+    # Reorder columns
+    df = df[["fips", "timestamp", "population", "new_counts", "cumulative_counts"]]
+    return df
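A runnable sketch of the groupby-diff step that create_diffs_column performs, on a two-county toy series with made-up counts; it shows that differences are taken within each fips partition and that the first day of each partition is zero-filled (its cumulative count is used as-is).

import pandas as pd

# Toy cumulative series for two counties over three days (values made up).
df = pd.DataFrame(
    {
        "fips": ["01001"] * 3 + ["01003"] * 3,
        "timestamp": pd.to_datetime(["2020-03-01", "2020-03-02", "2020-03-03"] * 2),
        "cumulative_counts": [1, 4, 6, 0, 2, 5],
    }
)

# Same logic as create_diffs_column: diff within each fips partition...
df = df.set_index(["fips", "timestamp"])
df["new_counts"] = df.groupby(level=0)["cumulative_counts"].diff()

# ...then treat the first day of each partition as if the previous day's
# cumulative count were zero.
na_mask = df["new_counts"].isna()
df.loc[na_mask, "new_counts"] = df.loc[na_mask, "cumulative_counts"]

print(df.reset_index())
# new_counts per fips: 1, 3, 2 and 0, 2, 3 -- no spill-over between counties.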

jhu/delphi_jhu/run.py

Lines changed: 3 additions & 7 deletions
@@ -17,6 +17,7 @@
     S3ArchiveDiffer,
 )
 
+from delphi_utils import GeoMapper
 from .geo import geo_map
 from .pull import pull_jhu_data
 from .smooth import (
@@ -72,7 +73,6 @@ def run_module():
     export_start_date = params["export_start_date"]
     export_dir = params["export_dir"]
     base_url = params["base_url"]
-    static_file_dir = params["static_file_dir"]
     cache_dir = params["cache_dir"]
 
     if len(params["bucket_name"]) > 0:
@@ -84,12 +84,8 @@ def run_module():
     else:
         arch_diff = None
 
-    pop_df = pd.read_csv(
-        join(static_file_dir, "fips_population.csv"),
-        dtype={"fips": float, "population": float},
-    )
-
-    dfs = {metric: pull_jhu_data(base_url, metric, pop_df) for metric in METRICS}
+    gmpr = GeoMapper()
+    dfs = {metric: pull_jhu_data(base_url, metric, gmpr) for metric in METRICS}
     for metric, geo_res, sensor, smoother in product(
         METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTHERS):
         print(metric, geo_res, sensor, smoother)
