
rvdss interface and new fn layout so current/historical data can be easily fetched #1551


Merged: 11 commits, Nov 22, 2024
18 changes: 14 additions & 4 deletions src/acquisition/rvdss/constants.py
@@ -1,3 +1,5 @@
+from datetime import datetime

# The dataset calls the same viruses, provinces, regions (province groups),
# and country by multiple names. Map each of those to a common abbreviation.
VIRUSES = {
@@ -34,7 +36,7 @@
"saskatchewan":"sk",
"alberta": "ab",
"british columbia" :"bc",
"yukon" : "yk",
"yukon" : "yt",
"northwest territories" : "nt",
"nunavut" : "nu",
"canada":"ca",
@@ -54,6 +56,8 @@
# Construct dashboard and data report URLS.
DASHBOARD_BASE_URL = "https://health-infobase.canada.ca/src/data/respiratory-virus-detections/"
DASHBOARD_W_DATE_URL = DASHBOARD_BASE_URL + "archive/{date}/"
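# For example (illustrative only, not part of this diff),
# DASHBOARD_W_DATE_URL.format(date = "2024-06-13") expands to
# "https://health-infobase.canada.ca/src/data/respiratory-virus-detections/archive/2024-06-13/".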

+# May not need this since we write a function for this in pull_historic
DASHBOARD_BASE_URLS_2023_2024_SEASON = (
DASHBOARD_W_DATE_URL.format(date = date) for date in
(
@@ -74,6 +78,7 @@
SEASON_BASE_URL = "https://www.canada.ca"
ALTERNATIVE_SEASON_BASE_URL = "www.phac-aspc.gc.ca/bid-bmi/dsd-dsm/rvdi-divr/"
HISTORIC_SEASON_REPORTS_URL = SEASON_BASE_URL+"/en/public-health/services/surveillance/respiratory-virus-detections-canada/{year_range}.html"
+DASHBOARD_ARCHIVED_DATES_URL= "https://health-infobase.canada.ca/src/js/respiratory-virus-detections/ArchiveData.json"

# Each URL created here points to a list of all data reports made during that
# season, e.g.
@@ -82,7 +87,7 @@
# disease data in a dashboard with a static URL. Therefore, this collection
# of URLs does _NOT_ need to be updated. It is used for fetching historical
# data (for dates on or before June 8, 2024) only.
-HISTORIC_SEASON_URL = (HISTORIC_SEASON_REPORTS_URL.format(year_range = year_range) for year_range in
+HISTORIC_SEASON_URLS = (HISTORIC_SEASON_REPORTS_URL.format(year_range = year_range) for year_range in
(
"2013-2014",
"2014-2015",
@@ -101,7 +106,12 @@
DASHBOARD_UPDATE_DATE_FILE = "RVD_UpdateDate.csv"
DASHBOARD_DATA_FILE = "RVD_WeeklyData.csv"

-RESP_COUNTS_OUTPUT_FILE = "respiratory_detections.csv"

+RESP_DETECTIONS_OUTPUT_FILE = "respiratory_detections.csv"
POSITIVE_TESTS_OUTPUT_FILE = "positive_tests.csv"
+COUNTS_OUTPUT_FILE = "number_of_detections.csv"

-FIRST_WEEK_OF_YEAR = 35

+LAST_WEEK_OF_YEAR = 35
+UPDATE_DATES_FILE = "update_dates.txt"
+NOW = datetime.now()
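As the comment on HISTORIC_SEASON_URLS above notes, each URL points to one season's index of weekly data reports and is used only for historical data (on or before June 8, 2024). A minimal sketch of how a caller might walk those pages; the helper name is hypothetical, and it assumes the requests package and the module path used elsewhere in this PR:

import requests

from delphi.epidata.acquisition.rvdss.constants import HISTORIC_SEASON_URLS

def iter_historic_season_pages():
    # HISTORIC_SEASON_URLS is a generator expression, so it can only be
    # consumed once per import; each URL lists every weekly report
    # published during one season.
    for url in HISTORIC_SEASON_URLS:
        resp = requests.get(url)
        resp.raise_for_status()
        yield url, resp.text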
121 changes: 121 additions & 0 deletions src/acquisition/rvdss/database.py
@@ -0,0 +1,121 @@
"""
===============
=== Purpose ===
===============

Stores data from RVDSS (Canada's Respiratory Virus Detection Surveillance System), which contains respiratory virus lab test results.
See: rvdss.py


=======================
=== Data Dictionary ===
=======================

`rvdss` is the table where rvdss data is stored.
+----------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+----------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| location | varchar(8) | NO | MUL | NULL | |
| epiweek | int(11) | NO | MUL | NULL | |
| value | float | NO | | NULL | |
+----------+-------------+------+-----+---------+----------------+
id: unique identifier for each record
location: geographic location code (e.g. a province/region abbreviation, or "ca" for Canada)
epiweek: the epiweek during which the tests were reported
value: total number of test records per location, within each epiweek

=================
=== Changelog ===
=================
2017-12-14:
* add "need update" check

2017-12-02:
* original version
"""

# standard library
import argparse

# third party
import mysql.connector

# first party
from delphi.epidata.acquisition.rvdss import rvdss
import delphi.operations.secrets as secrets
from delphi.utils.epidate import EpiDate
import delphi.utils.epiweek as flu
from delphi.utils.geo.locations import Locations

LOCATIONS = Locations.hhs_list
DATAPATH = "/home/automation/rvdss_data"


def update(locations, first=None, last=None, force_update=False, load_email=True):
    # download and prepare data first
    qd = rvdss.rvdssData(DATAPATH, load_email)
    if not qd.need_update and not force_update:
        print("Data not updated, nothing needs change.")
        return

    qd_data = qd.load_csv()
    qd_measurements = qd.prepare_measurements(qd_data, start_weekday=4)
    qd_ts = rvdss.measurement_to_ts(qd_measurements, 7, startweek=first, endweek=last)
    # connect to the database
    u, p = secrets.db.epi
    cnx = mysql.connector.connect(user=u, password=p, database="epidata")
    cur = cnx.cursor()

    def get_num_rows():
        cur.execute("SELECT count(1) `num` FROM `rvdss`")
        # iterate to consume the single aggregate row returned by the cursor
        for (num,) in cur:
            pass
        return num

    # check from 4 weeks preceding the last week with data through this week
    # (yearweek mode 6 uses Sunday-start weeks, matching the epiweek convention)
    cur.execute("SELECT max(`epiweek`) `ew0`, yearweek(now(), 6) `ew1` FROM `rvdss`")
    for (ew0, ew1) in cur:
        ew0 = 200401 if ew0 is None else flu.add_epiweeks(ew0, -4)
    ew0 = ew0 if first is None else first
    ew1 = ew1 if last is None else last
    print(f"Checking epiweeks between {int(ew0)} and {int(ew1)}...")

    # keep track of how many rows were added
    rows_before = get_num_rows()

    # check rvdss for new and/or revised data
    sql = """
    INSERT INTO
      `rvdss` (`location`, `epiweek`, `value`)
    VALUES
      (%s, %s, %s)
    ON DUPLICATE KEY UPDATE
      `value` = %s
    """

    total_rows = 0

    for location in locations:
        if location not in qd_ts:
            continue
        ews = sorted(qd_ts[location].keys())
        num_missing = 0
        for ew in ews:
            v = qd_ts[location][ew]
            sql_data = (location, ew, v, v)
            cur.execute(sql, sql_data)
            total_rows += 1
            if v == 0:
                num_missing += 1
        if num_missing > 0:
            print(f" [{location}] missing {int(num_missing)}/{len(ews)} value(s)")

    # keep track of how many rows were added
    rows_after = get_num_rows()
    print(f"Inserted {int(rows_after - rows_before)}/{int(total_rows)} row(s)")

    # cleanup
    cur.close()
    cnx.commit()
    cnx.close()
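
The diff rendering ends before the file does; argparse is imported above, which suggests the file closes with a CLI entry point. A hypothetical sketch of what that might look like (all flag names are assumptions, not taken from this PR):

def main():
    # parse command-line flags (names are illustrative assumptions)
    parser = argparse.ArgumentParser()
    parser.add_argument("--location", action="store", type=str, default=None, help="location(s), comma-separated (default: all)")
    parser.add_argument("--force_update", "-u", action="store_true", help="force update")
    args = parser.parse_args()

    locations = LOCATIONS if args.location is None else args.location.lower().split(",")
    update(locations, force_update=args.force_update)


if __name__ == "__main__":
    main()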