|
10 | 10 |
|
11 | 11 | # third party
|
12 | 12 | import json
|
| 13 | +from typing import List |
13 | 14 | import mysql.connector
|
14 | 15 |
|
15 | 16 | # first party
|
16 | 17 | import delphi.operations.secrets as secrets
|
17 | 18 | from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
|
18 |
| - |
19 |
| -class CovidcastRow(): |
20 |
| - """A container for all the values of a single covidcast row.""" |
21 |
| - |
22 |
| - @staticmethod |
23 |
| - def fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag): |
24 |
| - if row_value is None: return None |
25 |
| - return CovidcastRow(source, signal, time_type, geo_type, time_value, |
26 |
| - row_value.geo_value, |
27 |
| - row_value.value, |
28 |
| - row_value.stderr, |
29 |
| - row_value.sample_size, |
30 |
| - row_value.missing_value, |
31 |
| - row_value.missing_stderr, |
32 |
| - row_value.missing_sample_size, |
33 |
| - issue, lag) |
34 |
| - |
35 |
| - @staticmethod |
36 |
| - def fromCsvRows(row_values, source, signal, time_type, geo_type, time_value, issue, lag): |
37 |
| - # NOTE: returns a generator, as row_values is expected to be a generator |
38 |
| - return (CovidcastRow.fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag) |
39 |
| - for row_value in row_values) |
40 |
| - |
41 |
| - def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, value, stderr, |
42 |
| - sample_size, missing_value, missing_stderr, missing_sample_size, issue, lag): |
43 |
| - self.id = None |
44 |
| - self.source = source |
45 |
| - self.signal = signal |
46 |
| - self.time_type = time_type |
47 |
| - self.geo_type = geo_type |
48 |
| - self.time_value = time_value |
49 |
| - self.geo_value = geo_value # from CSV row |
50 |
| - self.value = value # ... |
51 |
| - self.stderr = stderr # ... |
52 |
| - self.sample_size = sample_size # ... |
53 |
| - self.missing_value = missing_value # ... |
54 |
| - self.missing_stderr = missing_stderr # ... |
55 |
| - self.missing_sample_size = missing_sample_size # from CSV row |
56 |
| - self.direction_updated_timestamp = 0 |
57 |
| - self.direction = None |
58 |
| - self.issue = issue |
59 |
| - self.lag = lag |
60 |
| - |
61 |
| - def signal_pair(self): |
62 |
| - return f"{self.source}:{self.signal}" |
63 |
| - |
64 |
| - def geo_pair(self): |
65 |
| - return f"{self.geo_type}:{self.geo_value}" |
| 19 | +from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow |
66 | 20 |
|
67 | 21 |
|
68 | 22 | class DBLoadStateException(Exception):
|
@@ -154,7 +108,7 @@ def do_analyze(self):
|
154 | 108 | def insert_or_update_bulk(self, cc_rows):
|
155 | 109 | return self.insert_or_update_batch(cc_rows)
|
156 | 110 |
|
157 |
| - def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False, suppress_jobs=False): |
| 111 | + def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20, commit_partial=False, suppress_jobs=False): |
158 | 112 | """
|
159 | 113 | Insert new rows into the load table and dispatch into dimension and fact tables.
|
160 | 114 | """
|
|
0 commit comments