Skip to content

API server code health improvements #1041

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 61 additions & 41 deletions src/acquisition/afhsb/afhsb_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,60 +14,73 @@
import csv
import os

import sas7bdat
import pickle
import sas7bdat
import epiweeks as epi


DATAPATH = '/home/automation/afhsb_data'
SOURCE_DIR = DATAPATH
TARGET_DIR = DATAPATH
SOURCE_DIR = DATAPATH
TARGET_DIR = DATAPATH

INVALID_DMISIDS = set()

def get_flu_cat(dx):
# flu1 (influenza)
if (len(dx) == 0): return None
if len(dx) == 0:
return None
dx = dx.capitalize()
if (dx.isnumeric()):
if dx.isnumeric():
for prefix in ["487", "488"]:
if (dx.startswith(prefix)): return 1
if dx.startswith(prefix):
return 1
for i in range(0, 7):
prefix = str(480 + i)
if (dx.startswith(prefix)): return 2
if dx.startswith(prefix):
return 2
for i in range(0, 7):
prefix = str(460 + i)
if (dx.startswith(prefix)): return 3
if dx.startswith(prefix):
return 3
for prefix in ["07999", "3829", "7806", "7862"]:
if (dx.startswith(prefix)): return 3
if dx.startswith(prefix):
return 3
elif (dx[0].isalpha() and dx[1:].isnumeric()):
for prefix in ["J09", "J10", "J11"]:
if (dx.startswith(prefix)): return 1
if dx.startswith(prefix):
return 1
for i in range(12, 19):
prefix = "J{}".format(i)
if (dx.startswith(prefix)): return 2
if dx.startswith(prefix):
return 2
for i in range(0, 7):
prefix = "J0{}".format(i)
if (dx.startswith(prefix)): return 3
if dx.startswith(prefix):
return 3
for i in range(20, 23):
prefix = "J{}".format(i)
if (dx.startswith(prefix)): return 3
if dx.startswith(prefix):
return 3
for prefix in ["J40", "R05", "H669", "R509", "B9789"]:
if (dx.startswith(prefix)): return 3
if dx.startswith(prefix):
return 3
else:
return None

def aggregate_data(sourcefile, targetfile):
reader = sas7bdat.SAS7BDAT(os.path.join(SOURCE_DIR, sourcefile), skip_header=True)
# map column names to column indices
COL2IDX = {column.name.decode('utf-8'): column.col_id for column in reader.columns}
def get_field(row, column): return row[COL2IDX[column]]
col_2_idx = {column.name.decode('utf-8'): column.col_id for column in reader.columns}

def get_field(row, column):
return row[col_2_idx[column]]

def row2flu(row):
for i in range(1, 9):
dx = get_field(row, "dx{}".format(i))
flu_cat = get_flu_cat(dx)
if (flu_cat != None): return flu_cat
if flu_cat is not None:
return flu_cat
return 0

def row2epiweek(row):
Expand All @@ -77,28 +90,30 @@ def row2epiweek(row):
year, week_num = week_tuple[0], week_tuple[1]
return year, week_num

results_dict = dict()
for r, row in enumerate(reader):
results_dict = {}
for _, row in enumerate(reader):
# if (r >= 1000000): break
if (get_field(row, 'type') != "Outpt"): continue
if get_field(row, 'type') != "Outpt":
continue
year, week_num = row2epiweek(row)
dmisid = get_field(row, 'DMISID')
flu_cat = row2flu(row)

key_list = [year, week_num, dmisid, flu_cat]
curr_dict = results_dict
for i, key in enumerate(key_list):
if (i == len(key_list) - 1):
if (not key in curr_dict): curr_dict[key] = 0
if i == len(key_list) - 1:
if key not in curr_dict:
curr_dict[key] = 0
curr_dict[key] += 1
else:
if (not key in curr_dict): curr_dict[key] = dict()
if key not in curr_dict:
curr_dict[key] = {}
curr_dict = curr_dict[key]

results_path = os.path.join(TARGET_DIR, targetfile)
with open(results_path, 'wb') as f:
pickle.dump(results_dict, f, pickle.HIGHEST_PROTOCOL)
return


################# Functions for geographical information ####################
Expand All @@ -122,7 +137,7 @@ def format_dmisid_csv(filename, target_name):

