@@ -458,9 +458,12 @@ def handle_export():
     source_signal_pairs, alias_mapper = create_source_signal_alias_mapper(source_signal_pairs)
     start_day, is_day = parse_day_or_week_arg("start_day", 202001 if weekly_signals > 0 else 20200401)
     end_day, is_end_day = parse_day_or_week_arg("end_day", 202020 if weekly_signals > 0 else 20200901)
+    time_window = (start_day, end_day)
     if is_day != is_end_day:
         raise ValidationFailedException("mixing weeks with day arguments")
     _verify_argument_time_type_matches(is_day, daily_signals, weekly_signals)
+    transform_args = parse_transform_args()
+    jit_bypass = parse_jit_bypass()
 
     geo_type = request.args.get("geo_type", "county")
     geo_values = request.args.get("geo_values", "*")
@@ -472,13 +475,22 @@ def handle_export():
     if is_day != is_as_of_day:
         raise ValidationFailedException("mixing weeks with day arguments")
 
+    use_server_side_compute = all([is_day, is_end_day]) and JIT_COMPUTE and not jit_bypass
+    if use_server_side_compute:
+        pad_length = get_pad_length(source_signal_pairs, transform_args.get("smoother_window_length"))
+        source_signal_pairs, row_transform_generator = get_basename_signals(source_signal_pairs)
+        time_window = pad_time_window(time_window, pad_length)
+
     # build query
     q = QueryBuilder("covidcast", "t")
 
-    q.set_fields(["geo_value", "signal", "time_value", "issue", "lag", "value", "stderr", "sample_size", "geo_type", "source"], [], [])
+    fields_string = ["geo_value", "signal", "geo_type", "source"]
+    fields_int = ["time_value", "issue", "lag"]
+    fields_float = ["value", "stderr", "sample_size"]
+    q.set_fields(fields_string + fields_int + fields_float, [], [])
     q.set_order("time_value", "geo_value")
     q.where_source_signal_pairs("source", "signal", source_signal_pairs)
-    q.where_time_pairs("time_type", "time_value", [TimePair("day" if is_day else "week", [(start_day, end_day)])])
+    q.where_time_pairs("time_type", "time_value", [TimePair("day" if is_day else "week", [time_window])])
     q.where_geo_pairs("geo_type", "geo_value", [GeoPair(geo_type, True if geo_values == "*" else geo_values)])
 
     _handle_lag_issues_as_of(q, None, None, as_of)
@@ -489,7 +501,7 @@ def handle_export():
     filename = "covidcast-{source}-{signal}-{start_day}-to-{end_day}{as_of}".format(source=source, signal=signal, start_day=format_date(start_day), end_day=format_date(end_day), as_of=as_of_str)
     p = CSVPrinter(filename)
 
-    def parse_row(i, row):
+    def parse_csv_row(i, row):
         # '',geo_value,signal,{time_value,issue},lag,value,stderr,sample_size,geo_type,data_source
         return {
             "": i,
@@ -505,10 +517,20 @@ def parse_row(i, row):
             "data_source": alias_mapper(row["source"], row["signal"]) if alias_mapper else row["source"],
         }
 
-    def gen(first_row, rows):
-        yield parse_row(0, first_row)
+    if use_server_side_compute:
+        def gen_transform(rows):
+            parsed_rows = (parse_row(row, fields_string, fields_int, fields_float) for row in rows)
+            transformed_rows = row_transform_generator(parsed_rows=parsed_rows, time_pairs=[TimePair("day", [time_window])], transform_args=transform_args)
+            for row in transformed_rows:
+                yield row
+    else:
+        def gen_transform(rows):
+            for row in rows:
+                yield row
+
+    def gen_parse(rows):
         for i, row in enumerate(rows):
-            yield parse_row(i + 1, row)
+            yield parse_csv_row(i, row)
 
     # execute query
     try:
@@ -517,14 +539,15 @@ def gen(first_row, rows):
         raise DatabaseErrorException(str(e))
 
     # special case for no data to be compatible with the CSV server
-    first_row = next(r, None)
+    transformed_query = peekable(gen_transform(r))
+    first_row = transformed_query.peek(None)
     if not first_row:
         return "No matching data found for signal {source}:{signal} " "at {geo} level from {start_day} to {end_day}, as of {as_of}.".format(
             source=source, signal=signal, geo=geo_type, start_day=format_date(start_day), end_day=format_date(end_day), as_of=(date.today().isoformat() if as_of is None else format_date(as_of))
         )
 
     # now use a generator for sending the rows and execute all the other queries
-    return p(gen(first_row, r))
+    return p(gen_parse(transformed_query))
 
 
 @bp.route("/backfill", methods=("GET", "POST"))
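Note on the `peekable` usage above: it comes from the `more_itertools` package, and `peek(None)` returns the first transformed row without consuming it, so the same iterator can still be handed to the CSV printer afterwards. A minimal standalone sketch of the pattern (row contents are made up, not from this PR):

```python
from more_itertools import peekable

def fetch_rows():
    # stand-in for the transformed database cursor
    yield {"geo_value": "01001", "value": 1.5}
    yield {"geo_value": "01003", "value": 2.0}

rows = peekable(fetch_rows())
first_row = rows.peek(None)  # inspect the first row without consuming it
if first_row is None:
    print("No matching data found")  # the CSV-server-compatible special case
else:
    for row in rows:  # iteration still begins at the peeked row
        print(row)
```

This is what lets the diff drop the old `next(r, None)` plus `gen(first_row, r)` dance: the peeked row no longer has to be re-injected into the generator by hand.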
0 commit comments
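The `gen_transform` / `gen_parse` split keeps the export streaming end to end: rows flow one at a time from the cursor, through the optional server-side (JIT) transform, and into CSV formatting. A hedged sketch of how such a two-stage generator pipeline composes (all names here are illustrative, not the actual API):

```python
def gen_transform(rows):
    # stage 1: optionally rewrite each row; identity when the transform is bypassed
    for row in rows:
        yield row

def gen_parse(rows):
    # stage 2: number rows for the CSV writer
    for i, row in enumerate(rows):
        yield {"": i, **row}

raw = iter([{"value": 1.0}, {"value": 2.0}])
for out in gen_parse(gen_transform(raw)):
    print(out)  # {'': 0, 'value': 1.0} then {'': 1, 'value': 2.0}
```

Because both stages are generators, no intermediate list is materialized no matter how large the result set is.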