@@ -2,41 +2,20 @@
 """Functions to call when running the function.
 
 This module should contain a function called `run_module`, that is executed
-when the module is run with `python -m MODULE_NAME`.
+when the module is run with `python -m delphi_nchs_mortality`.
 """
 from datetime import datetime, date, timedelta
 from os.path import join
-from os import remove, listdir
-from shutil import copy
 
 import numpy as np
 import pandas as pd
 from delphi_utils import read_params, S3ArchiveDiffer
 
 from .pull import pull_nchs_mortality_data
 from .export import export_csv
-
-# global constants
-METRICS = [
-    'covid_deaths', 'total_deaths', 'percent_of_expected_deaths',
-    'pneumonia_deaths', 'pneumonia_and_covid_deaths', 'influenza_deaths',
-    'pneumonia_influenza_or_covid_19_deaths'
-]
-SENSOR_NAME_MAP = {
-    "covid_deaths": "deaths_covid_incidence",
-    "total_deaths": "deaths_allcause_incidence",
-    "percent_of_expected_deaths": "deaths_percent_of_expected",
-    "pneumonia_deaths": "deaths_pneumonia_notflu_incidence",
-    "pneumonia_and_covid_deaths": "deaths_covid_and_pneumonia_notflu_incidence",
-    "influenza_deaths": "deaths_flu_incidence",
-    "pneumonia_influenza_or_covid_19_deaths": "deaths_pneumonia_or_flu_or_covid_incidence"
-}
-SENSORS = [
-    "num",
-    "prop"
-]
-INCIDENCE_BASE = 100000
-GEO_RES = "state"
+from .archive_diffs import arch_diffs
+from .constants import (METRICS, SENSOR_NAME_MAP,
+                        SENSORS, INCIDENCE_BASE, GEO_RES)
 
 def run_module():  # pylint: disable=too-many-branches,too-many-statements
     """Run module for processing NCHS mortality data."""
@@ -46,9 +25,7 @@ def run_module():  # pylint: disable=too-many-branches,too-many-statements
     export_start_date = date.today() - timedelta(
             days=date.today().weekday() + 2)
     export_start_date = export_start_date.strftime('%Y-%m-%d')
-    export_dir = params["export_dir"]
     daily_export_dir = params["daily_export_dir"]
-    cache_dir = params["cache_dir"]
     daily_cache_dir = params["daily_cache_dir"]
     static_file_dir = params["static_file_dir"]
     token = params["token"]
@@ -100,56 +77,7 @@ def run_module():  # pylint: disable=too-many-branches,too-many-statements
     # Weekly run of archive utility on Monday
     # - Does not upload to S3, that is handled by daily run of archive utility
     # - Exports issues into receiving for the API
-    if datetime.today().weekday() == 0:
-        # Copy todays raw output to receiving
-        for output_file in listdir(daily_export_dir):
-            copy(
-                join(daily_export_dir, output_file),
-                join(export_dir, output_file))
-
-        weekly_arch_diff = S3ArchiveDiffer(
-            cache_dir, export_dir,
-            params["bucket_name"], "nchs_mortality",
-            params["aws_credentials"])
-
-        # Dont update cache from S3 (has daily files), only simulate a update_cache() call
-        weekly_arch_diff._cache_updated = True  # pylint: disable=protected-access
-
-        # Diff exports, and make incremental versions
-        _, common_diffs, new_files = weekly_arch_diff.diff_exports()
-
-        # Archive changed and new files only
-        to_archive = [f for f, diff in common_diffs.items() if diff is not None]
-        to_archive += new_files
-        _, fails = weekly_arch_diff.archive_exports(to_archive, update_s3=False)
-
-        # Filter existing exports to exclude those that failed to archive
-        succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails}
-        weekly_arch_diff.filter_exports(succ_common_diffs)
-
-        # Report failures: someone should probably look at them
-        for exported_file in fails:
-            print(f"Failed to archive (weekly) '{exported_file}'")
-
     # Daily run of archiving utility
     # - Uploads changed files to S3
     # - Does not export any issues into receiving
-
-    # Diff exports, and make incremental versions
-    _, common_diffs, new_files = daily_arch_diff.diff_exports()
-
-    # Archive changed and new files only
-    to_archive = [f for f, diff in common_diffs.items() if diff is not None]
-    to_archive += new_files
-    _, fails = daily_arch_diff.archive_exports(to_archive)
-
-    # Daily output not needed anymore, remove them
-    for exported_file in new_files:
-        remove(exported_file)
-    for exported_file, diff_file in common_diffs.items():
-        remove(exported_file)
-        remove(diff_file)
-
-    # Report failures: someone should probably look at them
-    for exported_file in fails:
-        print(f"Failed to archive (daily) '{exported_file}'")
+    arch_diffs(params, daily_arch_diff)
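
For reference, the new `delphi_nchs_mortality/constants.py` module is not shown in this diff. A minimal sketch, assuming the removed globals above were moved over unchanged (the comments here are added for context, not from the source):

"""Registry of constants for the NCHS mortality indicator (hypothetical reconstruction)."""

# Metric names as they appear in the NCHS source data
METRICS = [
    'covid_deaths', 'total_deaths', 'percent_of_expected_deaths',
    'pneumonia_deaths', 'pneumonia_and_covid_deaths', 'influenza_deaths',
    'pneumonia_influenza_or_covid_19_deaths'
]
# Source metric name -> exported signal name
SENSOR_NAME_MAP = {
    "covid_deaths": "deaths_covid_incidence",
    "total_deaths": "deaths_allcause_incidence",
    "percent_of_expected_deaths": "deaths_percent_of_expected",
    "pneumonia_deaths": "deaths_pneumonia_notflu_incidence",
    "pneumonia_and_covid_deaths": "deaths_covid_and_pneumonia_notflu_incidence",
    "influenza_deaths": "deaths_flu_incidence",
    "pneumonia_influenza_or_covid_19_deaths": "deaths_pneumonia_or_flu_or_covid_incidence"
}
# Raw counts and population-proportional rates
SENSORS = [
    "num",
    "prop"
]
INCIDENCE_BASE = 100000  # "prop" signals are per 100,000 population
GEO_RES = "state"  # NCHS mortality data is only available at the state level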
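Likewise, the new `archive_diffs.py` is outside this diff. Below is a sketch of `arch_diffs(params, daily_arch_diff)` under the assumption that it wraps the removed weekly and daily archiving logic verbatim, reading `export_dir` and `cache_dir` from `params` now that `run.py` no longer unpacks them; one defensive `is not None` guard is added before removing diff files, since unchanged files map to a `None` diff:

"""Sketch of delphi_nchs_mortality/archive_diffs.py (hypothetical reconstruction)."""
from datetime import datetime
from os import remove, listdir
from os.path import join
from shutil import copy

from delphi_utils import S3ArchiveDiffer


def arch_diffs(params, daily_arch_diff):
    """Archive diffs of exports against cached versions.

    Weekly (Mondays): copies daily output into receiving and exports issues
    for the API, without uploading to S3. Daily: diffs against the cache,
    uploads changed files to S3, then removes the daily output.
    """
    export_dir = params["export_dir"]
    cache_dir = params["cache_dir"]
    daily_export_dir = params["daily_export_dir"]

    # Weekly run of archive utility on Monday
    if datetime.today().weekday() == 0:
        # Copy todays raw output to receiving
        for output_file in listdir(daily_export_dir):
            copy(join(daily_export_dir, output_file),
                 join(export_dir, output_file))

        weekly_arch_diff = S3ArchiveDiffer(
            cache_dir, export_dir,
            params["bucket_name"], "nchs_mortality",
            params["aws_credentials"])

        # Dont update cache from S3 (has daily files), only simulate a update_cache() call
        weekly_arch_diff._cache_updated = True  # pylint: disable=protected-access

        # Diff exports, and make incremental versions
        _, common_diffs, new_files = weekly_arch_diff.diff_exports()

        # Archive changed and new files only
        to_archive = [f for f, diff in common_diffs.items() if diff is not None]
        to_archive += new_files
        _, fails = weekly_arch_diff.archive_exports(to_archive, update_s3=False)

        # Filter existing exports to exclude those that failed to archive
        succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails}
        weekly_arch_diff.filter_exports(succ_common_diffs)

        # Report failures: someone should probably look at them
        for exported_file in fails:
            print(f"Failed to archive (weekly) '{exported_file}'")

    # Daily run of archiving utility: diff exports, make incremental versions
    _, common_diffs, new_files = daily_arch_diff.diff_exports()

    # Archive changed and new files only
    to_archive = [f for f, diff in common_diffs.items() if diff is not None]
    to_archive += new_files
    _, fails = daily_arch_diff.archive_exports(to_archive)

    # Daily output not needed anymore, remove them
    for exported_file in new_files:
        remove(exported_file)
    for exported_file, diff_file in common_diffs.items():
        remove(exported_file)
        if diff_file is not None:  # guard added: unchanged files have no diff
            remove(diff_file)

    # Report failures: someone should probably look at them
    for exported_file in fails:
        print(f"Failed to archive (daily) '{exported_file}'")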