@@ -743,8 +743,8 @@ class ScopeStream extends async.Stream<ScopeEvent> {
743
743
final _Streams _streams;
744
744
final String _name;
745
745
final subscriptions = < ScopeStreamSubscription > [];
746
+ final List <Function > _work = < Function > [];
746
747
bool _firing = false ;
747
- List <ScopeStreamSubscription > _toRemove;
748
748
749
749
750
750
ScopeStream (this ._streams, this ._exceptionHandler, this ._name);
@@ -753,12 +753,21 @@ class ScopeStream extends async.Stream<ScopeEvent> {
753
753
{ Function onError,
754
754
void onDone (),
755
755
bool cancelOnError }) {
756
- if (subscriptions.isEmpty) _streams._addCount (_name, 1 );
757
756
var subscription = new ScopeStreamSubscription (this , onData);
758
- subscriptions.add (subscription);
757
+ _concurrentSafeWork (() {
758
+ if (subscriptions.isEmpty) _streams._addCount (_name, 1 );
759
+ subscriptions.add (subscription);
760
+ });
759
761
return subscription;
760
762
}
761
763
764
+ void _concurrentSafeWork ([fn]) {
765
+ if (fn != null ) _work.add (fn);
766
+ while (! _firing && _work.isNotEmpty) {
767
+ _work.removeLast ()();
768
+ }
769
+ }
770
+
762
771
void _fire (ScopeEvent event) {
763
772
_firing = true ;
764
773
try {
@@ -771,31 +780,19 @@ class ScopeStream extends async.Stream<ScopeEvent> {
771
780
}
772
781
} finally {
773
782
_firing = false ;
774
- if (_toRemove != null ) {
775
- _toRemove.forEach (_actuallyRemove);
776
- _toRemove = null ;
777
- }
783
+ _concurrentSafeWork ();
778
784
}
779
785
}
780
786
781
787
void _remove (ScopeStreamSubscription subscription) {
782
- if (_firing) {
783
- if (_toRemove == null ) {
784
- _toRemove = [];
788
+ _concurrentSafeWork (() {
789
+ assert (subscription._scopeStream == this );
790
+ if (subscriptions.remove (subscription)) {
791
+ if (subscriptions.isEmpty) _streams._addCount (_name, - 1 );
792
+ } else {
793
+ throw new StateError ('AlreadyCanceled' );
785
794
}
786
- _toRemove.add (subscription);
787
- } else {
788
- _actuallyRemove (subscription);
789
- }
790
- }
791
-
792
- void _actuallyRemove (ScopeStreamSubscription subscription) {
793
- assert (subscription._scopeStream == this );
794
- if (subscriptions.remove (subscription)) {
795
- if (subscriptions.isEmpty) _streams._addCount (_name, - 1 );
796
- } else {
797
- throw new StateError ('AlreadyCanceled' );
798
- }
795
+ });
799
796
}
800
797
}
801
798
0 commit comments