@@ -340,6 +340,58 @@ def _encode_locations(timestamp, code_locations):
340
340
}
341
341
342
342
343
+ class LocalAggregator (object ):
344
+ __slots__ = ("_measurements" ,)
345
+
346
+ def __init__ (self ):
347
+ # type: (...) -> None
348
+ self ._measurements = (
349
+ {}
350
+ ) # type: Dict[Tuple[str, MetricTagsInternal], Tuple[float, float, int, float]]
351
+
352
+ def add (
353
+ self ,
354
+ ty , # type: MetricType
355
+ key , # type: str
356
+ value , # type: float
357
+ unit , # type: MeasurementUnit
358
+ tags , # type: MetricTagsInternal
359
+ ):
360
+ # type: (...) -> None
361
+ export_key = "%s:%s@%s" % (ty , key , unit )
362
+ bucket_key = (export_key , tags )
363
+
364
+ old = self ._measurements .get (bucket_key )
365
+ if old is not None :
366
+ v_min , v_max , v_count , v_sum = old
367
+ v_min = min (v_min , value )
368
+ v_max = max (v_max , value )
369
+ v_count += 1
370
+ v_sum += value
371
+ else :
372
+ v_min = v_max = v_sum = value
373
+ v_count = 1
374
+ self ._measurements [bucket_key ] = (v_min , v_max , v_count , v_sum )
375
+
376
+ def to_json (self ):
377
+ # type: (...) -> Dict[str, Any]
378
+ rv = {}
379
+ for (export_key , tags ), (
380
+ v_min ,
381
+ v_max ,
382
+ v_count ,
383
+ v_sum ,
384
+ ) in self ._measurements .items ():
385
+ rv [export_key ] = {
386
+ "tags" : _tags_to_dict (tags ),
387
+ "min" : v_min ,
388
+ "max" : v_max ,
389
+ "count" : v_count ,
390
+ "sum" : v_sum ,
391
+ }
392
+ return rv
393
+
394
+
343
395
class MetricsAggregator (object ):
344
396
ROLLUP_IN_SECONDS = 10.0
345
397
MAX_WEIGHT = 100000
@@ -455,11 +507,12 @@ def add(
455
507
unit , # type: MeasurementUnit
456
508
tags , # type: Optional[MetricTags]
457
509
timestamp = None , # type: Optional[Union[float, datetime]]
510
+ local_aggregator = None , # type: Optional[LocalAggregator]
458
511
stacklevel = 0 , # type: int
459
512
):
460
513
# type: (...) -> None
461
514
if not self ._ensure_thread () or self ._flusher is None :
462
- return
515
+ return None
463
516
464
517
if timestamp is None :
465
518
timestamp = time .time ()
@@ -469,11 +522,12 @@ def add(
469
522
bucket_timestamp = int (
470
523
(timestamp // self .ROLLUP_IN_SECONDS ) * self .ROLLUP_IN_SECONDS
471
524
)
525
+ serialized_tags = _serialize_tags (tags )
472
526
bucket_key = (
473
527
ty ,
474
528
key ,
475
529
unit ,
476
- self . _serialize_tags ( tags ) ,
530
+ serialized_tags ,
477
531
)
478
532
479
533
with self ._lock :
@@ -486,7 +540,8 @@ def add(
486
540
metric = local_buckets [bucket_key ] = METRIC_TYPES [ty ](value )
487
541
previous_weight = 0
488
542
489
- self ._buckets_total_weight += metric .weight - previous_weight
543
+ added = metric .weight - previous_weight
544
+ self ._buckets_total_weight += added
490
545
491
546
# Store code location once per metric and per day (of bucket timestamp)
492
547
if self ._enable_code_locations :
@@ -509,6 +564,10 @@ def add(
509
564
# Given the new weight we consider whether we want to force flush.
510
565
self ._consider_force_flush ()
511
566
567
+ if local_aggregator is not None :
568
+ local_value = float (added if ty == "s" else value )
569
+ local_aggregator .add (ty , key , local_value , unit , serialized_tags )
570
+
512
571
def kill (self ):
513
572
# type: (...) -> None
514
573
if self ._flusher is None :
@@ -554,55 +613,87 @@ def _emit(
554
613
return envelope
555
614
return None
556
615
557
- def _serialize_tags (
558
- self , tags # type: Optional[MetricTags]
559
- ):
560
- # type: (...) -> MetricTagsInternal
561
- if not tags :
562
- return ()
563
-
564
- rv = []
565
- for key , value in iteritems (tags ):
566
- # If the value is a collection, we want to flatten it.
567
- if isinstance (value , (list , tuple )):
568
- for inner_value in value :
569
- if inner_value is not None :
570
- rv .append ((key , text_type (inner_value )))
571
- elif value is not None :
572
- rv .append ((key , text_type (value )))
573
616
574
- # It's very important to sort the tags in order to obtain the
575
- # same bucket key.
576
- return tuple (sorted (rv ))
617
+ def _serialize_tags (
618
+ tags , # type: Optional[MetricTags]
619
+ ):
620
+ # type: (...) -> MetricTagsInternal
621
+ if not tags :
622
+ return ()
623
+
624
+ rv = []
625
+ for key , value in iteritems (tags ):
626
+ # If the value is a collection, we want to flatten it.
627
+ if isinstance (value , (list , tuple )):
628
+ for inner_value in value :
629
+ if inner_value is not None :
630
+ rv .append ((key , text_type (inner_value )))
631
+ elif value is not None :
632
+ rv .append ((key , text_type (value )))
633
+
634
+ # It's very important to sort the tags in order to obtain the
635
+ # same bucket key.
636
+ return tuple (sorted (rv ))
637
+
638
+
639
+ def _tags_to_dict (tags ):
640
+ # type: (MetricTagsInternal) -> Dict[str, Any]
641
+ rv = {} # type: Dict[str, Any]
642
+ for tag_name , tag_value in tags :
643
+ old_value = rv .get (tag_name )
644
+ if old_value is not None :
645
+ if isinstance (old_value , list ):
646
+ old_value .append (tag_value )
647
+ else :
648
+ rv [tag_name ] = [old_value , tag_value ]
649
+ else :
650
+ rv [tag_name ] = tag_value
651
+ return rv
577
652
578
653
579
654
def _get_aggregator_and_update_tags (key , tags ):
580
- # type: (str, Optional[MetricTags]) -> Tuple[Optional[MetricsAggregator], Optional[MetricTags]]
655
+ # type: (str, Optional[MetricTags]) -> Tuple[Optional[MetricsAggregator], Optional[LocalAggregator], Optional[ MetricTags]]
581
656
"""Returns the current metrics aggregator if there is one."""
582
657
hub = sentry_sdk .Hub .current
583
658
client = hub .client
584
659
if client is None or client .metrics_aggregator is None :
585
- return None , tags
660
+ return None , None , tags
661
+
662
+ experiments = client .options .get ("_experiments" , {})
586
663
587
664
updated_tags = dict (tags or ()) # type: Dict[str, MetricTagValue]
588
665
updated_tags .setdefault ("release" , client .options ["release" ])
589
666
updated_tags .setdefault ("environment" , client .options ["environment" ])
590
667
591
668
scope = hub .scope
669
+ local_aggregator = None
670
+
671
+ # We go with the low-level API here to access transaction information as
672
+ # this one is the same between just errors and errors + performance
592
673
transaction_source = scope ._transaction_info .get ("source" )
593
674
if transaction_source in GOOD_TRANSACTION_SOURCES :
594
- transaction = scope ._transaction
595
- if transaction :
596
- updated_tags .setdefault ("transaction" , transaction )
675
+ transaction_name = scope ._transaction
676
+ if transaction_name :
677
+ updated_tags .setdefault ("transaction" , transaction_name )
678
+ if scope ._span is not None :
679
+ sample_rate = experiments .get ("metrics_summary_sample_rate" ) or 0.0
680
+ should_summarize_metric_callback = experiments .get (
681
+ "should_summarize_metric"
682
+ )
683
+ if random .random () < sample_rate and (
684
+ should_summarize_metric_callback is None
685
+ or should_summarize_metric_callback (key , updated_tags )
686
+ ):
687
+ local_aggregator = scope ._span ._get_local_aggregator ()
597
688
598
- callback = client . options . get ( "_experiments" , {}) .get ("before_emit_metric" )
599
- if callback is not None :
689
+ before_emit_callback = experiments .get ("before_emit_metric" )
690
+ if before_emit_callback is not None :
600
691
with recursion_protection () as in_metrics :
601
692
if not in_metrics :
602
- if not callback (key , updated_tags ):
603
- return None , updated_tags
693
+ if not before_emit_callback (key , updated_tags ):
694
+ return None , None , updated_tags
604
695
605
- return client .metrics_aggregator , updated_tags
696
+ return client .metrics_aggregator , local_aggregator , updated_tags
606
697
607
698
608
699
def incr (
@@ -615,9 +706,11 @@ def incr(
615
706
):
616
707
# type: (...) -> None
617
708
"""Increments a counter."""
618
- aggregator , tags = _get_aggregator_and_update_tags (key , tags )
709
+ aggregator , local_aggregator , tags = _get_aggregator_and_update_tags (key , tags )
619
710
if aggregator is not None :
620
- aggregator .add ("c" , key , value , unit , tags , timestamp , stacklevel )
711
+ aggregator .add (
712
+ "c" , key , value , unit , tags , timestamp , local_aggregator , stacklevel
713
+ )
621
714
622
715
623
716
class _Timing (object ):
@@ -637,6 +730,7 @@ def __init__(
637
730
self .value = value
638
731
self .unit = unit
639
732
self .entered = None # type: Optional[float]
733
+ self ._span = None # type: Optional[sentry_sdk.tracing.Span]
640
734
self .stacklevel = stacklevel
641
735
642
736
def _validate_invocation (self , context ):
@@ -650,17 +744,37 @@ def __enter__(self):
650
744
# type: (...) -> _Timing
651
745
self .entered = TIMING_FUNCTIONS [self .unit ]()
652
746
self ._validate_invocation ("context-manager" )
747
+ self ._span = sentry_sdk .start_span (op = "metric.timing" , description = self .key )
748
+ if self .tags :
749
+ for key , value in self .tags .items ():
750
+ if isinstance (value , (tuple , list )):
751
+ value = "," .join (sorted (map (str , value )))
752
+ self ._span .set_tag (key , value )
753
+ self ._span .__enter__ ()
653
754
return self
654
755
655
756
def __exit__ (self , exc_type , exc_value , tb ):
656
757
# type: (Any, Any, Any) -> None
657
- aggregator , tags = _get_aggregator_and_update_tags (self .key , self .tags )
758
+ assert self ._span , "did not enter"
759
+ aggregator , local_aggregator , tags = _get_aggregator_and_update_tags (
760
+ self .key , self .tags
761
+ )
658
762
if aggregator is not None :
659
763
elapsed = TIMING_FUNCTIONS [self .unit ]() - self .entered # type: ignore
660
764
aggregator .add (
661
- "d" , self .key , elapsed , self .unit , tags , self .timestamp , self .stacklevel
765
+ "d" ,
766
+ self .key ,
767
+ elapsed ,
768
+ self .unit ,
769
+ tags ,
770
+ self .timestamp ,
771
+ local_aggregator ,
772
+ self .stacklevel ,
662
773
)
663
774
775
+ self ._span .__exit__ (exc_type , exc_value , tb )
776
+ self ._span = None
777
+
664
778
def __call__ (self , f ):
665
779
# type: (Any) -> Any
666
780
self ._validate_invocation ("decorator" )
@@ -698,9 +812,11 @@ def timing(
698
812
- it can be used as a decorator
699
813
"""
700
814
if value is not None :
701
- aggregator , tags = _get_aggregator_and_update_tags (key , tags )
815
+ aggregator , local_aggregator , tags = _get_aggregator_and_update_tags (key , tags )
702
816
if aggregator is not None :
703
- aggregator .add ("d" , key , value , unit , tags , timestamp , stacklevel )
817
+ aggregator .add (
818
+ "d" , key , value , unit , tags , timestamp , local_aggregator , stacklevel
819
+ )
704
820
return _Timing (key , tags , timestamp , value , unit , stacklevel )
705
821
706
822
@@ -714,9 +830,11 @@ def distribution(
714
830
):
715
831
# type: (...) -> None
716
832
"""Emits a distribution."""
717
- aggregator , tags = _get_aggregator_and_update_tags (key , tags )
833
+ aggregator , local_aggregator , tags = _get_aggregator_and_update_tags (key , tags )
718
834
if aggregator is not None :
719
- aggregator .add ("d" , key , value , unit , tags , timestamp , stacklevel )
835
+ aggregator .add (
836
+ "d" , key , value , unit , tags , timestamp , local_aggregator , stacklevel
837
+ )
720
838
721
839
722
840
def set (
@@ -729,21 +847,25 @@ def set(
729
847
):
730
848
# type: (...) -> None
731
849
"""Emits a set."""
732
- aggregator , tags = _get_aggregator_and_update_tags (key , tags )
850
+ aggregator , local_aggregator , tags = _get_aggregator_and_update_tags (key , tags )
733
851
if aggregator is not None :
734
- aggregator .add ("s" , key , value , unit , tags , timestamp , stacklevel )
852
+ aggregator .add (
853
+ "s" , key , value , unit , tags , timestamp , local_aggregator , stacklevel
854
+ )
735
855
736
856
737
857
def gauge (
738
858
key , # type: str
739
859
value , # type: float
740
- unit = "none" , # type: MetricValue
860
+ unit = "none" , # type: MeasurementUnit
741
861
tags = None , # type: Optional[MetricTags]
742
862
timestamp = None , # type: Optional[Union[float, datetime]]
743
863
stacklevel = 0 , # type: int
744
864
):
745
865
# type: (...) -> None
746
866
"""Emits a gauge."""
747
- aggregator , tags = _get_aggregator_and_update_tags (key , tags )
867
+ aggregator , local_aggregator , tags = _get_aggregator_and_update_tags (key , tags )
748
868
if aggregator is not None :
749
- aggregator .add ("g" , key , value , unit , tags , timestamp , stacklevel )
869
+ aggregator .add (
870
+ "g" , key , value , unit , tags , timestamp , local_aggregator , stacklevel
871
+ )
0 commit comments