 from urllib.parse import quote_plus as quote_as_url

 import pandas as pd
+import numpy as np
 import requests

 from delphi_utils.geomap import GeoMapper
@@ -457,14 +458,23 @@ def nation_from_state(df, sig, geomapper):
     ).drop(
         "norm_denom", axis=1
     )
+    # The filter in `fetch_new_reports` to keep the most recent publish date guarantees
+    # that we'll only see one unique publish date per timestamp here.
+    publish_date_by_ts = df.groupby(
+        ["timestamp"]
+    )["publish_date"].apply(
+        lambda x: np.unique(x)[0]
+    ).reset_index(
+    )
     df = geomapper.replace_geocode(
-        df,
+        df.drop("publish_date", axis=1),
         'state_id',
         'nation',
         new_col="geo_id"
     )
     df["se"] = None
     df["sample_size"] = None
+    df = pd.merge(df, publish_date_by_ts, on="timestamp", how="left")

     return df

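As a rough, self-contained illustration of the pattern added above (toy values, pandas/numpy only, a plain groupby-sum standing in for the GeoMapper call): collapse `publish_date` to its single value per `timestamp`, aggregate without it, then merge it back on `timestamp`.

import numpy as np
import pandas as pd

# Toy frame: two states per timestamp, each timestamp carries one publish date
# (hypothetical values; this mirrors the invariant described in the comment above).
df = pd.DataFrame({
    "timestamp": pd.to_datetime(["2022-01-10", "2022-01-10", "2022-01-11", "2022-01-11"]),
    "state_id": ["pa", "ny", "pa", "ny"],
    "val": [1.0, 2.0, 3.0, 4.0],
    "publish_date": pd.to_datetime(["2022-01-12"] * 2 + ["2022-01-13"] * 2),
})

# Recover the single publish date per timestamp, same as in the diff.
publish_date_by_ts = df.groupby(["timestamp"])["publish_date"].apply(
    lambda x: np.unique(x)[0]
).reset_index()

# Aggregate without publish_date (stand-in for geomapper.replace_geocode),
# then restore the publish date by merging on timestamp.
nation = df.drop(columns=["publish_date", "state_id"]).groupby("timestamp", as_index=False).sum()
nation = pd.merge(nation, publish_date_by_ts, on="timestamp", how="left")
print(nation)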
@@ -483,8 +493,6 @@ def fetch_new_reports(params, logger=None):
             "timestamp"
         ).apply(
             lambda x: x[x["publish_date"] == x["publish_date"].max()]
-        ).drop(
-            "publish_date", axis=1
         ).drop_duplicates(
         )

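A minimal standalone illustration of the filter kept here (made-up values): for each reference date, only rows from the most recent publish date survive, and `publish_date` itself is now retained for later use.

import pandas as pd

# Hypothetical frame with two publish dates revising the same reference date.
sig_df = pd.DataFrame({
    "timestamp": pd.to_datetime(["2022-01-10", "2022-01-10", "2022-01-11"]),
    "publish_date": pd.to_datetime(["2022-01-12", "2022-01-13", "2022-01-13"]),
    "val": [10.0, 11.0, 12.0],
})

latest = sig_df.groupby(
    "timestamp"
).apply(
    lambda x: x[x["publish_date"] == x["publish_date"].max()]
).drop_duplicates()

# Only the 2022-01-13 revision of the 2022-01-10 value remains.
print(latest)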
@@ -496,7 +504,7 @@ def fetch_new_reports(params, logger=None):
         ).reset_index(
             drop=True
         ) == 1), f"Duplicate rows in {sig} indicate that one or" \
-            + " more reports was published multiple times and the copies differ"
+            + " more reports were published multiple times and the copies differ"

         ret[sig] = latest_sig_df

@@ -513,7 +521,25 @@ def fetch_new_reports(params, logger=None):
    )

    for key, df in ret.copy().items():
-        (geo, sig, _) = key
+        (geo, sig, prop) = key
+
+        if sig == "positivity":
+            # Combine with test volume using publish date.
+            total_key = (geo, "total", prop)
+            ret[key] = unify_testing_sigs(
+                df, ret[total_key]
+            ).drop(
+                "publish_date", axis=1
+            )
+
+            # No longer need "total" signal.
+            del ret[total_key]
+        elif sig != "total":
+            # If signal is not test volume or test positivity, we don't need
+            # publish date.
+            df = df.drop("publish_date", axis=1)
+            ret[key] = df
+
         if SIGNALS[sig]["make_prop"]:
             ret[(geo, sig, IS_PROP)] = generate_prop_signal(df, geo, geomapper)

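A rough sketch of the key bookkeeping in this loop, using made-up signal names and a stand-in `combine` function in place of `unify_testing_sigs` (defined further down in this diff); it only shows which (geo, signal, prop) entries survive.

def combine(positivity_df, volume_df):
    # Stand-in for unify_testing_sigs; see its definition later in this diff.
    return (positivity_df, volume_df)

ret = {
    ("state", "positivity", False): "positivity frame",
    ("state", "total", False): "volume frame",
    ("state", "booster_doses", False): "other frame",  # hypothetical non-testing signal
}

for key, df in ret.copy().items():
    (geo, sig, prop) = key
    if sig == "positivity":
        # Pair positivity with the matching volume entry, then drop the volume key.
        ret[key] = combine(df, ret[(geo, "total", prop)])
        del ret[(geo, "total", prop)]

print(sorted(ret))  # "total" is gone; "positivity" now holds the combined frame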
@@ -546,3 +572,113 @@ def generate_prop_signal(df, geo, geo_mapper):
     df.drop(["population", geo], axis=1, inplace=True)

     return df
+
+def unify_testing_sigs(positivity_df, volume_df):
+    """
+    Drop any observations with a sample size of 5 or less. Generate standard errors.
+
+    This combines test positivity and testing volume into a single signal,
+    where testing volume *from the same spreadsheet/publish date* (NOT the
+    same reference date) is used as the sample size for test positivity.
+
+    Total testing volume is typically provided for a 7-day period about 4 days
+    before the test positivity period. Since the CPR is only published on
+    weekdays, test positivity and test volume are only available for the same
+    reported dates 3 times a week. We have chosen to censor 7dav test
+    positivity based on the 7dav test volume provided in the same originating
+    spreadsheet, corresponding to a period ~4 days earlier.
+
+    This approach makes the signals maximally available (5 days per week) with
+    low latency. It avoids complications of having to process multiple
+    spreadsheets each day, and the fact that test positivity and test volume
+    are not available for all the same reference dates.
+
+    Discussion of decision and alternatives (Delphi-internal share drive):
+    https://docs.google.com/document/d/1MoIimdM_8OwG4SygoeQ9QEVZzIuDl339_a0xoYa6vuA/edit#
+
+    """
+    # Combine test positivity and test volume, maintaining "this week" and "previous week" status.
+    assert len(positivity_df.index) == len(volume_df.index), \
+        "Test positivity and volume data have different numbers of observations."
+    volume_df = add_max_ts_col(volume_df)[
+        ["geo_id", "publish_date", "val", "is_max_group_ts"]
+    ].rename(
+        columns={"val":"sample_size"}
+    )
+    col_order = list(positivity_df.columns)
+    positivity_df = add_max_ts_col(positivity_df).drop(["sample_size"], axis=1)
+
+    df = pd.merge(
+        positivity_df, volume_df,
+        on=["publish_date", "geo_id", "is_max_group_ts"],
+        how="left"
+    ).drop(
+        ["is_max_group_ts"], axis=1
+    )
+
+    # Drop everything with 5 or fewer total tests.
+    df = df.loc[df.sample_size > 5]
+
+    # Generate stderr.
+    df = df.assign(se=std_err(df))
+
+    return df[col_order]
+
+def add_max_ts_col(df):
+    """
+    Add column to differentiate timestamps for a given publish date-geo combo.
+
+    Each publish date is associated with two timestamps for test volume and
+    test positivity. The older timestamp corresponds to data from the
+    "previous week"; the newer timestamp corresponds to the "last week".
+
+    Since test volume and test positivity timestamps don't match exactly, we
+    can't use them to merge the two signals together, but we still need a way
+    to uniquely identify observations to avoid duplicating observations during
+    the join. This new column, which is analogous to the "last/previous week"
+    classification, is used to merge on.
+    """
+    assert all(
+        df.groupby(["publish_date", "geo_id"])["timestamp"].count() == 2
+    ) and all(
+        df.groupby(["publish_date", "geo_id"])["timestamp"].nunique() == 2
+    ), "Testing signals should have two unique timestamps per publish date-region combination."
+
+    max_ts_by_group = df.groupby(
+        ["publish_date", "geo_id"], as_index=False
+    )["timestamp"].max(
+    ).rename(
+        columns={"timestamp":"max_timestamp"}
+    )
+    df = pd.merge(
+        df, max_ts_by_group,
+        on=["publish_date", "geo_id"],
+        how="outer"
+    ).assign(
+        is_max_group_ts=lambda df: df["timestamp"] == df["max_timestamp"]
+    ).drop(
+        ["max_timestamp"], axis=1
+    )
+
+    return df
+
+def std_err(df):
+    """
+    Find the standard error of a binomial proportion.
+
+    Assumes all input sample_size values are > 0.
+
+    Parameters
+    ----------
+    df: pd.DataFrame
+        Columns: val, sample_size, ...
+
+    Returns
+    -------
+    pd.Series
+        Standard error of the positivity rate of PCR-specimen tests.
+    """
+    assert all(df.sample_size > 0), "Sample sizes must be greater than 0"
+    p = df.val
+    n = df.sample_size
+    return np.sqrt(p * (1 - p) / n)
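To make the merge-by-publish-date idea and the standard error concrete, here is a small self-contained sketch with made-up numbers. `flag_newest` is a simplified stand-in for `add_max_ts_col` (it uses a groupby transform rather than the merge in the diff), and the final line applies the same binomial formula as `std_err`, sqrt(p * (1 - p) / n).

import numpy as np
import pandas as pd

# Hypothetical testing data: one publish date, one geo, two reference weeks each
# for positivity and volume. The reference dates intentionally do not line up.
positivity = pd.DataFrame({
    "geo_id": ["pa", "pa"],
    "publish_date": pd.to_datetime(["2022-01-14"] * 2),
    "timestamp": pd.to_datetime(["2022-01-05", "2022-01-12"]),
    "val": [0.10, 0.12],
})
volume = pd.DataFrame({
    "geo_id": ["pa", "pa"],
    "publish_date": pd.to_datetime(["2022-01-14"] * 2),
    "timestamp": pd.to_datetime(["2022-01-01", "2022-01-08"]),
    "val": [2000.0, 1800.0],
})

def flag_newest(df):
    """Mark the newer of the two timestamps per (publish_date, geo_id) group."""
    max_ts = df.groupby(["publish_date", "geo_id"])["timestamp"].transform("max")
    return df.assign(is_max_group_ts=df["timestamp"] == max_ts)

# Join positivity to volume from the same publish date and week slot,
# not the same reference date.
combined = pd.merge(
    flag_newest(positivity),
    flag_newest(volume).rename(columns={"val": "sample_size"})[
        ["geo_id", "publish_date", "sample_size", "is_max_group_ts"]
    ],
    on=["publish_date", "geo_id", "is_max_group_ts"],
    how="left",
).drop(columns="is_max_group_ts")

# Censor small samples and attach the binomial standard error sqrt(p*(1-p)/n).
combined = combined.loc[combined.sample_size > 5]
combined["se"] = np.sqrt(combined.val * (1 - combined.val) / combined.sample_size)
print(combined)

For the older week in this toy frame, the standard error works out to sqrt(0.10 * 0.90 / 2000) ≈ 0.0067.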