
Remove cruise ship FIPS codes, 88888 and 99999, from JHU #335


Merged 71 commits on Oct 29, 2020.

Commits
911c0e0
Added archiving diffing utility
eujing Aug 12, 2020
5d3b8ec
Updated unit tests
eujing Aug 12, 2020
2d3fb80
Updated ansible template
eujing Aug 12, 2020
f1cace3
Merge pull request #235 from cmu-delphi/main
krivard Aug 27, 2020
a24dc4d
add code
Aug 18, 2020
e833515
fixed an error in unit tests
Aug 18, 2020
5f7ffa1
removed unused files
Aug 18, 2020
870fd20
changed format of intermediate output dataframes
Aug 18, 2020
b9a788f
uncommented code for pulling raw data
jingjtang Aug 18, 2020
ad1d4d5
Added code for using geomap utils
Aug 28, 2020
d8c7019
Merge pull request #234 from cmu-delphi/diff-uploads-usafacts
krivard Aug 31, 2020
9b48cce
[sir-complainsalot] Add grace period and dry run
krivard Sep 9, 2020
bcda25f
Reduce df memory usage, and vectorize more
eujing Oct 7, 2020
4a58a16
Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators int…
sgsmob Oct 8, 2020
c687428
Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators int…
sgsmob Oct 9, 2020
d357d01
Merge pull request #305 from cmu-delphi/deploy-jhu
krivard Oct 9, 2020
81be4f4
update code for geomapping using utils
Oct 12, 2020
eec5f50
fix typo in documentation
huisaddison Oct 12, 2020
0c502f7
fix typo in documentation
huisaddison Oct 12, 2020
37edbf5
fix typo in documentation
huisaddison Oct 12, 2020
3a0f9a9
fix typo in documentation
huisaddison Oct 12, 2020
a2d5768
fix typo in documentation
huisaddison Oct 12, 2020
ed4c306
Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators int…
sgsmob Oct 12, 2020
6a616c4
refactor safegraph.process to pave the way for multifile processing
sgsmob Oct 12, 2020
45307ad
tests for finding the file names in the past week
sgsmob Oct 12, 2020
efdf3fd
testing process_window
sgsmob Oct 13, 2020
7ed90e1
comments and formatting for pylint compliance
sgsmob Oct 13, 2020
d0151e8
docstring updates
sgsmob Oct 13, 2020
8918ada
lint compliance in test cases
sgsmob Oct 13, 2020
6b24185
move location of VALID_GEO_RESOLUTIONS
sgsmob Oct 13, 2020
cf6f4c1
Merge pull request #287 from cmu-delphi/sir-dryrun
krivard Oct 13, 2020
10d3711
file existence checking in process
sgsmob Oct 13, 2020
8604ec1
Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators int…
sgsmob Oct 13, 2020
8c665c6
refactor CSV name
sgsmob Oct 13, 2020
e0ed614
add test for process
sgsmob Oct 14, 2020
8db87c5
fix line too long
sgsmob Oct 14, 2020
e6502e5
remove extraneous prints
sgsmob Oct 15, 2020
922432b
documentation on process_file wrapper
sgsmob Oct 15, 2020
4debc31
fix broken usafacts tests to read from the proper directories
sgsmob Oct 15, 2020
4563d1f
Merge branch 'main' into safegraph_patterns
Oct 16, 2020
1633036
uncomment code for using geo utils
Oct 16, 2020
6358975
fixed errors in geo mapping functions
Oct 16, 2020
0909ca7
fixed errors in geo mapping function and updated the unit tests
Oct 16, 2020
c4808fa
Merge branch 'safegraph_patterns' of https://github.com/cmu-delphi/co…
Oct 16, 2020
424ceb4
deleted extra keyword argument in process
Oct 16, 2020
3495bee
added a dry-run mode
Oct 16, 2020
7b7934b
updated unit tests
Oct 16, 2020
986e585
fixed the dir to sample data
Oct 16, 2020
b0ae5bb
added static folder and params.json for unit tests
Oct 16, 2020
4ce4630
fix whitespacing for linter
huisaddison Oct 17, 2020
3d76de4
remove unused imports
huisaddison Oct 17, 2020
8091221
Add a gap detector to Sir Complainsalot
capnrefsmmat Oct 17, 2020
849263c
Formatting fix
capnrefsmmat Oct 17, 2020
39df546
Add hospital admissions, USAFacts to Sir Complainsalot
capnrefsmmat Oct 17, 2020
51c0c03
make new receiving directory in test directory
sgsmob Oct 19, 2020
261503a
Merge pull request #225 from cmu-delphi/safegraph_patterns
krivard Oct 19, 2020
7c8e702
Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators int…
sgsmob Oct 19, 2020
90a22c4
don't overwrite files
sgsmob Oct 19, 2020
5117b9d
remove unused import
sgsmob Oct 19, 2020
509f4d7
substring testing with 'in'
sgsmob Oct 19, 2020
9d6394a
update tests to process to include wip and 7d_avg signals
sgsmob Oct 19, 2020
470e95d
added wip signals to params file
sgsmob Oct 19, 2020
0783ebd
Merge pull request #327 from cmu-delphi/sir-gapdetector
krivard Oct 19, 2020
ee49484
Merge pull request #309 from sgsmob/weekday
krivard Oct 20, 2020
bb8e17e
Merge pull request #331 from cmu-delphi/deploy-safegraph
krivard Oct 20, 2020
0115538
Issue template for feature release
krivard Oct 20, 2020
44be069
Change auto-assign of release tasks
krivard Oct 20, 2020
a1a1b50
Temporarily skip linting in Jenkins
korlaxxalrok Oct 20, 2020
b0bb289
Merge pull request #314 from sgsmob/fix_usa_tests
krivard Oct 20, 2020
317a4d5
Merge pull request #334 from cmu-delphi/deploy-usafacts
krivard Oct 20, 2020
84049be
Remove cruise ships FIPS codes, 88888 and 99999
dshemetov Oct 20, 2020
30 changes: 30 additions & 0 deletions .github/ISSUE_TEMPLATE/feature_release.md
@@ -0,0 +1,30 @@
---
name: Feature release
about: Begin the finishing work for features ready to be included in a release
title: 'Release NEW_THING'
labels: 'release'
assignees: 'benjaminysmith'
---

- [Link to issue]()
- [Link to PR]()
- Proposed release version: <!-- eg 1.12 -->

<!-- Additional information about the feature: -->


<!-- relevant for most work -->

- [ ] API [documentation](https://github.com/cmu-delphi/delphi-epidata/tree/main/docs/api) and/or [changelog](https://github.com/cmu-delphi/delphi-epidata/blob/main/docs/api/covidcast_changelog.md)
- [ ] API mailing list notification

<!-- relevant for new signals -->

- [ ] Statistical review (usually [correlations](https://github.com/cmu-delphi/covidcast/tree/main/docs/R-notebooks))
- [ ] Signal / source name review (usually [Roni](https://docs.google.com/document/d/10hGd4Evce4lJ4VkWaQEKFQxvmw2P4xyYGtIAWF52Sf8/edit?usp=sharing))

<!-- relevant for new map signals -->

- [ ] Visual review
- [ ] [Signal description pop-up text](https://docs.google.com/document/d/1kDqRg8EaI4WQXMaUUbbCGPlsUqEql8kgXCNt6AvMA9I/edit?usp=sharing) review
- [ ] [Map release notes](https://docs.google.com/document/d/1BpxGgIma_Lkd2kxtwEo2DBdHQ3zk6dHRz-leUIRlOIA/edit?usp=sharing)
8 changes: 7 additions & 1 deletion _delphi_utils_python/data_proc/geomap/geo_data_proc.py
@@ -217,6 +217,12 @@ def create_jhu_uid_fips_crosswalk():
{"jhu_uid": "63072999", "fips": "72000", "weight": 1.0},
]
)
cruise_ships = pd.DataFrame(
[
{"jhu_uid": "84088888", "fips": "88888", "weight": 1.0},
{"jhu_uid": "84099999", "fips": "99999", "weight": 1.0},
]
)

jhu_df = (
pd.read_csv(JHU_FIPS_URL, dtype={"UID": str, "FIPS": str})
@@ -234,7 +240,7 @@ def create_jhu_uid_fips_crosswalk():
# Drop the JHU UIDs that were hand-modified
dup_ind = jhu_df["jhu_uid"].isin(
pd.concat(
[hand_additions, unassigned_states, out_of_state, puerto_rico_unassigned]
[hand_additions, unassigned_states, out_of_state, puerto_rico_unassigned, cruise_ships]
)["jhu_uid"].values
)
jhu_df.drop(jhu_df.index[dup_ind], inplace=True)
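The new `cruise_ships` frame feeds the same `isin`-based dedup that already handles the other hand-modified UID groups, so the two cruise-ship UIDs are dropped from the auto-generated crosswalk. A standalone sketch of that drop pattern, using a toy two-row table rather than the real JHU source data:

```python
import pandas as pd

# Hand-modified UIDs to exclude, mirroring the cruise_ships frame in the diff.
cruise_ships = pd.DataFrame(
    [
        {"jhu_uid": "84088888", "fips": "88888", "weight": 1.0},
        {"jhu_uid": "84099999", "fips": "99999", "weight": 1.0},
    ]
)

# Toy stand-in for the crosswalk read from JHU_FIPS_URL.
jhu_df = pd.DataFrame({
    "jhu_uid": ["84088888", "84000001"],
    "fips": ["88888", "01000"],
})

# Same pattern as the diff: mark rows whose UID appears in a
# hand-modified frame, then drop them.
dup_ind = jhu_df["jhu_uid"].isin(cruise_ships["jhu_uid"].values)
jhu_df = jhu_df.drop(jhu_df.index[dup_ind])
```

Since the dropped UIDs are not visibly re-added, both rows vanish from the generated table, consistent with the `jhu_uid_fips_table.csv` hunk in this PR.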
2 changes: 0 additions & 2 deletions _delphi_utils_python/delphi_utils/data/jhu_uid_fips_table.csv
@@ -82,8 +82,6 @@ jhu_uid,fips,weight
63072149,72149,1.0
63072151,72151,1.0
63072153,72153,1.0
84088888,88888,1.0
84099999,99999,1.0
84000001,01000,1.0
84000002,02000,1.0
84000004,04000,1.0
7 changes: 0 additions & 7 deletions ansible/files/usafacts-params-prod.json

This file was deleted.

12 changes: 12 additions & 0 deletions ansible/templates/usafacts-params-prod.json.j2
@@ -0,0 +1,12 @@
{
"export_start_date": "latest",
"static_file_dir": "./static",
"export_dir": "/common/covidcast/receiving/usa-facts",
"cache_dir": "./cache",
"base_url": "https://usafactsstatic.blob.core.windows.net/public/data/covid-19/covid_{metric}_usafacts.csv",
"aws_credentials": {
"aws_access_key_id": "{{ delphi_aws_access_key_id }}",
"aws_secret_access_key": "{{ delphi_aws_secret_access_key }}"
},
"bucket_name": "delphi-covidcast-indicator-output"
}
4 changes: 3 additions & 1 deletion jenkins/usafacts-jenkins-test.sh
@@ -15,7 +15,9 @@ local_indicator="usafacts"
cd "${WORKSPACE}/${local_indicator}" || exit

# Linter
env/bin/pylint delphi_"${local_indicator}"
#env/bin/pylint delphi_"${local_indicator}"
echo "Skip linting because we have weird breakage :( \
TODO: https://github.com/cmu-delphi/covidcast-indicators/issues/333"

# Unit tests and code coverage
cd tests || exit && \
2 changes: 1 addition & 1 deletion safegraph/delphi_safegraph/constants.py
@@ -1,4 +1,4 @@

"""Constants for constructing Safegraph indicator."""

HOME_DWELL = 'median_home_dwell_time'
COMPLETELY_HOME = 'completely_home_prop'
2 changes: 2 additions & 0 deletions safegraph/delphi_safegraph/geo.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
"""Geo location constants for constructing Safegraph indicator."""

# https://code.activestate.com/recipes/577775-state-fips-codes-dict/
STATE_TO_FIPS = {
@@ -62,3 +63,4 @@

FIPS_TO_STATE = {v: k.lower() for k, v in STATE_TO_FIPS.items()}

VALID_GEO_RESOLUTIONS = ('county', 'state')
178 changes: 136 additions & 42 deletions safegraph/delphi_safegraph/process.py
@@ -1,13 +1,58 @@
import covidcast
"""Internal functions for creating Safegraph indicator."""
import datetime
import os
from typing import List
import numpy as np
import pandas as pd
import covidcast

from .constants import HOME_DWELL, COMPLETELY_HOME, FULL_TIME_WORK, PART_TIME_WORK
from .geo import FIPS_TO_STATE
from .geo import FIPS_TO_STATE, VALID_GEO_RESOLUTIONS

# Magic number for modular arithmetic; CBG -> FIPS
MOD = 10000000

# Base file name for raw data CSVs.
CSV_NAME = 'social-distancing.csv.gz'

def validate(df):
"""Confirms that a data frame has only one date."""
timestamps = df['date_range_start'].apply(date_from_timestamp)
assert len(timestamps.unique()) == 1


def date_from_timestamp(timestamp) -> datetime.date:
"""Extracts the date from a timestamp beginning with {YYYY}-{MM}-{DD}T."""
return datetime.date.fromisoformat(timestamp.split('T')[0])


def files_in_past_week(current_filename) -> List[str]:
"""Constructs file paths from previous 6 days.
Parameters
----------
current_filename: str
name of CSV file. Must be of the form
{path}/{YYYY}/{MM}/{DD}/{YYYY}-{MM}-{DD}-{CSV_NAME}
Returns
-------
List of file names corresponding to the 6 days prior to YYYY-MM-DD.
"""
path, year, month, day, _ = current_filename.rsplit('/', 4)
current_date = datetime.date(int(year), int(month), int(day))
one_day = datetime.timedelta(days=1)
for _ in range(1, 7):
current_date = current_date - one_day
date_str = current_date.isoformat()
date_path = date_str.replace('-', '/')
new_filename = f'{path}/{date_path}/{date_str}-{CSV_NAME}'
yield new_filename

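Note that `files_in_past_week` is written as a generator even though its annotation says `List[str]`, so callers that need a list must materialize it. A self-contained sketch of the same path arithmetic (the `raw/` prefix here is a hypothetical example path):

```python
import datetime

CSV_NAME = 'social-distancing.csv.gz'

def files_in_past_week(current_filename):
    """Yield the six file paths preceding current_filename by one day each."""
    path, year, month, day, _ = current_filename.rsplit('/', 4)
    current_date = datetime.date(int(year), int(month), int(day))
    for _ in range(6):
        current_date -= datetime.timedelta(days=1)
        date_str = current_date.isoformat()
        # Reuse the date both as the directory path and the file prefix.
        yield f"{path}/{date_str.replace('-', '/')}/{date_str}-{CSV_NAME}"

paths = list(files_in_past_week('raw/2020/10/20/2020-10-20-social-distancing.csv.gz'))
```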

def add_suffix(signals, suffix):
"""Adds `suffix` to every element of `signals`."""
return [s + suffix for s in signals]


def add_prefix(signal_names, wip_signal, prefix: str):
"""Adds prefix to signal if there is a WIP signal
Parameters
@@ -42,7 +87,7 @@ def add_prefix(signal_names, wip_signal, prefix: str):
]
raise ValueError("Supply True | False or '' or [] | list()")

# Check if the signal name is public

def public_signal(signal_):
"""Checks if the signal name is already public using COVIDcast
Parameters
@@ -89,32 +134,29 @@ def construct_signals(cbg_df, signal_names):
"""

# Preparation
cbg_df['timestamp'] = cbg_df['date_range_start'].apply(
lambda x: str(x).split('T')[0])
cbg_df['county_fips'] = (cbg_df['origin_census_block_group'] // MOD).apply(
lambda x: f'{int(x):05d}')

# Transformation: create signal not available in raw data
for signal in signal_names:
if signal.endswith(FULL_TIME_WORK):
if FULL_TIME_WORK in signal:
cbg_df[signal] = (cbg_df['full_time_work_behavior_devices']
/ cbg_df['device_count'])
elif signal.endswith(COMPLETELY_HOME):
elif COMPLETELY_HOME in signal:
cbg_df[signal] = (cbg_df['completely_home_device_count']
/ cbg_df['device_count'])
elif signal.endswith(PART_TIME_WORK):
elif PART_TIME_WORK in signal:
cbg_df[signal] = (cbg_df['part_time_work_behavior_devices']
/ cbg_df['device_count'])
elif signal.endswith(HOME_DWELL):
elif HOME_DWELL in signal:
cbg_df[signal] = (cbg_df['median_home_dwell_time'])


# Subsetting
return cbg_df[['timestamp', 'county_fips'] + signal_names]
return cbg_df[['county_fips'] + signal_names]

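The switch from `signal.endswith(...)` to `... in signal` matters because signal names can now carry a `wip_` prefix and a `_7d_avg` suffix, so a pure suffix test no longer matches. A minimal illustration with one base name:

```python
FULL_TIME_WORK = 'full_time_work_prop'

signals = ['full_time_work_prop', 'wip_full_time_work_prop_7d_avg']

# Suffix test misses the decorated name; membership test catches both.
matches_endswith = [s for s in signals if s.endswith(FULL_TIME_WORK)]
matches_in = [s for s in signals if FULL_TIME_WORK in s]
```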

def aggregate(df, signal_names, geo_resolution='county'):
'''Aggregate signals to appropriate resolution and produce standard errors.
"""Aggregate signals to appropriate resolution and produce standard errors.
Parameters
----------
df: pd.DataFrame
@@ -129,27 +171,22 @@ def aggregate(df, signal_names, geo_resolution='county'):
pd.DataFrame:
DataFrame with one row per geo_id, with columns for the individual
signals, standard errors, and sample sizes.
'''
"""
# Prepare geo resolution
GEO_RESOLUTION = ('county', 'state')
if geo_resolution == 'county':
df['geo_id'] = df['county_fips']
elif geo_resolution == 'state':
df['geo_id'] = df['county_fips'].apply(lambda x:
FIPS_TO_STATE[x[:2]])
else:
raise ValueError(f'`geo_resolution` must be one of {GEO_RESOLUTION}.')
raise ValueError(
f'`geo_resolution` must be one of {VALID_GEO_RESOLUTIONS}.')

# Aggregation and signal creation
df_mean = df.groupby(['geo_id', 'timestamp'])[
signal_names
].mean()
df_sd = df.groupby(['geo_id', 'timestamp'])[
signal_names
].std()
df_n = df.groupby(['geo_id', 'timestamp'])[
signal_names
].count()
grouped_df = df.groupby(['geo_id'])[signal_names]
df_mean = grouped_df.mean()
df_sd = grouped_df.std()
df_n = grouped_df.count()
agg_df = pd.DataFrame.join(df_mean, df_sd,
lsuffix='_mean', rsuffix='_sd')
agg_df = pd.DataFrame.join(agg_df, df_n.rename({
@@ -161,39 +198,96 @@
return agg_df.reset_index()


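The refactor replaces three separate `groupby` calls with one reused `grouped_df`, and drops `timestamp` from the grouping key since each processed window is a single date. A toy sketch of the mean/std/count join pattern (hypothetical values, not real Safegraph data):

```python
import pandas as pd

df = pd.DataFrame({
    'geo_id': ['01000', '01000', '02000'],
    'completely_home_prop': [0.2, 0.4, 0.5],
})

# One groupby object reused for all three aggregates.
grouped = df.groupby('geo_id')[['completely_home_prop']]
agg = grouped.mean().join(grouped.std(), lsuffix='_mean', rsuffix='_sd')
agg = agg.join(grouped.count().rename(
    columns={'completely_home_prop': 'completely_home_prop_n'}))
agg = agg.reset_index()
```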
def process(fname, signal_names, geo_resolutions, export_dir):
'''Process an input census block group-level CSV and export it. Assumes
that the input file has _only_ one date of data.
def process_window(df_list: List[pd.DataFrame],
signal_names: List[str],
geo_resolutions: List[str],
export_dir: str):
"""Processes a list of input census block group-level data frames as a
single data set and exports it. Assumes each data frame has _only_ one
date of data.
Parameters
----------
export_dir
path where the output files are saved
signal_names : List[str]
cbg_df: pd.DataFrame
list of census block group-level frames.
signal_names: List[str]
signal names to be processed
fname: str
Input filename.
geo_resolutions: List[str]
List of geo resolutions to export the data.
export_dir
path where the output files are saved
Returns
-------
None
'''
cbg_df = construct_signals(pd.read_csv(fname), signal_names)
unique_date = cbg_df['timestamp'].unique()
if len(unique_date) != 1:
raise ValueError(f'More than one timestamp found in input file {fname}.')
date = unique_date[0].replace('-', '')
None. One file is written per (signal, resolution) pair containing the
aggregated data from `df`.
"""
for df in df_list:
validate(df)
date = date_from_timestamp(df_list[0].at[0, 'date_range_start'])
cbg_df = pd.concat(construct_signals(df, signal_names) for df in df_list)
for geo_res in geo_resolutions:
df = aggregate(cbg_df, signal_names, geo_res)
aggregated_df = aggregate(cbg_df, signal_names, geo_res)
for signal in signal_names:
df_export = df[
df_export = aggregated_df[
['geo_id']
+ [f'{signal}_{x}' for x in ('mean', 'se', 'n')]
].rename({
].rename({
f'{signal}_mean': 'val',
f'{signal}_se': 'se',
f'{signal}_n': 'sample_size',
}, axis=1)
df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
na_rep='NA',
index=False, )


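Each (signal, resolution) pair is exported after renaming the aggregate columns to the `val`/`se`/`sample_size` layout used by the covidcast export files. The rename step in isolation, on a toy one-row frame:

```python
import pandas as pd

signal = 'completely_home_prop'

# Hypothetical output of aggregate() for a single geo.
aggregated = pd.DataFrame({
    'geo_id': ['01000'],
    f'{signal}_mean': [0.3],
    f'{signal}_se': [0.1],
    f'{signal}_n': [2],
})

# Select the per-signal columns and map them onto the standard names.
export = aggregated[['geo_id'] + [f'{signal}_{x}' for x in ('mean', 'se', 'n')]].rename(
    {f'{signal}_mean': 'val',
     f'{signal}_se': 'se',
     f'{signal}_n': 'sample_size'}, axis=1)
```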
def process(current_filename: str,
previous_filenames: List[str],
signal_names: List[str],
wip_signal,
geo_resolutions: List[str],
export_dir: str):
"""Creates and exports signals corresponding both to a single day as well
as averaged over the previous week.
Parameters
----------
current_filename: str
path to file holding the target date's data.
previous_filenames: List[str]
paths to files holding data from each day in the week preceding the
target date.
signal_names: List[str]
signal names to be processed for a single date.
A second version of each such signal named {SIGNAL}_7d_avg will be
created averaging {SIGNAL} over the past 7 days.
wip_signal : List[str] or bool
a list of wip signals: [], OR
all signals in the registry: True OR
only signals that have never been published: False
geo_resolutions: List[str]
List of geo resolutions to export the data.
export_dir
path where the output files are saved.
Returns
-------
None. For each (signal, resolution) pair, one file is written for the
single date values to {export_dir}/{date}_{resolution}_{signal}.csv and
one for the data averaged over the previous week to
{export_dir}/{date}_{resolution}_{signal}_7d_avg.csv.
"""
past_week = [pd.read_csv(current_filename)]
for fname in previous_filenames:
if os.path.exists(fname):
past_week.append(pd.read_csv(fname))

# First process the current file alone...
process_window(past_week[:1],
add_prefix(signal_names, wip_signal, 'wip_'),
geo_resolutions,
export_dir)
# ...then as part of the whole window.
process_window(past_week,
add_prefix(add_suffix(signal_names, '_7d_avg'),
wip_signal,
'wip_'),
geo_resolutions,
export_dir)
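`process` runs `process_window` twice: once on the current day's frame with `wip_`-prefixed names, then on the whole week with `_7d_avg` appended before the prefix. The name composition in isolation; `add_wip_prefix` here is a simplified stand-in for `add_prefix(..., wip_signal=True, 'wip_')`, which in the real code may also consult the COVIDcast API:

```python
def add_suffix(signals, suffix):
    """Append `suffix` to every signal name."""
    return [s + suffix for s in signals]

def add_wip_prefix(signals):
    """Simplified stand-in for add_prefix with wip_signal=True."""
    return ['wip_' + s for s in signals]

# Suffix first, then prefix, matching the order in process().
names = add_wip_prefix(add_suffix(['completely_home_prop'], '_7d_avg'))
```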