18
18
19
19
import java .lang .reflect .Type ;
20
20
import java .util .ArrayList ;
21
- import java .util .Collections ;
22
21
import java .util .Date ;
23
22
import java .util .List ;
24
23
import java .util .Map ;
@@ -549,7 +548,7 @@ private class ReceiptHandler implements Receiptable {
549
548
550
549
private final List <Consumer <StompHeaders >> receiptCallbacks = new ArrayList <>(2 );
551
550
552
- private final List <Consumer < StompHeaders > > receiptLostCallbacks = new ArrayList <>(2 );
551
+ private final List <Runnable > receiptLostCallbacks = new ArrayList <>(2 );
553
552
554
553
@ Nullable
555
554
private ScheduledFuture <?> future ;
@@ -582,44 +581,34 @@ public String getReceiptId() {
582
581
583
582
@ Override
584
583
public void addReceiptTask (Runnable task ) {
585
- addTask ( h -> task .run (), true );
584
+ addReceiptTask ( headers -> task .run ());
586
585
}
587
586
588
587
@ Override
589
588
public void addReceiptTask (Consumer <StompHeaders > task ) {
590
- addTask (task , true );
591
- }
592
-
593
- @ Override
594
- public void addReceiptLostTask (Runnable task ) {
595
- addTask (h -> task .run (), false );
596
- }
597
-
598
- private void addTask (Consumer <StompHeaders > task , boolean successTask ) {
599
- Assert .notNull (this .receiptId ,
600
- "To track receipts, set autoReceiptEnabled=true or add 'receiptId' header" );
589
+ Assert .notNull (this .receiptId , "Set autoReceiptEnabled to track receipts or add a 'receiptId' header" );
601
590
synchronized (this ) {
602
- if (this .result != null && this .result == successTask ) {
603
- invoke (Collections .singletonList (task ));
591
+ if (this .result != null ) {
592
+ if (this .result ) {
593
+ task .accept (this .receiptHeaders );
594
+ }
604
595
}
605
596
else {
606
- if (successTask ) {
607
- this .receiptCallbacks .add (task );
608
- }
609
- else {
610
- this .receiptLostCallbacks .add (task );
611
- }
597
+ this .receiptCallbacks .add (task );
612
598
}
613
599
}
614
600
}
615
601
616
- private void invoke (List <Consumer <StompHeaders >> callbacks ) {
617
- for (Consumer <StompHeaders > consumer : callbacks ) {
618
- try {
619
- consumer .accept (this .receiptHeaders );
602
+ @ Override
603
+ public void addReceiptLostTask (Runnable task ) {
604
+ synchronized (this ) {
605
+ if (this .result != null ) {
606
+ if (!this .result ) {
607
+ task .run ();
608
+ }
620
609
}
621
- catch ( Throwable ex ) {
622
- // ignore
610
+ else {
611
+ this . receiptLostCallbacks . add ( task );
623
612
}
624
613
}
625
614
}
@@ -639,13 +628,33 @@ private void handleInternal(boolean result, @Nullable StompHeaders receiptHeader
639
628
}
640
629
this .result = result ;
641
630
this .receiptHeaders = receiptHeaders ;
642
- invoke (result ? this .receiptCallbacks : this .receiptLostCallbacks );
631
+ if (result ) {
632
+ this .receiptCallbacks .forEach (consumer -> {
633
+ try {
634
+ consumer .accept (this .receiptHeaders );
635
+ }
636
+ catch (Throwable ex ) {
637
+ // ignore
638
+ }
639
+ });
640
+ }
641
+ else {
642
+ this .receiptLostCallbacks .forEach (task -> {
643
+ try {
644
+ task .run ();
645
+ }
646
+ catch (Throwable ex ) {
647
+ // ignore
648
+ }
649
+ });
650
+ }
643
651
DefaultStompSession .this .receiptHandlers .remove (this .receiptId );
644
652
if (this .future != null ) {
645
653
this .future .cancel (true );
646
654
}
647
655
}
648
656
}
657
+
649
658
}
650
659
651
660
0 commit comments