
Commit 6b407bf

Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators into pylint
2 parents ab0ab6e + a233937

16 files changed: +118320 -4610 lines changed

.gitignore

Lines changed: 4 additions & 0 deletions

@@ -126,3 +126,7 @@ venv.bak/
 .retry
 .indicators-ansible-vault-pass
 indicators-ansible-vault-pass
+
+# testing_utils
+testing_utils/cache
+testing_utils/*.csv

_delphi_utils_python/data_proc/geomap/geo_data_proc.py

Lines changed: 8 additions & 1 deletion

@@ -217,6 +217,12 @@ def create_jhu_uid_fips_crosswalk():
             {"jhu_uid": "63072999", "fips": "72000", "weight": 1.0},
         ]
     )
+    cruise_ships = pd.DataFrame(
+        [
+            {"jhu_uid": "84088888", "fips": "88888", "weight": 1.0},
+            {"jhu_uid": "84099999", "fips": "99999", "weight": 1.0},
+        ]
+    )
 
     jhu_df = (
         pd.read_csv(JHU_FIPS_URL, dtype={"UID": str, "FIPS": str})
@@ -234,7 +240,7 @@ def create_jhu_uid_fips_crosswalk():
     # Drop the JHU UIDs that were hand-modified
     dup_ind = jhu_df["jhu_uid"].isin(
         pd.concat(
-            [hand_additions, unassigned_states, out_of_state, puerto_rico_unassigned]
+            [hand_additions, unassigned_states, out_of_state, puerto_rico_unassigned, cruise_ships]
        )["jhu_uid"].values
     )
     jhu_df.drop(jhu_df.index[dup_ind], inplace=True)
@@ -391,6 +397,7 @@ def create_fips_population_table():
     df_pr = df_pr.groupby("fips").sum().reset_index()
     df_pr = df_pr[~df_pr["fips"].isin(census_pop["fips"])]
     census_pop_pr = pd.concat([census_pop, df_pr])
+
     census_pop_pr.to_csv(join(OUTPUT_DIR, FIPS_POPULATION_OUT_FILENAME), index=False)

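The effect of this change is worth spelling out: any JHU UID listed in one of the hand-curated tables is masked out of the generated crosswalk, so adding the cruise_ships table is what removes the two cruise-ship rows from jhu_uid_fips_table.csv in the next file. A minimal sketch of the drop logic on an invented three-row crosswalk (the real jhu_df is read from JHU_FIPS_URL):

import pandas as pd

# Invented miniature crosswalk; the two cruise-ship UIDs mirror the diff.
jhu_df = pd.DataFrame(
    [
        {"jhu_uid": "84088888", "fips": "88888", "weight": 1.0},
        {"jhu_uid": "84099999", "fips": "99999", "weight": 1.0},
        {"jhu_uid": "84000001", "fips": "01000", "weight": 1.0},
    ]
)
cruise_ships = pd.DataFrame(
    [
        {"jhu_uid": "84088888", "fips": "88888", "weight": 1.0},
        {"jhu_uid": "84099999", "fips": "99999", "weight": 1.0},
    ]
)

# Same pattern as create_jhu_uid_fips_crosswalk: mask UIDs that appear in
# a hand-modified table, then drop them from the generated crosswalk.
dup_ind = jhu_df["jhu_uid"].isin(pd.concat([cruise_ships])["jhu_uid"].values)
jhu_df.drop(jhu_df.index[dup_ind], inplace=True)
print(jhu_df)  # only the 84000001 row remains
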
_delphi_utils_python/delphi_utils/data/jhu_uid_fips_table.csv

Lines changed: 0 additions & 2 deletions

@@ -82,8 +82,6 @@ jhu_uid,fips,weight
 63072149,72149,1.0
 63072151,72151,1.0
 63072153,72153,1.0
-84088888,88888,1.0
-84099999,99999,1.0
 84000001,01000,1.0
 84000002,02000,1.0
 84000004,04000,1.0

_delphi_utils_python/delphi_utils/geomap.py

Lines changed: 10 additions & 7 deletions

@@ -413,9 +413,10 @@ def replace_geocode(
         df = df.groupby([new_col]).sum().reset_index()
         return df
 
-    def add_population_column(self, data, geocode_type, geocode_col=None):
+    def add_population_column(self, data, geocode_type, geocode_col=None, dropna=True):
         """
-        Appends a population column to a dateframe, based on the FIPS or ZIP code.
+        Appends a population column to a dataframe, based on the FIPS or ZIP code. If no
+        dataframe is provided, the full crosswalk from geocode to population is returned.
 
         Parameters
         ---------
@@ -433,24 +434,26 @@ def add_population_column(self, data, geocode_type, geocode_col=None):
            A dataframe with a population column appended.
         """
         geocode_col = geocode_type if geocode_col is None else geocode_col
+        data = data.copy()
 
         if geocode_type not in ["fips", "zip"]:
             raise ValueError(
                 "Only fips and zip geocodes supported. \
                 For other codes, aggregate those."
             )
 
+        pop_df = self._load_crosswalk(from_code=geocode_type, to_code="pop")
+
         if not is_string_dtype(data[geocode_col]):
             data[geocode_col] = data[geocode_col].astype(str).str.zfill(5)
 
-        pop_df = self._load_crosswalk(from_code=geocode_type, to_code="pop")
-
+        merge_type = "inner" if dropna else "left"
         data_with_pop = (
-            data.copy()
-            .merge(pop_df, left_on=geocode_col, right_on=geocode_type, how="inner")
+            data
+            .merge(pop_df, left_on=geocode_col, right_on=geocode_type, how=merge_type)
             .rename(columns={"pop": "population"})
         )
-        data_with_pop["population"] = data_with_pop["population"].astype(int)
+
         return data_with_pop
 
     @staticmethod
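
The net effect: add_population_column gains a dropna flag. The default keeps the old inner merge, which drops geocodes that have no entry in the population crosswalk; dropna=False switches to a left merge and leaves population as NaN for unmapped codes, which is also why the astype(int) cast was removed (an integer column cannot hold NaN). A hedged usage sketch, assuming the fake FIPS 88888 has no crosswalk entry:

import pandas as pd
from delphi_utils import GeoMapper

gmpr = GeoMapper()
df = pd.DataFrame({"fips": ["01001", "88888"], "new_counts": [10, 3]})

# Default dropna=True: inner merge, so the unmapped fips 88888 row is dropped.
with_pop = gmpr.add_population_column(df, "fips")

# dropna=False: left merge keeps every row; population is NaN where the
# crosswalk has no entry, so the column stays float rather than int.
with_nan = gmpr.add_population_column(df, "fips", dropna=False)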

jhu/delphi_jhu/geo.py

Lines changed: 6 additions & 0 deletions

@@ -36,6 +36,12 @@ def geo_map(df: pd.DataFrame, geo_res: str):
     if geo_res == "county":
         df.rename(columns={'fips': 'geo_id'}, inplace=True)
     elif geo_res == "state":
+        df = df.set_index("fips")
+        # Zero out the state FIPS population to avoid double counting.
+        state_fips_codes = {str(x).zfill(2) + "000" for x in range(1,73)}
+        subset_state_fips_codes = set(df.index.values) & state_fips_codes
+        df.loc[subset_state_fips_codes, "population"] = 0
+        df = df.reset_index()
         df = gmpr.replace_geocode(df, "fips", "state_id", new_col="geo_id", date_col="timestamp")
     else:
         df = gmpr.replace_geocode(df, "fips", geo_res, new_col="geo_id", date_col="timestamp")
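
Why zero rather than drop: the synthetic state-wide codes ("XX000", used for unassigned counts) carry the whole state's population, so when replace_geocode sums county rows up to the state level those rows would double count it. A small sketch with approximate, illustrative population figures (it indexes .loc with a list, since recent pandas rejects a raw set as an indexer):

import pandas as pd

df = pd.DataFrame(
    {
        "fips": ["01000", "01001", "72000"],
        "population": [4903185, 55869, 3193694],  # approximate, for illustration
    }
).set_index("fips")

# Same construction as the diff: state-wide "XX000" codes for 01 through 72.
state_fips_codes = {str(x).zfill(2) + "000" for x in range(1, 73)}
subset_state_fips_codes = set(df.index.values) & state_fips_codes
df.loc[list(subset_state_fips_codes), "population"] = 0  # 01000 and 72000 zeroed
df = df.reset_index()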

jhu/delphi_jhu/pull.py

Lines changed: 73 additions & 80 deletions

@@ -1,19 +1,67 @@
 # -*- coding: utf-8 -*-
 """Functions to pull data from JHU website."""
 
-import re
 import pandas as pd
 import numpy as np
 from delphi_utils import GeoMapper
 
-def detect_date_col(col_name: str):
-    """determine if column name is a date"""
-    date_match = re.match(r'\d{1,2}\/\d{1,2}\/\d{1,2}', col_name)
-    if date_match:
-        return True
-    return False
 
-def pull_jhu_data(base_url: str, metric: str, pop_df: pd.DataFrame) -> pd.DataFrame:
+def download_data(base_url: str, metric: str) -> pd.DataFrame:
+    """
+    Downloads the data from the JHU repo, extracts the UID and the date columns, and
+    enforces the date datatype on the time column.
+    """
+    # Read data
+    df = pd.read_csv(base_url.format(metric=metric))
+    # Keep the UID and the time series columns only
+    # The regex filters for columns with the date format MM/DD/YY or M/D/YY
+    df = df.filter(regex=r"\d{1,2}\/\d{1,2}\/\d{2}|UID").melt(
+        id_vars=["UID"], var_name="timestamp", value_name="cumulative_counts"
+    )
+    df["timestamp"] = pd.to_datetime(df["timestamp"])
+    return df
+
+
+def create_diffs_column(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Using the cumulative_counts column from the dataframe, partitions the dataframe
+    into separate time-series based on fips, and then computes pairwise differences
+    of the cumulative values to get the incidence values. Boundary cases are handled
+    by zero-filling the day prior.
+    """
+    # Take time-diffs in each geo_code partition
+    df = df.set_index(["fips", "timestamp"])
+    df["new_counts"] = df.groupby(level=0)["cumulative_counts"].diff()
+    # Fill the NA value for the first date of each partition with the cumulative
+    # value that day (i.e. pretend the cumulative count the day before was 0)
+    na_value_mask = df["new_counts"].isna()
+    df.loc[na_value_mask, "new_counts"] = df.loc[na_value_mask, "cumulative_counts"]
+    df = df.reset_index()
+    return df
+
+
+def sanity_check_data(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Perform a final set of sanity checks on the data.
+    """
+    days_by_fips = df.groupby("fips").count()["cumulative_counts"].unique()
+    unique_days = df["timestamp"].unique()
+
+    # each FIPS has same number of rows
+    if (len(days_by_fips) > 1) or (days_by_fips[0] != len(unique_days)):
+        raise ValueError("Differing number of days by fips")
+
+    min_timestamp = min(unique_days)
+    max_timestamp = max(unique_days)
+    n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, "D") + 1
+    if n_days != len(unique_days):
+        raise ValueError(
+            f"Not every day between {min_timestamp} and "
+            f"{max_timestamp} is represented."
+        )
+
+
+def pull_jhu_data(base_url: str, metric: str, gmpr: GeoMapper) -> pd.DataFrame:
     """Pulls the latest Johns Hopkins CSSE data, and conforms it into a dataset
 
     The output dataset has:
@@ -29,92 +77,37 @@ def pull_jhu_data(base_url: str, metric: str, pop_df: pd.DataFr
     may be negative. This is wholly dependent on the quality of the raw
     dataset.
 
-    We filter the data such that we only keep rows with valid FIPS, or "FIPS"
-    codes defined under the exceptions of the README. The current exceptions
-    include:
-
-    - 70002: Dukes County and Nantucket County in Massachusetts, which are
-      reported together
-    - 70003: Kansas City, Missouri, which reports counts separately from the
-      four counties it intersects (Platte, Cass, Clay, Jackson Counties)
+    We filter the data such that we only keep rows with valid FIPS or "FIPS"
+    codes defined under the exceptions of the README.
 
     Parameters
     ----------
     base_url: str
-        Base URL for pulling the JHU CSSE data
+        Base URL for pulling the JHU CSSE data.
     metric: str
         One of 'confirmed' or 'deaths'.
-    pop_df: pd.DataFrame
-        Read from static file "fips_population.csv".
+    gmpr: GeoMapper
+        An instance of the geomapping utility.
 
     Returns
     -------
     pd.DataFrame
         Dataframe as described above.
     """
+    df = download_data(base_url, metric)
 
-    # Read data
-    df = pd.read_csv(base_url.format(metric=metric))
-
-    # FIPS are missing for some nonstandard FIPS
-    date_cols = [col_name for col_name in df.columns if detect_date_col(col_name)]
-    keep_cols = date_cols + ['UID']
-    df = df[keep_cols]
-
-    df = df.melt(
-        id_vars=["UID"],
-        var_name="timestamp",
-        value_name="cumulative_counts",
+    gmpr = GeoMapper()
+    df = gmpr.replace_geocode(
+        df, "jhu_uid", "fips", from_col="UID", date_col="timestamp"
     )
-    df["timestamp"] = pd.to_datetime(df["timestamp"])
 
-    gmpr = GeoMapper()
-    df = gmpr.replace_geocode(df, "jhu_uid", "fips", from_col="UID", date_col="timestamp")
-
-    # Merge in population LOWERCASE, consistent across confirmed and deaths
-    # Set population as NAN for fake fips
-    pop_df.rename(columns={'FIPS':'fips'}, inplace=True)
-    pop_df['fips'] = pop_df['fips'].astype(int).\
-        astype(str).str.zfill(5)
-    df = df.merge(pop_df, on="fips", how='left')
-
-    # Add a dummy first row here on day before first day
-    # code below could be cleaned with groupby.diff
-
-    min_ts = min(df["timestamp"])
-    df_dummy = df.loc[df["timestamp"] == min_ts].copy()
-    df_dummy.loc[:, "timestamp"] = min_ts - pd.Timedelta(days=1)
-    df_dummy.loc[:, "cumulative_counts"] = 0
-    df = pd.concat([df_dummy, df])
-    # Obtain new_counts
-    df.sort_values(["fips", "timestamp"], inplace=True)
-    df["new_counts"] = df["cumulative_counts"].diff()  # 1st discrete difference
-    # Handle edge cases where we diffed across fips
-    mask = df["fips"] != df["fips"].shift(1)
-    df.loc[mask, "new_counts"] = np.nan
-    df.reset_index(inplace=True, drop=True)
+    # Merge in population, set population as NAN for fake fips
+    df = gmpr.add_population_column(df, "fips")
+    df = create_diffs_column(df)
 
     # Final sanity checks
-    days_by_fips = df.groupby("fips").count()["cumulative_counts"].unique()
-    unique_days = df["timestamp"].unique()
-    # each FIPS has same number of rows
-    if (len(days_by_fips) > 1) or (days_by_fips[0] != len(unique_days)):
-        raise ValueError("Differing number of days by fips")
-    min_timestamp = min(unique_days)
-    max_timestamp = max(unique_days)
-    n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, "D") + 1
-    if n_days != len(unique_days):
-        raise ValueError(
-            f"Not every day between {min_timestamp} and "
-            "{max_timestamp} is represented."
-        )
-    return df.loc[
-        df["timestamp"] >= min_ts,
-        [  # Reorder
-            "fips",
-            "timestamp",
-            "population",
-            "new_counts",
-            "cumulative_counts",
-        ],
-    ]
+    sanity_check_data(df)
+
+    # Reorder columns
+    df = df[["fips", "timestamp", "population", "new_counts", "cumulative_counts"]]
+    return df
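
pull_jhu_data is now a thin composition of the three helpers above plus two GeoMapper calls. A sketch of the equivalent pipeline; the URL shape below is an assumption modeled on the JHU CSSE time-series naming, and the real value comes from params.json:

from delphi_utils import GeoMapper
from delphi_jhu.pull import download_data, create_diffs_column, sanity_check_data

# Assumed URL shape with a {metric} placeholder; the real value is configured.
BASE_URL = (
    "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/"
    "csse_covid_19_data/csse_covid_19_time_series/"
    "time_series_covid19_{metric}_US.csv"
)

gmpr = GeoMapper()
df = download_data(BASE_URL, "confirmed")      # UID x date, long format
df = gmpr.replace_geocode(
    df, "jhu_uid", "fips", from_col="UID", date_col="timestamp"
)
df = gmpr.add_population_column(df, "fips")    # merge via the fips-to-pop crosswalk
df = create_diffs_column(df)                   # cumulative counts to incidence
sanity_check_data(df)                          # raises ValueError on gaps
df = df[["fips", "timestamp", "population", "new_counts", "cumulative_counts"]]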

jhu/delphi_jhu/run.py

Lines changed: 3 additions & 9 deletions

@@ -7,16 +7,15 @@
 from datetime import datetime
 from itertools import product
 from functools import partial
-from os.path import join
 
 import numpy as np
-import pandas as pd
 from delphi_utils import (
     read_params,
     create_export_csv,
     S3ArchiveDiffer,
 )
 
+from delphi_utils import GeoMapper
 from .geo import geo_map
 from .pull import pull_jhu_data
 from .smooth import (
@@ -72,7 +71,6 @@ def run_module():
     export_start_date = params["export_start_date"]
     export_dir = params["export_dir"]
     base_url = params["base_url"]
-    static_file_dir = params["static_file_dir"]
     cache_dir = params["cache_dir"]
 
     if len(params["bucket_name"]) > 0:
@@ -84,12 +82,8 @@ def run_module():
     else:
         arch_diff = None
 
-    pop_df = pd.read_csv(
-        join(static_file_dir, "fips_population.csv"),
-        dtype={"fips": float, "population": float},
-    )
-
-    dfs = {metric: pull_jhu_data(base_url, metric, pop_df) for metric in METRICS}
+    gmpr = GeoMapper()
+    dfs = {metric: pull_jhu_data(base_url, metric, gmpr) for metric in METRICS}
     for metric, geo_res, sensor, smoother in product(
         METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTHERS):
         print(metric, geo_res, sensor, smoother)
