diff --git a/src/acquisition/rvdss/constants.py b/src/acquisition/rvdss/constants.py index 94a30bf04..f06f1d5e2 100644 --- a/src/acquisition/rvdss/constants.py +++ b/src/acquisition/rvdss/constants.py @@ -1,3 +1,5 @@ +from datetime import datetime + # The dataset calls the same viruses, provinces, regions (province groups), # and country by multiple names. Map each of those to a common abbreviation. VIRUSES = { @@ -34,7 +36,7 @@ "saskatchewan":"sk", "alberta": "ab", "british columbia" :"bc", - "yukon" : "yk", + "yukon" : "yt", "northwest territories" : "nt", "nunavut" : "nu", "canada":"ca", @@ -54,6 +56,8 @@ # Construct dashboard and data report URLS. DASHBOARD_BASE_URL = "https://health-infobase.canada.ca/src/data/respiratory-virus-detections/" DASHBOARD_W_DATE_URL = DASHBOARD_BASE_URL + "archive/{date}/" + +# May not need this since we write a function for this in pull_historic DASHBOARD_BASE_URLS_2023_2024_SEASON = ( DASHBOARD_W_DATE_URL.format(date = date) for date in ( @@ -74,6 +78,7 @@ SEASON_BASE_URL = "https://www.canada.ca" ALTERNATIVE_SEASON_BASE_URL = "www.phac-aspc.gc.ca/bid-bmi/dsd-dsm/rvdi-divr/" HISTORIC_SEASON_REPORTS_URL = SEASON_BASE_URL+"/en/public-health/services/surveillance/respiratory-virus-detections-canada/{year_range}.html" +DASHBOARD_ARCHIVED_DATES_URL= "https://health-infobase.canada.ca/src/js/respiratory-virus-detections/ArchiveData.json" # Each URL created here points to a list of all data reports made during that # season, e.g. @@ -82,7 +87,7 @@ # disease data in a dashboard with a static URL. Therefore, this collection # of URLs does _NOT_ need to be updated. It is used for fetching historical # data (for dates on or before June 8, 2024) only. -HISTORIC_SEASON_URL = (HISTORIC_SEASON_REPORTS_URL.format(year_range = year_range) for year_range in +HISTORIC_SEASON_URLS = (HISTORIC_SEASON_REPORTS_URL.format(year_range = year_range) for year_range in ( "2013-2014", "2014-2015", @@ -101,7 +106,12 @@ DASHBOARD_UPDATE_DATE_FILE = "RVD_UpdateDate.csv" DASHBOARD_DATA_FILE = "RVD_WeeklyData.csv" -RESP_COUNTS_OUTPUT_FILE = "respiratory_detections.csv" + +RESP_DETECTIONS_OUTPUT_FILE = "respiratory_detections.csv" POSITIVE_TESTS_OUTPUT_FILE = "positive_tests.csv" +COUNTS_OUTPUT_FILE = "number_of_detections.csv" + +FIRST_WEEK_OF_YEAR = 35 -LAST_WEEK_OF_YEAR = 35 +UPDATE_DATES_FILE = "update_dates.txt" +NOW = datetime.now() diff --git a/src/acquisition/rvdss/database.py b/src/acquisition/rvdss/database.py new file mode 100644 index 000000000..4e1ea1c87 --- /dev/null +++ b/src/acquisition/rvdss/database.py @@ -0,0 +1,121 @@ +""" +=============== +=== Purpose === +=============== + +Stores data provided by rvdss Corp., which contains flu lab test results. +See: rvdss.py + + +======================= +=== Data Dictionary === +======================= + +`rvdss` is the table where rvdss data is stored. 
++----------+-------------+------+-----+---------+----------------+ +| Field | Type | Null | Key | Default | Extra | ++----------+-------------+------+-----+---------+----------------+ +| id | int(11) | NO | PRI | NULL | auto_increment | +| location | varchar(8) | NO | MUL | NULL | | +| epiweek | int(11) | NO | MUL | NULL | | +| value | float | NO | | NULL | | ++----------+-------------+------+-----+---------+----------------+ +id: unique identifier for each record +location: hhs1-10 +epiweek: the epiweek during which the queries were executed +value: number of total test records per facility, within each epiweek + +================= +=== Changelog === +================= +2017-12-14: + * add "need update" check + +2017-12-02: + * original version +""" + +# standard library +import argparse + +# third party +import mysql.connector + +# first party +from delphi.epidata.acquisition.rvdss import rvdss +import delphi.operations.secrets as secrets +from delphi.utils.epidate import EpiDate +import delphi.utils.epiweek as flu +from delphi.utils.geo.locations import Locations + +LOCATIONS = Locations.hhs_list +DATAPATH = "/home/automation/rvdss_data" + + +def update(locations, first=None, last=None, force_update=False, load_email=True): + # download and prepare data first + qd = rvdss.rvdssData(DATAPATH, load_email) + if not qd.need_update and not force_update: + print("Data not updated, nothing needs change.") + return + + qd_data = qd.load_csv() + qd_measurements = qd.prepare_measurements(qd_data, start_weekday=4) + qd_ts = rvdss.measurement_to_ts(qd_measurements, 7, startweek=first, endweek=last) + # connect to the database + u, p = secrets.db.epi + cnx = mysql.connector.connect(user=u, password=p, database="epidata") + cur = cnx.cursor() + + def get_num_rows(): + cur.execute("SELECT count(1) `num` FROM `rvdss`") + for (num,) in cur: + pass + return num + + # check from 4 weeks preceeding the last week with data through this week + cur.execute("SELECT max(`epiweek`) `ew0`, yearweek(now(), 6) `ew1` FROM `rvdss`") + for (ew0, ew1) in cur: + ew0 = 200401 if ew0 is None else flu.add_epiweeks(ew0, -4) + ew0 = ew0 if first is None else first + ew1 = ew1 if last is None else last + print(f"Checking epiweeks between {int(ew0)} and {int(ew1)}...") + + # keep track of how many rows were added + rows_before = get_num_rows() + + # check rvdss for new and/or revised data + sql = """ + INSERT INTO + `rvdss` (`location`, `epiweek`, `value`) + VALUES + (%s, %s, %s) + ON DUPLICATE KEY UPDATE + `value` = %s + """ + + total_rows = 0 + + for location in locations: + if location not in qd_ts: + continue + ews = sorted(qd_ts[location].keys()) + num_missing = 0 + for ew in ews: + v = qd_ts[location][ew] + sql_data = (location, ew, v, v) + cur.execute(sql, sql_data) + total_rows += 1 + if v == 0: + num_missing += 1 + if num_missing > 0: + print(f" [{location}] missing {int(num_missing)}/{len(ews)} value(s)") + + # keep track of how many rows were added + rows_after = get_num_rows() + print(f"Inserted {int(rows_after - rows_before)}/{int(total_rows)} row(s)") + + # cleanup + cur.close() + cnx.commit() + cnx.close() diff --git a/src/acquisition/rvdss/rvdss_historic.py b/src/acquisition/rvdss/pull_historic.py similarity index 79% rename from src/acquisition/rvdss/rvdss_historic.py rename to src/acquisition/rvdss/pull_historic.py index ee22f2eed..82ff48910 100644 --- a/src/acquisition/rvdss/rvdss_historic.py +++ b/src/acquisition/rvdss/pull_historic.py @@ -14,14 +14,15 @@ from datetime import datetime, timedelta import 
math -from delphi.epidata.acquisition.rvdss.constants import ( - DASHBOARD_BASE_URLS_2023_2024_SEASON, HISTORIC_SEASON_URL, - ALTERNATIVE_SEASON_BASE_URL, SEASON_BASE_URL, LAST_WEEK_OF_YEAR, - RESP_COUNTS_OUTPUT_FILE, POSITIVE_TESTS_OUTPUT_FILE +from constants import ( + HISTORIC_SEASON_URLS, + ALTERNATIVE_SEASON_BASE_URL, SEASON_BASE_URL, FIRST_WEEK_OF_YEAR, + DASHBOARD_ARCHIVED_DATES_URL, + DASHBOARD_BASE_URL ) -from delphi.epidata.acquisition.rvdss.utils import ( +from utils import ( abbreviate_virus, abbreviate_geo, create_geo_types, check_date_format, - get_revised_data, get_weekly_data + fetch_dashboard_data,preprocess_table_columns, add_flu_prefix ) #%% Functions @@ -78,7 +79,7 @@ def get_report_date(week,start_year,epi=False): epi - if True, return the date in cdc format (yearweek) """ - if week < LAST_WEEK_OF_YEAR: + if week < FIRST_WEEK_OF_YEAR: year=int(start_year)+1 else: year=int(start_year) @@ -137,9 +138,9 @@ def get_modified_dates(soup,week_end_date): meta_tags=soup.find_all("meta",title="W3CDTF") for tag in meta_tags: if tag.get("name", None) == "dcterms.modified" or tag.get("property", None) == "dcterms.modified": - modified_date = tag.get("content", None) + date_modified = tag.get("content", None) - mod_date = datetime.strptime(modified_date, "%Y-%m-%d") + mod_date = datetime.strptime(date_modified, "%Y-%m-%d") week_date = datetime.strptime(week_end_date, "%Y-%m-%d") diff_days = (mod_date-week_date).days @@ -183,65 +184,13 @@ def deduplicate_rows(table): new_table=table return(new_table) -def add_flu_prefix(flu_subtype): - """ Add the prefix `flu` when only the subtype is reported """ +def drop_ah1_columns(table): + h1n1_column_exists = any([re.search("h1n1",c) for c in table.columns]) + ah1_column_exists = any([re.search(r"ah1\b",c) for c in table.columns]) - pat1 =r"^ah3" - pat2= r"^auns" - pat3= r"^ah1pdm09" - pat4= r"^ah1n1pdm09" - combined_pat = '|'.join((pat1, pat2,pat3,pat4)) - - full_fluname = re.sub(combined_pat, r"flu\g<0>",flu_subtype) - return(full_fluname) - -def make_signal_type_spelling_consistent(signal): - """ - Make the signal type (i.e. percent positive, number tests, total tests) have consistent spelling - Also remove total from signal names - """ - - pat1 = "positive" - pat2 = 'pos' - combined_pat = '|'.join((pat1, pat2)) - - pat3 = r"test\b" - pat4 = 'tested' - combined_pat2 = '|'.join((pat3, pat4)) - - new_signal = re.sub(combined_pat, "positive_tests",signal) - new_signal = re.sub(combined_pat2, "tests",new_signal) - new_signal =re.sub(" *%", "_pct_positive",new_signal) - new_signal = re.sub("total ", "",new_signal) - return(new_signal) - -def preprocess_table_columns(table): - """ - Remove characters like . 
or * from columns - Abbreviate the viruses in columns - Change some naming of signals in columns (i.e order of hpiv and other) - Change some naming of locations in columns (i.e at instead of atl) - """ - table.columns = [re.sub("\xa0"," ", col) for col in table.columns] # \xa0 to space - table.columns = [re.sub("(.*?)(\.\d+)", "\\1", c) for c in table.columns] # remove .# for duplicated columns - table.columns =[re.sub("\.", "", s)for s in table.columns] #remove periods - table.columns =[re.sub(r"\((all)\)", "", s)for s in table.columns] # remove (all) - table.columns =[re.sub(r"\s*\(|\)", "", s)for s in table.columns] - table.columns = [re.sub(' +', ' ', col) for col in table.columns] # Make any muliple spaces into one space - table.columns = [re.sub(r'\(|\)', '', col) for col in table.columns] # replace () for _ - table.columns = [re.sub(r'/', '_', col) for col in table.columns] # replace / with _ - - table.columns = [re.sub(r"^at\b","atl ",t) for t in table.columns] - table.columns = [re.sub("canada","can",t) for t in table.columns] - - table.columns =[re.sub(r"h1n1 2009 |h1n12009", "ah1n1pdm09", s)for s in table.columns] - table.columns =[abbreviate_virus(col) for col in table.columns] # abbreviate viruses - table.columns = [re.sub(r"flu a","flua",t) for t in table.columns] - table.columns = [re.sub(r"flu b","flub",t) for t in table.columns] - table.columns = [re.sub("flutest","flu test", col) for col in table.columns] - table.columns = [re.sub(r"other hpiv","hpivother",t) for t in table.columns] - - table.columns=[make_signal_type_spelling_consistent(col) for col in table.columns] + if ah1_column_exists and h1n1_column_exists: + column_name_to_drop = list(table.filter(regex=r'ah1\b')) + table.drop(columns = column_name_to_drop,inplace=True) return(table) def create_detections_table(table,modified_date,week_number,week_end_date,start_year): @@ -367,7 +316,7 @@ def create_percent_positive_detection_table(table,modified_date,start_year, flu= return(table) -def get_season_reports(url): +def fetch_one_season_from_report(url): # From the url, go to the main landing page for a season # which contains all the links to each week in the season page=requests.get(url) @@ -382,13 +331,13 @@ def get_season_reports(url): # create tables to hold all the data for the season all_positive_tables=pd.DataFrame() all_number_tables=pd.DataFrame() - all_respiratory_detection_table=pd.DataFrame() + all_respiratory_detection_tables=pd.DataFrame() for week_num in range(len(urls)): current_week = weeks[week_num] current_week_end = end_dates[week_num] - # In the 2019=2020 season, the webpages for weeks 5 and 47 only have + # In the 2019-2020 season, the webpages for weeks 5 and 47 only have # the abbreviations table and the headers for the respiratory detections # table, so they are effectively empty, and skipped if season[0] == '2019': @@ -399,6 +348,7 @@ def get_season_reports(url): temp_url=urls[week_num] temp_page=requests.get(temp_url) new_soup = BeautifulSoup(temp_page.text, 'html.parser') + captions = extract_captions_of_interest(new_soup) modified_date = get_modified_dates(new_soup,current_week_end) @@ -431,7 +381,7 @@ def get_season_reports(url): # Read table, coding all the abbreviations for missing data into NA # Also use dropna because removing footers causes the html to have an empty row - na_values = ['N.A.','N.A', 'N.C.','N.R.','Not Available','Not Tested',"N.D.","-"] + na_values = ['N.A.','N.A', 'N.C.','N.R.','Not Available','Not Tested',"not available","not tested","N.D.","-"] table = 
pd.read_html(tab,na_values=na_values)[0].dropna(how="all") # Check for multiline headers @@ -468,6 +418,9 @@ def get_season_reports(url): # a date is written as 022-09-03, instead of 2022-09-03 table.loc[table['week'] == 35, 'week end'] = "2022-09-03" + # check if both ah1 and h1n1 are given. If so drop one since they are the same virus and ah1 is always empty + table = drop_ah1_columns(table) + # Rename columns table= preprocess_table_columns(table) @@ -523,17 +476,17 @@ def get_season_reports(url): positive_tables.append(pos_table) # create path to save files - path = "season_" + season[0]+"_"+season[1] + #path = "season_" + season[0]+"_"+season[1] # combine all the positive tables - combined_positive_tables=pd.concat(positive_tables,axis=1) + combined_positive_tables =pd.concat(positive_tables,axis=1) # Check if the indices are already in the season table # If not, add the weeks tables into the season table # check for deduplication pandas - if not respiratory_detection_table.index.isin(all_respiratory_detection_table.index).any(): - all_respiratory_detection_table= pd.concat([all_respiratory_detection_table,respiratory_detection_table]) + if not respiratory_detection_table.index.isin(all_respiratory_detection_tables.index).any(): + all_respiratory_detection_tables= pd.concat([all_respiratory_detection_tables,respiratory_detection_table]) if not combined_positive_tables.index.isin(all_positive_tables.index).any(): all_positive_tables=pd.concat([all_positive_tables,combined_positive_tables]) @@ -542,40 +495,33 @@ def get_season_reports(url): if not number_detections_table.index.isin(all_number_tables.index).any(): all_number_tables=pd.concat([all_number_tables,number_detections_table]) - # write files to csvs - all_respiratory_detection_table.to_csv(path+"/" + RESP_COUNTS_OUTPUT_FILE, index=True) - all_positive_tables.to_csv(path+"/" + POSITIVE_TESTS_OUTPUT_FILE, index=True) - - # Write the number of detections table to csv if it exists (i.e has rows) - if len(all_number_tables) != 0: - all_number_tables.to_csv(path+"/number_of_detections.csv", index=True) - -def main(): - # Scrape each season. Saves data to CSVs as a side effect. - [get_season_reports(url) for url in HISTORIC_SEASON_URL] - - # Update the end of the 2023-2024 season with the dashboard data + return { + "respiratory_detection": all_respiratory_detection_tables, + "positive": all_positive_tables, + "count": all_number_tables, + } - # Load old csvs - old_detection_data = pd.read_csv('season_2023_2024/' + RESP_COUNTS_OUTPUT_FILE).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value']) - old_positive_data = pd.read_csv('season_2023_2024/' + POSITIVE_TESTS_OUTPUT_FILE).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value']) +def fetch_archived_dashboard_dates(archive_url): + r=requests.get(archive_url) + values=r.json() + data=pd.json_normalize(values) + english_data = data[data["lang"]=="en"] + + archived_dates=english_data['date'].to_list() + return(archived_dates) - for base_url in DASHBOARD_BASE_URLS_2023_2024_SEASON: - # Get weekly dashboard data - weekly_data = get_weekly_data(base_url,2023).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value']) - positive_data = get_revised_data(base_url) - # Check if indices are already present in the old data - # If not, add the new data - if not weekly_data.index.isin(old_detection_data.index).any(): - old_detection_data= pd.concat([old_detection_data,weekly_data],axis=0) +def fetch_report_data(): + # Scrape each season. 
+ dict_list = [fetch_one_season_from_report(url) for url in HISTORIC_SEASON_URLS] - if not positive_data.index.isin(old_positive_data.index).any(): - old_positive_data= pd.concat([old_positive_data,positive_data],axis=0) + return dict_list - # Overwrite/update csvs - old_detection_data.to_csv('season_2023_2024/' + RESP_COUNTS_OUTPUT_FILE,index=True) - old_positive_data.to_csv('season_2023_2024/' + POSITIVE_TESTS_OUTPUT_FILE,index=True) +def fetch_historical_dashboard_data(): + # Update the end of the 2023-2024 season with the dashboard data + archived_dates = fetch_archived_dashboard_dates(DASHBOARD_ARCHIVED_DATES_URL) + + archived_urls= [DASHBOARD_BASE_URL + "archive/"+ date+"/" for date in archived_dates] + dict_list = [fetch_dashboard_data(url) for url in archived_urls] -if __name__ == '__main__': - main() + return dict_list diff --git a/src/acquisition/rvdss/run.py b/src/acquisition/rvdss/run.py new file mode 100644 index 000000000..599fc89de --- /dev/null +++ b/src/acquisition/rvdss/run.py @@ -0,0 +1,128 @@ +""" +Defines command line interface for the rvdss indicator. Current data (covering the most recent epiweek) and historical data (covering all data before the most recent epiweek) can be generated together or separately. + +Defines top-level functions to fetch data and save to disk or DB. +""" + +import pandas as pd +import os +import argparse + +from utils import fetch_dashboard_data, check_most_recent_update_date,get_dashboard_update_date +from constants import DASHBOARD_BASE_URL, RESP_DETECTIONS_OUTPUT_FILE, POSITIVE_TESTS_OUTPUT_FILE, COUNTS_OUTPUT_FILE,UPDATE_DATES_FILE +from pull_historic import fetch_report_data,fetch_historical_dashboard_data + +def update_current_data(): + + ## Check if data for current update date has already been fetched + headers = { + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36' + } + + update_date = get_dashboard_update_date(DASHBOARD_BASE_URL, headers) + already_updated = check_most_recent_update_date(update_date,UPDATE_DATES_FILE) + + if not already_updated: + with open(UPDATE_DATES_FILE, 'a') as testfile: + testfile.write(update_date+ "\n") + + ## TODO: what is the base path for these files? + base_path = "." + + data_dict = fetch_dashboard_data(DASHBOARD_BASE_URL) + + table_types = { + "respiratory_detection": RESP_DETECTIONS_OUTPUT_FILE, + "positive": POSITIVE_TESTS_OUTPUT_FILE, + # "count": COUNTS_OUTPUT_FILE, # Dashboards don't contain this data. + } + for tt in table_types.keys(): + data = data_dict[tt] + + # Write the tables to separate csvs + path = base_path + "/" + table_types[tt] + + # Since this function generates new data weekly, we need to combine it with the existing data, if it exists. + if not os.path.exists(path): + data.to_csv(path,index=True) + else: + old_data = pd.read_csv(path).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value']) + + # If index already exists in the data on disk, don't add the new data -- we may have already run the weekly data fetch. + ## TODO: The check on index maybe should be stricter? Although we do deduplication upstream, so this probably won't find true duplicates + if not data.index.isin(old_data.index).any(): + old_data= pd.concat([old_data,data],axis=0) + old_data.to_csv(path,index=True) + + # ## TODO + # update_database(data) + else: + print("Data is already up to date") + +def update_historical_data(): + ## TODO: what is the base path for these files? + base_path = "." 
+ + report_dict_list = fetch_report_data() # a dict for every season, and every seasonal dict has 2/3 tables inside + + # a dict with an entry for every week that has an archival dashboard, and each entry has 2/3 tables + dashboard_dict_list = fetch_historical_dashboard_data() + + table_types = { + "respiratory_detection": RESP_DETECTIONS_OUTPUT_FILE, + "positive": POSITIVE_TESTS_OUTPUT_FILE, + "count": COUNTS_OUTPUT_FILE, + } + for tt in table_types.keys(): + # Merge tables together from dashboards and reports for each table type. + dashboard_data = [elem.get(tt, pd.DataFrame()) for elem in dashboard_dict_list] # a list of all the dashboard tables + report_data = [elem.get(tt, None) for elem in report_dict_list] # a list of the report table + + all_report_tables = pd.concat(report_data) + all_dashboard_tables = pd.concat(dashboard_data) + + data = pd.concat([all_report_tables, all_dashboard_tables]) + + # Write the tables to separate csvs + if not data.empty: + data.to_csv(base_path +"/" + table_types[tt], index=True) + + # ## TODO + # update_database(data) + + +def main(): + # args and usage + parser = argparse.ArgumentParser() + # fmt: off + parser.add_argument( + "--current", + "-c", + action="store_true", + help="fetch current data, that is, data for the latest epiweek" + ) + parser.add_argument( + "--historical", + "-hist", + action="store_true", + help="fetch historical data, that is, data for all available time periods other than the latest epiweek" + ) + # fmt: on + args = parser.parse_args() + + current_flag, historical_flag = ( + args.current, + args.historical, + ) + if not current_flag and not historical_flag: + raise Exception("no data was requested") + + # Decide what to update + if current_flag: + update_current_data() + if historical_flag: + update_historical_data() + + +if __name__ == "__main__": + main() diff --git a/src/acquisition/rvdss/rvdss_update.py b/src/acquisition/rvdss/rvdss_update.py deleted file mode 100644 index 7aed18974..000000000 --- a/src/acquisition/rvdss/rvdss_update.py +++ /dev/null @@ -1,42 +0,0 @@ -""" -Script to fetch new data, after data reporting moved to the dashboard -format. This covers dates following the 2023-2024 season (exclusive). 
-""" - -import pandas as pd -import os - -from delphi.epidata.acquisition.rvdss.utils import get_weekly_data, get_revised_data, get_dashboard_update_date -from delphi.epidata.acquisition.rvdss.constants import DASHBOARD_BASE_URL, RESP_COUNTS_OUTPUT_FILE, POSITIVE_TESTS_OUTPUT_FILE - - -def main(): - headers = { - 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36' - } - - update_date = get_dashboard_update_date(DASHBOARD_BASE_URL,headers) - weekly_data = get_weekly_data(DASHBOARD_BASE_URL,2024,headers,update_date).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value']) - positive_data = get_revised_data(DASHBOARD_BASE_URL,headers,update_date) - - path1 = './' + RESP_COUNTS_OUTPUT_FILE - path2 = './' + POSITIVE_TESTS_OUTPUT_FILE - - if not os.path.exists(path1): - weekly_data.to_csv(path1,index=True) - else: - old_detection_data = pd.read_csv(path1).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value']) - if not weekly_data.index.isin(old_detection_data.index).any(): - old_detection_data= pd.concat([old_detection_data,weekly_data],axis=0) - old_detection_data.to_csv(path1,index=True) - - if not os.path.exists(path2): - positive_data.to_csv(path2,index=True) - else: - old_positive_data = pd.read_csv(path2).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value']) - if not positive_data.index.isin(old_positive_data.index).any(): - old_positive_data= pd.concat([old_positive_data,positive_data],axis=0) - old_positive_data.to_csv(path2,index=True) - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/src/acquisition/rvdss/utils.py b/src/acquisition/rvdss/utils.py index 24a2b8337..28c3fcdb1 100644 --- a/src/acquisition/rvdss/utils.py +++ b/src/acquisition/rvdss/utils.py @@ -8,8 +8,8 @@ from unidecode import unidecode import string -from delphi.epidata.acquisition.rvdss.constants import ( - VIRUSES, GEOS, REGIONS, NATION, LAST_WEEK_OF_YEAR, +from constants import ( + VIRUSES, GEOS, REGIONS, NATION, DASHBOARD_UPDATE_DATE_FILE, DASHBOARD_DATA_FILE ) @@ -27,6 +27,7 @@ def abbreviate_geo(full_name): lowercase=re.sub("/territoires","",lowercase) lowercase=re.sub("^cana$","can",lowercase) lowercase =lowercase.translate(str.maketrans(string.punctuation, ' '*len(string.punctuation),'.'+"'")) + lowercase=re.sub("kidshospital","kids hospital",lowercase) lowercase=re.sub(' +', ' ', lowercase) new_name=unidecode(lowercase) @@ -70,8 +71,88 @@ def get_dashboard_update_date(base_url,headers): update_date_url_response = requests.get(update_date_url, headers=headers) update_date = datetime.strptime(update_date_url_response.text,"%m/%d/%Y %H:%M:%S").strftime("%Y-%m-%d") return(update_date) + +def check_most_recent_update_date(date,date_file): + with open(date_file) as file: + current_date = date + contents = file.read() + + already_updated = current_date in contents + return(already_updated) + +def preprocess_table_columns(table): + """ + Remove characters like . 
or * from columns + Abbreviate the viruses in columns + Change some naming of signals in columns (i.e order of hpiv and other) + Change some naming of locations in columns (i.e at instead of atl) + """ + table.columns = [re.sub("\xa0"," ", col) for col in table.columns] # \xa0 to space + table.columns = [re.sub("(.*?)(\.\d+)", "\\1", c) for c in table.columns] # remove .# for duplicated columns + table.columns =[re.sub("\.", "", s)for s in table.columns] #remove periods + table.columns =[re.sub(r"\((all)\)", "", s)for s in table.columns] # remove (all) + table.columns =[re.sub(r"\s*\(|\)", "", s)for s in table.columns] + table.columns = [re.sub(' +', ' ', col) for col in table.columns] # Make any muliple spaces into one space + table.columns = [re.sub(r'\(|\)', '', col) for col in table.columns] # replace () for _ + table.columns = [re.sub(r'/', '_', col) for col in table.columns] # replace / with _ + + table.columns = [re.sub(r"^at\b","atl ",t) for t in table.columns] + table.columns = [re.sub("canada","can",t) for t in table.columns] + table.columns = [re.sub(r"\bcb\b","bc",t) for t in table.columns] + + table.columns =[re.sub(r"h1n1 2009 |h1n12009|a_h1|ah1\b", "ah1n1pdm09", s)for s in table.columns] + table.columns =[re.sub(r"a_uns", "auns", s)for s in table.columns] + table.columns =[re.sub(r"a_h3", "ah3", s)for s in table.columns] + + table.columns =[abbreviate_virus(col) for col in table.columns] # abbreviate viruses + table.columns = [re.sub(r"flu a","flua",t) for t in table.columns] + table.columns = [re.sub(r"flu b","flub",t) for t in table.columns] + table.columns = [re.sub(r"flutest\b","flu test", col) for col in table.columns] + table.columns = [re.sub(r"other hpiv|other_hpiv","hpivother",t) for t in table.columns] + + table.columns=[re.sub(r'bpositive','b_positive',c) for c in table.columns] + table.columns=[re.sub(r'apositive','a_positive',c) for c in table.columns] + table.columns=[re.sub(r'hpiv_1','hpiv1',c) for c in table.columns] + table.columns=[re.sub(r'hpiv_2','hpiv2',c) for c in table.columns] + table.columns=[re.sub(r'hpiv_3','hpiv3',c) for c in table.columns] + table.columns=[re.sub(r'hpiv_4','hpiv4',c) for c in table.columns] + + table.columns=[make_signal_type_spelling_consistent(col) for col in table.columns] + return(table) + +def add_flu_prefix(flu_subtype): + """ Add the prefix `flu` when only the subtype is reported """ + + pat1 =r"^ah3" + pat2= r"^auns" + pat3= r"^ah1pdm09" + pat4= r"^ah1n1pdm09" + combined_pat = '|'.join((pat1, pat2,pat3,pat4)) + + full_fluname = re.sub(combined_pat, r"flu\g<0>",flu_subtype) + return(full_fluname) + +def make_signal_type_spelling_consistent(signal): + """ + Make the signal type (i.e. 
percent positive, number tests, total tests) have consistent spelling + Also remove total from signal names + """ + + pat1 = r"positive\b" + pat2 = r'pos\b' + combined_pat = '|'.join((pat1, pat2)) -def get_revised_data(base_url,headers,update_date): + pat3 = r"test\b" + pat4 = 'tested' + combined_pat2 = '|'.join((pat3, pat4)) + + new_signal = re.sub(combined_pat, "positive_tests",signal) + new_signal = re.sub(combined_pat2, "tests",new_signal) + new_signal =re.sub(" *%", "_pct_positive",new_signal) + new_signal = re.sub("total ", "",new_signal) + return(new_signal) + +def get_positive_data(base_url,headers,update_date): # Get update data url = base_url+DASHBOARD_DATA_FILE @@ -90,10 +171,14 @@ def get_revised_data(base_url,headers,update_date): #df=df.drop(["weekorder","region","year","week"],axis=1) - df = df.pivot(index=['epiweek','time_value','issue','geo_type','geo_value'], + df = df.pivot(index=['epiweek','time_value','issue','geo_type','geo_value','region','week','weekorder','year'], columns="virus",values=['tests','percentpositive','positivetests']) + df.columns = ['_'.join(col).strip() for col in df.columns.values] df = df.rename(columns=lambda x: '_'.join(x.split('_')[1:]+x.split('_')[:1])) + df.columns = [re.sub(r'/', '', col) for col in df.columns] # replace / with _ + df.columns = [re.sub(r"flu a","flua",t) for t in df.columns] + df.columns = [re.sub(r"flu b","flub",t) for t in df.columns] df.columns=[re.sub("positivetests", "positive_tests",col) for col in df.columns] df.columns=[re.sub("percentpositive", "pct_positive",col) for col in df.columns] df.columns=[re.sub(r' ','_',c) for c in df.columns] @@ -104,7 +189,7 @@ def get_revised_data(base_url,headers,update_date): return(df) -def get_weekly_data(base_url,start_year,headers,update_date): +def get_detections_data(base_url,headers,update_date): # Get current week and year summary_url = base_url + "RVD_SummaryText.csv" summary_url_response = requests.get(summary_url, headers=headers) @@ -113,38 +198,41 @@ def get_weekly_data(base_url,start_year,headers,update_date): week_df = summary_df[(summary_df['Section'] == "summary") & (summary_df['Type']=="title")] week_string = week_df.iloc[0]['Text'].lower() current_week = int(re.search("week (.+?) 
", week_string).group(1)) - - if current_week < LAST_WEEK_OF_YEAR: - current_year = start_year+1 - else: - current_year = start_year + current_year= int(re.search("20\d{2}", week_string).group(0)) current_epiweek= Week(current_year,current_week) # Get weekly data - weekly_url = base_url + "RVD_CurrentWeekTable.csv" - weekly_url_response = requests.get(weekly_url, headers=headers) - weekly_url_response.encoding='UTF-8' - df_weekly = pd.read_csv(io.StringIO(weekly_url_response.text)) - - df_weekly = df_weekly.rename(columns=lambda x: '_'.join(x.split('_')[1:]+x.split('_')[:1])) - df_weekly.insert(0,"epiweek",int(str(current_epiweek))) - df_weekly.insert(1,"time_value",str(current_epiweek.enddate())) - df_weekly.insert(2,"issue",update_date) - df_weekly.columns=[abbreviate_virus(c) for c in df_weekly.columns] - df_weekly.columns=[re.sub(r'test\b','tests',c) for c in df_weekly.columns] - df_weekly.columns=[re.sub(r'pos\b','positive_tests',c) for c in df_weekly.columns] - df_weekly.columns=[re.sub(r'flua_','flu_a',c) for c in df_weekly.columns] - df_weekly.columns=[re.sub(r'flub_','flu_b',c) for c in df_weekly.columns] - df_weekly.columns=[re.sub(r'bpositive','b_positive',c) for c in df_weekly.columns] - df_weekly.columns=[re.sub(r'apositive','a_positive',c) for c in df_weekly.columns] - df_weekly.columns=[re.sub(r'flu_ah1_','flu_ah1pdm09_',c) for c in df_weekly.columns] - df_weekly.columns=[re.sub(r' ','_',c) for c in df_weekly.columns] - df_weekly=df_weekly.rename(columns={'reportinglaboratory':"geo_value"}) - df_weekly['geo_value'] = [abbreviate_geo(g) for g in df_weekly['geo_value']] - df_weekly['geo_type'] = [create_geo_types(g,"lab") for g in df_weekly['geo_value']] - - # if df_weekly.columns.isin(["weekorder","date","week"]).all(): - # df_weekly=df_weekly.drop(["weekorder","date","week"],axis=1) - - return(df_weekly) \ No newline at end of file + detections_url = base_url + "RVD_CurrentWeekTable.csv" + detections_url_response = requests.get(detections_url, headers=headers) + detections_url_response.encoding='UTF-8' + df_detections = pd.read_csv(io.StringIO(detections_url_response.text)) + + df_detections = df_detections.rename(columns=lambda x: '_'.join(x.split('_')[1:]+x.split('_')[:1])) + df_detections.insert(0,"epiweek",int(str(current_epiweek))) + df_detections.insert(1,"time_value",str(current_epiweek.enddate())) + df_detections.insert(2,"issue",update_date) + df_detections=preprocess_table_columns(df_detections) + + df_detections.columns=[re.sub(r' ','_',c) for c in df_detections.columns] + df_detections=df_detections.rename(columns={'reportinglaboratory':"geo_value"}) + df_detections['geo_value'] = [abbreviate_geo(g) for g in df_detections['geo_value']] + df_detections['geo_type'] = [create_geo_types(g,"lab") for g in df_detections['geo_value']] + + return(df_detections.set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value'])) + +def fetch_dashboard_data(url): + headers = { + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36' + } + + update_date = get_dashboard_update_date(url, headers) + + detections_data = get_detections_data(url,headers,update_date) + positive_data = get_positive_data(url,headers,update_date) + + return { + "respiratory_detection": detections_data, + "positive": positive_data, + # "count": None, # Dashboards don't contain this data. 
+    }
diff --git a/src/ddl/rvdss.sql b/src/ddl/rvdss.sql
new file mode 100644
index 000000000..d3a17a5b5
--- /dev/null
+++ b/src/ddl/rvdss.sql
@@ -0,0 +1,49 @@
+USE epidata;
+/*
+TODO: briefly describe data source and define all columns.
+*/
+
+CREATE TABLE `rvdss_respiratory_detections` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `date` date NOT NULL,
+  `geo_type` char(20) NOT NULL,
+  `geo_value` char(20) NOT NULL,
+  `epiweek` int(11) NOT NULL,
+  `flua_positive_tests` int(11) NOT NULL,
+  `flua_percent_positive_tests` double NOT NULL,
+  `flu_total_tests` int(11) NOT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `date` (`date`,`geo_value`),
+  KEY `geo_value` (`geo_value`),
+  KEY `epiweek` (`epiweek`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE `rvdss_testing` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `date` date NOT NULL,
+  `geo_type` char(20) NOT NULL,
+  `geo_value` char(20) NOT NULL,
+  `epiweek` int(11) NOT NULL,
+  `flua_positive_tests` int(11) NOT NULL,
+  `flua_percent_positive_tests` double NOT NULL,
+  `flu_total_tests` int(11) NOT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `date` (`date`,`geo_value`),
+  KEY `geo_value` (`geo_value`),
+  KEY `epiweek` (`epiweek`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE `rvdss_detections_counts` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `date` date NOT NULL,
+  `geo_type` char(20) NOT NULL,
+  `geo_value` char(20) NOT NULL,
+  `epiweek` int(11) NOT NULL,
+  `flua_positive_tests` int(11) NOT NULL,
+  `flua_percent_positive_tests` double NOT NULL,
+  `flu_total_tests` int(11) NOT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `date` (`date`,`geo_value`),
+  KEY `geo_value` (`geo_value`),
+  KEY `epiweek` (`epiweek`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
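For reference, the FIRST_WEEK_OF_YEAR = 35 constant introduced in constants.py (replacing LAST_WEEK_OF_YEAR) encodes the season boundary used by get_report_date in pull_historic.py: report weeks at or after epi week 35 fall in the season's starting calendar year, while earlier weeks fall in the following year. A minimal standalone sketch of that mapping, using an illustrative helper rather than the module's own function:

FIRST_WEEK_OF_YEAR = 35  # RVDSS surveillance seasons begin around epi week 35

def report_year(week, start_year):
    # Weeks before the season's first week belong to the next calendar year,
    # mirroring the `week < FIRST_WEEK_OF_YEAR` branch in get_report_date.
    return start_year + 1 if week < FIRST_WEEK_OF_YEAR else start_year

assert report_year(40, 2023) == 2023  # autumn, first half of the 2023-2024 season
assert report_year(2, 2023) == 2024   # winter, second half of the 2023-2024 season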
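The new run.py entry point wires the two update paths to argparse flags. A brief usage sketch, assuming the rvdss package directory is the working directory (the exact invocation path depends on how the package is laid out and is not specified in this change):

# Hypothetical command lines:
#   python run.py --current        # fetch only the latest epiweek from the dashboard
#   python run.py --historical     # fetch earlier seasons from reports and archived dashboards
#   python run.py -c -hist         # do both in one run
from run import update_current_data, update_historical_data  # assumes the rvdss directory is on sys.path

update_current_data()      # appends the newest week to the CSVs unless its index is already present
update_historical_data()   # rebuilds the historical CSVs from season reports plus archived dashboards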