Skip to content

Commit 47a8c52

Browse files
committed
sane *_updated_timestamp field names
1 parent 68af3b6 commit 47a8c52

File tree

10 files changed

+94
-94
lines changed

10 files changed

+94
-94
lines changed

integrations/acquisition/covidcast/test_csv_uploading.py

+4 -4
Original file line number | Diff line number | Diff line change
@@ -192,10 +192,10 @@ def apply_lag(expected_epidata):
192192
})
193193

194194
# verify timestamps and default values are reasonable
195-
self.cur.execute('select timestamp1, timestamp2, direction from covidcast')
196-
for timestamp1, timestamp2, direction in self.cur:
197-
self.assertGreater(timestamp1, 0)
198-
self.assertEqual(timestamp2, 0)
195+
self.cur.execute('select value_updated_timestamp, direction_updated_timestamp, direction from covidcast')
196+
for value_updated_timestamp, direction_updated_timestamp, direction in self.cur:
197+
self.assertGreater(value_updated_timestamp, 0)
198+
self.assertEqual(direction_updated_timestamp, 0)
199199
self.assertIsNone(direction)
200200

201201
# verify that the CSVs were archived

integrations/acquisition/covidcast/test_direction_updating.py

+1 -1
Original file line number | Diff line number | Diff line change
@@ -188,7 +188,7 @@ def test_uploading(self):
188188
})
189189

190190
# verify secondary timestamps were updated
191-
self.cur.execute('select timestamp2 from covidcast order by id asc')
191+
self.cur.execute('select direction_updated_timestamp from covidcast order by id asc')
192192
timestamps = [t for (t,) in self.cur]
193193
for t in timestamps[:6]:
194194
# first 6 rows had `direction` updated

src/acquisition/covidcast/database.py

+26 -26
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, v
4242
self.value = value # ...
4343
self.stderr = stderr # ...
4444
self.sample_size = sample_size # from CSV row
45-
self.timestamp2 = 0
45+
self.direction_updated_timestamp = 0
4646
self.direction = None
4747
self.issue = issue
4848
self.lag = lag
@@ -101,21 +101,21 @@ def insert_or_update_batch(self, cc_rows, batch_size=0, commit_partial=False):
101101
sql = '''
102102
INSERT INTO `covidcast`
103103
(`id`, `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`,
104-
`timestamp1`, `value`, `stderr`, `sample_size`,
105-
`timestamp2`, `direction`,
104+
`value_updated_timestamp`, `value`, `stderr`, `sample_size`,
105+
`direction_updated_timestamp`, `direction`,
106106
`issue`, `lag`)
107107
VALUES
108108
(0, %s, %s, %s, %s, %s, %s,
109109
UNIX_TIMESTAMP(NOW()), %s, %s, %s,
110110
0, NULL,
111111
%s, %s)
112112
ON DUPLICATE KEY UPDATE
113-
`timestamp1` = VALUES(`timestamp1`),
113+
`value_updated_timestamp` = VALUES(`value_updated_timestamp`),
114114
`value` = VALUES(`value`),
115115
`stderr` = VALUES(`stderr`),
116116
`sample_size` = VALUES(`sample_size`)
117117
'''
118-
# TODO: ^ do we want to reset `timestamp2` and `direction` in the duplicate key case?
118+
# TODO: ^ do we want to reset `direction_updated_timestamp` and `direction` in the duplicate key case?
119119