src_csv = open(src_path, "r", encoding='utf-8-sig')
reader = csv.DictReader(src_csv)

dst_csv = open(dst_path, "w")
fieldnames = ['dmisid', 'country', 'state', 'zip5']
writer = csv.DictWriter(dst_csv, fieldnames=fieldnames)
Expand All @@ -132,9 +147,11 @@ def format_dmisid_csv(filename, target_name):

for row in reader:
country2 = row['Facility ISO Country Code']
if (country2 == ""): country3 = ""
elif (not country2 in country_mapping):
for key in row.keys(): print(key, row[key])
if country2 == "":
country3 = ""
elif country2 not in country_mapping:
for key in row.keys():
print(key, row[key])
continue
else:
country3 = country_mapping[country2]
Expand All @@ -149,6 +166,7 @@ def dmisid():
target_name = "simple_DMISID_FY2018.csv"
format_dmisid_csv(filename, target_name)


cen2states = {'cen1': {'CT', 'ME', 'MA', 'NH', 'RI', 'VT'},
'cen2': {'NJ', 'NY', 'PA'},
'cen3': {'IL', 'IN', 'MI', 'OH', 'WI'},
Expand All @@ -175,7 +193,7 @@ def state2region(D):
for region in D.keys():
states = D[region]
for state in states:
assert(not state in results)
assert state not in results
results[state] = region
return results

Expand Down Expand Up @@ -204,7 +222,7 @@ def write_afhsb_csv(period):
with open(os.path.join(TARGET_DIR, "{}.csv".format(period)), 'w') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()

i = 0
for year in sorted(results_dict.keys()):
year_dict = results_dict[year]
Expand All @@ -217,11 +235,12 @@ def write_afhsb_csv(period):
i += 1
epiweek = int("{}{:02d}".format(year, week))
flu_type = flu_mapping[flu]

row = {"epiweek": epiweek, "dmisid": None if (not dmisid.isnumeric()) else dmisid,
"flu_type": flu_type, "visit_sum": visit_sum, "id": i}
writer.writerow(row)
if (i % 100000 == 0): print(row)
if i % 100000 == 0:
print(row)

def dmisid_start_time_from_file(filename):
starttime_record = dict()
Expand All @@ -230,7 +249,7 @@ def dmisid_start_time_from_file(filename):
for row in reader:
dmisid = row['dmisid']
epiweek = int(row['epiweek'])
if (not dmisid in starttime_record):
if dmisid not in starttime_record:
starttime_record[dmisid] = epiweek
else:
starttime_record[dmisid] = min(epiweek, starttime_record[dmisid])
Expand All @@ -241,7 +260,7 @@ def dmisid_start_time():
record2 = dmisid_start_time_from_file(os.path.join(TARGET_DIR, "13to17.csv"))
record = record1
for dmisid, epiweek in record2.items():
if (dmisid in record):
if dmisid in record:
record[dmisid] = min(record[dmisid], epiweek)
else:
record[dmisid] = epiweek
Expand All @@ -261,10 +280,10 @@ def fillin_zero_to_csv(period, dmisid_start_record):
dmisid = row['dmisid']
flu_type = row['flu_type']
visit_sum = row['visit_sum']
if (not epiweek in results_dict):
if epiweek not in results_dict:
results_dict[epiweek] = dict()
week_dict = results_dict[epiweek]
if (not dmisid in week_dict):
if dmisid not in week_dict:
week_dict[dmisid] = dict()
dmisid_dict = week_dict[dmisid]
dmisid_dict[flu_type] = visit_sum
Expand All @@ -277,14 +296,15 @@ def fillin_zero_to_csv(period, dmisid_start_record):
week_dict = results_dict[epiweek]
for dmisid in dmisid_group:
start_week = dmisid_start_record[dmisid]
if (start_week > epiweek): continue
if start_week > epiweek:
continue

if (not dmisid in week_dict):
if dmisid not in week_dict:
week_dict[dmisid] = dict()

dmisid_dict = week_dict[dmisid]
for flutype in flutype_group:
if (not flutype in dmisid_dict):
if flutype not in dmisid_dict:
dmisid_dict[flutype] = 0

# Write to csv files
Expand All @@ -301,7 +321,7 @@ def fillin_zero_to_csv(period, dmisid_start_record):
row = {"id": i, "epiweek": epiweek, "dmisid": dmisid,
"flu_type": flutype, "visit_sum": visit_sum}
writer.writerow(row)
if (i % 100000 == 0):
if i % 100000 == 0:
print(row)
i += 1
print("Wrote {} rows".format(i))
Expand All @@ -328,4 +348,4 @@ def main():


if __name__ == '__main__':
main()
main()
2 changes: 1 addition & 1 deletion src/acquisition/afhsb/afhsb_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def init_all_tables(datapath):
raw_table_name = 'afhsb_{}_raw'.format(period)
state_table_name = 'afhsb_{}_state'.format(period)
region_table_name = 'afhsb_{}_region'.format(period)

init_raw_data(raw_table_name, os.path.join(datapath, "filled_{}.csv".format(period)))
agg_by_state(raw_table_name, state_table_name)
agg_by_region(state_table_name, region_table_name)
Expand Down
1 change: 1 addition & 0 deletions src/acquisition/afhsb/afhsb_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ def main():
afhsb_sql.init_all_tables(tmp_datapath)
# (Temporary parent directory should be deleted automatically.)


if __name__ == '__main__':
main()
5 changes: 0 additions & 5 deletions src/acquisition/cdcp/cdc_dropbox_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
"""

# standard library
import argparse
import datetime
from zipfile import ZIP_DEFLATED, ZipFile

Expand Down Expand Up @@ -149,10 +148,6 @@ def fetch_data():


def main():
# args and usage
parser = argparse.ArgumentParser()
args = parser.parse_args()

# fetch new data
fetch_data()

Expand Down
10 changes: 3 additions & 7 deletions src/acquisition/cdcp/cdc_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@

# third party
import mysql.connector
import numpy as np

# first party
from . import cdc_upload
import delphi.operations.secrets as secrets
from delphi.utils.epidate import EpiDate
import delphi.utils.epiweek as flu
from . import cdc_upload


def get_num_hits(cur, epiweek, state, page):
Expand All @@ -95,8 +93,7 @@ def get_num_hits(cur, epiweek, state, page):
pass
if num is None:
return 0
else:
return num
return num


def get_total_hits(cur, epiweek, state):
Expand All @@ -114,8 +111,7 @@ def get_total_hits(cur, epiweek, state):
pass
if total is None:
raise Exception('missing data for %d-%s' % (epiweek, state))
else:
return total
return total


def store_result(cur, epiweek, state, num1, num2, num3, num4, num5, num6, num7, num8, total):
Expand Down
2 changes: 0 additions & 2 deletions src/acquisition/cdcp/cdc_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,13 @@
import io
import os
import shutil
import sys
from zipfile import ZipFile

# third party
import mysql.connector

# first party
import delphi.operations.secrets as secrets
import delphi.utils.epiweek as flu


STATES = {
Expand Down
1 change: 0 additions & 1 deletion src/acquisition/covid_hosp/common/network.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# third party
import pandas
import requests


class Network:
Expand Down
2 changes: 0 additions & 2 deletions src/acquisition/covid_hosp/common/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
"""

# standard library
import json
from pathlib import Path
from unittest.mock import Mock

# third party
import pandas
Expand Down
2 changes: 0 additions & 2 deletions src/acquisition/covid_hosp/state_daily/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from delphi.epidata.acquisition.covid_hosp.common.database import Columndef
from delphi.epidata.acquisition.covid_hosp.common.utils import Utils

import pandas as pd


class Database(BaseDatabase):

Expand Down
4 changes: 0 additions & 4 deletions src/acquisition/covid_hosp/state_daily/network.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# third party

import requests

# first party
from delphi.epidata.acquisition.covid_hosp.common.network import Network as BaseNetwork

Expand Down
7 changes: 1 addition & 6 deletions src/acquisition/covid_hosp/state_daily/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@
dataset provided by the US Department of Health & Human Services
via healthdata.gov.
"""
# standard library
import json

# third party
import pandas as pd

# first party
from delphi.epidata.acquisition.covid_hosp.common.utils import Utils
from delphi.epidata.acquisition.covid_hosp.state_daily.database import Database
Expand All @@ -29,5 +23,6 @@ def run(network=Network):

return Utils.update_dataset(Database, network)


# main entry point
Utils.launch_if_main(Update.run, __name__)
Loading