120120
# TODO: consider handling cc_rows as a generator instead of a list
121121
num_rows = len(cc_rows)
@@ -175,7 +175,7 @@ def insert_or_update(
175175
INSERT INTO `covidcast` VALUES
176176
(0, %s, %s, %s, %s, %s, %s, UNIX_TIMESTAMP(NOW()), %s, %s, %s, 0, NULL, %s, %s)
177177
ON DUPLICATE KEY UPDATE
178-
`timestamp1` = VALUES(`timestamp1`),
178+
`value_updated_timestamp` = VALUES(`value_updated_timestamp`),
179179
`value` = VALUES(`value`),
180180
`stderr` = VALUES(`stderr`),
181181
`sample_size` = VALUES(`sample_size`)
@@ -246,7 +246,7 @@ def update_direction(
246246
UPDATE
247247
`covidcast`
248248
SET
249-
`timestamp2` = UNIX_TIMESTAMP(NOW()),
249+
`direction_updated_timestamp` = UNIX_TIMESTAMP(NOW()),
250250
`direction` = %s
251251
WHERE
252252
`source` = %s AND
@@ -286,9 +286,9 @@ def get_all_record_values_of_timeseries_with_potentially_stale_direction(self, t
286286
`geo_type` varchar(12),
287287
`geo_value` varchar(12),
288288
`time_value` int(11),
289-
`timestamp1` int(11),
289+
`value_updated_timestamp` int(11),
290290
`value` double,
291-
`timestamp2` int(11),
291+
`direction_updated_timestamp` int(11),
292292
`direction` int(11),
293293
PRIMARY KEY(`id`)
294294
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
@@ -305,9 +305,9 @@ def get_all_record_values_of_timeseries_with_potentially_stale_direction(self, t
305305
`geo_type`,
306306
`geo_value`,
307307
`time_value`,
308-
`timestamp1`,
308+
`value_updated_timestamp`,
309309
`value`,
310-
`timestamp2`,
310+
`direction_updated_timestamp`,
311311
`direction`
312312
FROM
313313
(
@@ -358,7 +358,7 @@ def get_all_record_values_of_timeseries_with_potentially_stale_direction(self, t
358358
`geo_type`,
359359
`geo_value`
360360
HAVING
361-
MAX(`timestamp1`) > MIN(`timestamp2`)
361+
MAX(`value_updated_timestamp`) > MIN(`direction_updated_timestamp`)
362362
'''
363363

364364
# A query that selects rows of the time-series selected by stale_ts_key_sql query.
@@ -372,9 +372,9 @@ def get_all_record_values_of_timeseries_with_potentially_stale_direction(self, t
372372
`geo_type`,
373373
`geo_value`,
374374
`time_value`,
375-
`timestamp1`,
375+
`value_updated_timestamp`,
376376
`value`,
377-
`timestamp2`,
377+
`direction_updated_timestamp`,
378378
`direction`
379379
FROM ({stale_ts_key_sql}) AS t2
380380
LEFT JOIN `latest_issues` AS t3
@@ -424,8 +424,8 @@ def drop_temporary_table(self, tmp_table_name):
424424
sql = f'DROP TEMPORARY TABLE `{tmp_table_name}`;'
425425
self._cursor.execute(sql)
426426

427-
def update_timestamp2_from_temporary_table(self, tmp_table_name):
428-
"""Updates the `timestamp2` column of `covidcast` table for all the rows with id value in `tmp_table_name`.
427+
def update_direction_updated_timestamp_from_temporary_table(self, tmp_table_name):
428+
"""Updates the `direction_updated_timestamp` column of `covidcast` table for all the rows with id value in `tmp_table_name`.
429429
430430
`tmp_table_name`: name of the temporary table.
431431
"""
@@ -437,7 +437,7 @@ def update_timestamp2_from_temporary_table(self, tmp_table_name):
437437
ON
438438
`covidcast`.id=t.id
439439
SET
440-
`covidcast`.timestamp2=UNIX_TIMESTAMP(NOW())
440+
`covidcast`.direction_updated_timestamp=UNIX_TIMESTAMP(NOW())
441441
'''
442442
self._cursor.execute(sql)
443443

@@ -457,8 +457,8 @@ def get_keys_with_potentially_stale_direction(self):
457457
`signal`,
458458
`geo_type`,
459459
`geo_value`,
460-
MAX(`timestamp1`) AS `max_timestamp1`,
461-
MIN(`timestamp2`) AS `min_timestamp2`,
460+
MAX(`value_updated_timestamp`) AS `max_value_updated_timestamp`,
461+
MIN(`direction_updated_timestamp`) AS `min_direction_updated_timestamp`,
462462
MIN(`time_value`) AS `min_day`,
463463
MAX(`time_value`) AS `max_day`,
464464
COUNT(1) AS `series_length`
@@ -473,7 +473,7 @@ def get_keys_with_potentially_stale_direction(self):
473473
`geo_type`,
474474
`geo_value`
475475
HAVING
476-
MAX(`timestamp1`) > MIN(`timestamp2`)
476+
MAX(`value_updated_timestamp`) > MIN(`direction_updated_timestamp`)
477477
'''
478478

479479
self._cursor.execute(sql)
@@ -488,8 +488,8 @@ def get_daily_timeseries_for_direction_update(
488488
DATEDIFF(`time_value`, %s) AS `offset`,
489489
`time_value` AS `day`,
490490
`value`,
491-
`timestamp1`,
492-
`timestamp2`
491+
`value_updated_timestamp`,
492+
`direction_updated_timestamp`
493493
FROM
494494
`covidcast`
495495
WHERE
@@ -507,9 +507,9 @@ def get_daily_timeseries_for_direction_update(
507507
self._cursor.execute(sql, args)
508508
return list(self._cursor)
509509

510-
def update_timeseries_timestamp2(
510+
def update_timeseries_direction_updated_timestamp(
511511
self, source, signal, time_type, geo_type, geo_value):
512-
"""Update the `timestamp2` column for an entire time-series.
512+
"""Update the `direction_updated_timestamp` column for an entire time-series.
513513
514514
For daily time-series, this implies that all `direction` values in the
515515
specified time-series are confirmed fresh as of the current time. Even if
@@ -523,7 +523,7 @@ def update_timeseries_timestamp2(
523523
UPDATE
524524
`covidcast`
525525
SET
526-
`timestamp2` = UNIX_TIMESTAMP(NOW())
526+
`direction_updated_timestamp` = UNIX_TIMESTAMP(NOW())
527527
WHERE
528528
`source` = %s AND
529529
`signal` = %s AND
@@ -552,7 +552,7 @@ def get_covidcast_meta(self):
552552
MAX(`value`) AS `max_value`,
553553
ROUND(AVG(`value`),7) AS `mean_value`,
554554
ROUND(STD(`value`),7) AS `stdev_value`,
555-
MAX(`timestamp1`) AS `last_update`,
555+
MAX(`value_updated_timestamp`) AS `last_update`,
556556
MAX(`issue`) as `max_issue`,
557557
MIN(`lag`) as `min_lag`,
558558
MAX(`lag`) as `max_lag`

src/acquisition/covidcast/direction.py

+6 -6
Original file line number | Diff line number | Diff line change
@@ -64,18 +64,18 @@ def scan_timeseries(
6464
offsets,
6565
days,
6666
values,
67-
timestamp1s,
68-
timestamp2s,
67+
value_updated_timestamps,
68+
direction_updated_timestamps,
6969
get_direction_impl):
7070
"""Scan an entire time-series and return fresh direction updates.
7171
7272
`offsets`: time offset, relative to any arbitrary fixed point in time, for
7373
each day in `days`
7474
`days`: day (YYYYMMDD) corresponding to each row in the other arrays
7575
`values`: value of the signal on each day
76-
`timestamp1s`: primary timestamp for each row (i.e. when `value` was
76+
`value_updated_timestamps`: primary timestamp for each row (i.e. when `value` was
7777
updated)
78-
`timestamp2s`: secondary timestamp for each row (i.e. when `direction` was
78+
`direction_updated_timestamps`: secondary timestamp for each row (i.e. when `direction` was
7979
last deemed to be fresh, relative to associated `value`s)
8080
`get_direction_impl`: a function which takes two arrays (time and value)
8181
and returns a classification of the direction (i.e. as -1, 0, +1)
@@ -100,8 +100,8 @@ def scan_timeseries(
100100
start += 1
101101

102102
# check whether this row needs an update
103-
direction_time = timestamp2s[end]
104-
value_time = np.max(timestamp1s[start:end + 1])
103+
direction_time = direction_updated_timestamps[end]
104+
value_time = np.max(value_updated_timestamps[start:end + 1])
105105
if direction_time > value_time:
106106
# this row is fresh
107107
continue

src/acquisition/covidcast/direction_updater.py

+15 -15
Original file line number | Diff line number | Diff line change
@@ -79,8 +79,8 @@ def update_loop(database, direction_impl=Direction):
7979
signal,
8080
geo_type,
8181
geo_value,
82-
max_timestamp1,
83-
min_timestamp2,
82+
max_value_updated_timestamp,
83+
min_direction_updated_timestamp,
8484
min_day,
8585
max_day,
8686
series_length,
@@ -108,7 +108,7 @@ def update_loop(database, direction_impl=Direction):
108108
min_day,
109109
max_day,
110110
series_length,
111-
max_timestamp1 - min_timestamp2,
111+
max_value_updated_timestamp - min_direction_updated_timestamp,
112112
)
113113
print(msg % args)
114114

@@ -118,12 +118,12 @@ def update_loop(database, direction_impl=Direction):
118118

119119
# transpose result set and cast data types
120120
data = np.array(timeseries_rows)
121-
offsets, days, values, timestamp1s, timestamp2s = data.T
121+
offsets, days, values, value_updated_timestamps, direction_updated_timestamps = data.T
122122
offsets = offsets.astype(np.int64)
123123
days = days.astype(np.int64)
124124
values = values.astype(np.float64)
125-
timestamp1s = timestamp1s.astype(np.int64)
126-
timestamp2s = timestamp2s.astype(np.int64)
125+
value_updated_timestamps = value_updated_timestamps.astype(np.int64)
126+
direction_updated_timestamps = direction_updated_timestamps.astype(np.int64)
127127

128128
# create a direction classifier for this signal
129129
data_stdev = data_stdevs[source][signal][geo_type]
@@ -135,7 +135,7 @@ def get_direction_impl(x, y):
135135

136136
# recompute any stale directions
137137
days, directions = direction_impl.scan_timeseries(
138-
offsets, days, values, timestamp1s, timestamp2s, get_direction_impl)
138+
offsets, days, values, value_updated_timestamps, direction_updated_timestamps, get_direction_impl)
139139

140140
if be_verbose:
141141
print(' computed %d direction updates' % len(directions))
@@ -148,7 +148,7 @@ def get_direction_impl(x, y):
148148
source, signal, 'day', geo_type, day, geo_value, direction)
149149

150150
# mark the entire time-series as fresh with respect to direction
151-
database.update_timeseries_timestamp2(
151+
database.update_timeseries_direction_updated_timestamp(
152152
source, signal, 'day', geo_type, geo_value)
153153

154154

@@ -162,7 +162,7 @@ def optimized_update_loop(database, direction_impl=Direction):
162162

163163
# A pandas DataFrame that will hold all rows from potentially stale time-series
164164
df_all = pd.DataFrame(columns=['id', 'source', 'signal', 'time_type', 'geo_type', 'geo_value', 'time_value',
165-
'timestamp1', 'value', 'timestamp2', 'direction'],
165+
'value_updated_timestamp', 'value', 'direction_updated_timestamp', 'direction'],
166166
data=database.get_all_record_values_of_timeseries_with_potentially_stale_direction(
167167
tmp_table_name))
168168
df_all.drop(columns=['time_type'], inplace=True)
@@ -220,15 +220,15 @@ def optimized_update_loop(database, direction_impl=Direction):
220220
ts_rows.time_value.min(),
221221
ts_rows.time_value.max(),
222222
len(ts_rows),
223-
ts_rows.timestamp1.max() - ts_rows.timestamp2.min()
223+
ts_rows.value_updated_timestamp.max() - ts_rows.direction_updated_timestamp.min()
224224
)
225225
print(msg % args)
226226

227227
offsets = ts_rows.offsets.values.astype(np.int64)
228228
days = ts_rows.time_value.values.astype(np.int64)
229229
values = ts_rows.value.values.astype(np.float64)
230-
timestamp1s = ts_rows.timestamp1.values.astype(np.int64)
231-
timestamp2s = ts_rows.timestamp2.values.astype(np.int64)
230+
value_updated_timestamps = ts_rows.value_updated_timestamp.values.astype(np.int64)
231+
direction_updated_timestamps = ts_rows.direction_updated_timestamp.values.astype(np.int64)
232232

233233
# create a direction classifier for this signal
234234
data_stdev = data_stdevs[source][signal][geo_type]
@@ -240,7 +240,7 @@ def get_direction_impl(x, y):
240240

241241
# recompute any stale directions
242242
days, directions = direction_impl.scan_timeseries(
243-
offsets, days, values, timestamp1s, timestamp2s, get_direction_impl)
243+
offsets, days, values, value_updated_timestamps, direction_updated_timestamps, get_direction_impl)
244244

245245
if be_verbose:
246246
print(' computed %d direction updates' % len(directions))
@@ -264,8 +264,8 @@ def get_direction_impl(x, y):
264264
for v, id_list in changed_rows.items():
265265
database.batched_update_direction(v, id_list)
266266

267-
# Updating timestamp2
268-
database.update_timestamp2_from_temporary_table(tmp_table_name)
267+
# Updating direction_updated_timestamp
268+
database.update_direction_updated_timestamp_from_temporary_table(tmp_table_name)
269269
# Dropping temporary table
270270
database.drop_temporary_table(tmp_table_name)
271271

0 commit comments

Comments
 (0)