@@ -194,9 +194,16 @@ public OffsetMetadataManager build() {
194
194
195
195
/**
196
196
* The open transactions (producer ids) keyed by group.
197
+ * Tracks whether groups have any open transactions.
197
198
*/
198
199
private final TimelineHashMap <String , TimelineHashSet <Long >> openTransactionsByGroup ;
199
200
201
+ /**
202
+ * The open transactions (producer ids) keyed by group id, topic name and partition id.
203
+ * Tracks whether partitions have any pending transactional offsets.
204
+ */
205
+ private final TimelineHashMap <String , TimelineHashMap <String , TimelineHashMap <Integer , TimelineHashSet <Long >>>> openTransactionsByGroupTopicAndPartition ;
206
+
200
207
private class Offsets {
201
208
/**
202
209
* The offsets keyed by group id, topic name and partition id.
@@ -281,6 +288,7 @@ private OffsetAndMetadata remove(
281
288
this .offsets = new Offsets ();
282
289
this .pendingTransactionalOffsets = new TimelineHashMap <>(snapshotRegistry , 0 );
283
290
this .openTransactionsByGroup = new TimelineHashMap <>(snapshotRegistry , 0 );
291
+ this .openTransactionsByGroupTopicAndPartition = new TimelineHashMap <>(snapshotRegistry , 0 );
284
292
}
285
293
286
294
/**
@@ -650,24 +658,18 @@ public int deleteAllOffsets(
650
658
// Delete all the pending transactional offsets too. Here we only write a tombstone
651
659
// if the topic-partition was not in the main storage because we don't need to write
652
660
// two consecutive tombstones.
653
- TimelineHashSet <Long > openTransactions = openTransactionsByGroup .get (groupId );
654
- if (openTransactions != null ) {
655
- openTransactions .forEach (producerId -> {
656
- Offsets pendingOffsets = pendingTransactionalOffsets .get (producerId );
657
- if (pendingOffsets != null ) {
658
- TimelineHashMap <String , TimelineHashMap <Integer , OffsetAndMetadata >> pendingGroupOffsets =
659
- pendingOffsets .offsetsByGroup .get (groupId );
660
- if (pendingGroupOffsets != null ) {
661
- pendingGroupOffsets .forEach ((topic , offsetsByPartition ) -> {
662
- offsetsByPartition .keySet ().forEach (partition -> {
663
- if (!hasCommittedOffset (groupId , topic , partition )) {
664
- records .add (GroupCoordinatorRecordHelpers .newOffsetCommitTombstoneRecord (groupId , topic , partition ));
665
- numDeletedOffsets .getAndIncrement ();
666
- }
667
- });
668
- });
669
- }
670
- }
661
+ TimelineHashMap <String , TimelineHashMap <Integer , TimelineHashSet <Long >>> openTransactionsByTopic =
662
+ openTransactionsByGroupTopicAndPartition .get (groupId );
663
+ if (openTransactionsByTopic != null ) {
664
+ openTransactionsByTopic .forEach ((topic , openTransactionsByPartition ) -> {
665
+ openTransactionsByPartition .forEach ((partition , producerIds ) -> {
666
+ producerIds .forEach (producerId -> {
667
+ if (!hasCommittedOffset (groupId , topic , partition )) {
668
+ records .add (GroupCoordinatorRecordHelpers .newOffsetCommitTombstoneRecord (groupId , topic , partition ));
669
+ numDeletedOffsets .getAndIncrement ();
670
+ }
671
+ });
672
+ });
671
673
});
672
674
}
673
675
@@ -685,17 +687,15 @@ boolean hasPendingTransactionalOffsets(
685
687
String topic ,
686
688
int partition
687
689
) {
688
- final TimelineHashSet <Long > openTransactions = openTransactionsByGroup .get (groupId );
689
- if (openTransactions == null ) return false ;
690
+ TimelineHashMap <String , TimelineHashMap <Integer , TimelineHashSet <Long >>> openTransactionsByTopic =
691
+ openTransactionsByGroupTopicAndPartition .get (groupId );
692
+ if (openTransactionsByTopic == null ) return false ;
690
693
691
- for (Long producerId : openTransactions ) {
692
- Offsets offsets = pendingTransactionalOffsets .get (producerId );
693
- if (offsets != null && offsets .get (groupId , topic , partition ) != null ) {
694
- return true ;
695
- }
696
- }
694
+ TimelineHashMap <Integer , TimelineHashSet <Long >> openTransactionsByPartition = openTransactionsByTopic .get (topic );
695
+ if (openTransactionsByPartition == null ) return false ;
697
696
698
- return false ;
697
+ TimelineHashSet <Long > openTransactions = openTransactionsByPartition .get (partition );
698
+ return openTransactions != null && !openTransactions .isEmpty ();
699
699
}
700
700
701
701
/**
@@ -1005,21 +1005,41 @@ public void replay(
1005
1005
openTransactionsByGroup
1006
1006
.computeIfAbsent (groupId , __ -> new TimelineHashSet <>(snapshotRegistry , 1 ))
1007
1007
.add (producerId );
1008
+ openTransactionsByGroupTopicAndPartition
1009
+ .computeIfAbsent (groupId , __ -> new TimelineHashMap <>(snapshotRegistry , 1 ))
1010
+ .computeIfAbsent (topic , __ -> new TimelineHashMap <>(snapshotRegistry , 1 ))
1011
+ .computeIfAbsent (partition , __ -> new TimelineHashSet <>(snapshotRegistry , 1 ))
1012
+ .add (producerId );
1008
1013
}
1009
1014
} else {
1010
1015
if (offsets .remove (groupId , topic , partition ) != null ) {
1011
1016
metrics .decrementNumOffsets ();
1012
1017
}
1013
1018
1014
1019
// Remove all the pending offset commits related to the tombstone.
1015
- TimelineHashSet <Long > openTransactions = openTransactionsByGroup .get (groupId );
1016
- if (openTransactions != null ) {
1017
- openTransactions .forEach (openProducerId -> {
1018
- Offsets pendingOffsets = pendingTransactionalOffsets .get (openProducerId );
1019
- if (pendingOffsets != null ) {
1020
- pendingOffsets .remove (groupId , topic , partition );
1020
+ TimelineHashMap <String , TimelineHashMap <Integer , TimelineHashSet <Long >>> openTransactionsByTopic =
1021
+ openTransactionsByGroupTopicAndPartition .get (groupId );
1022
+ if (openTransactionsByTopic != null ) {
1023
+ TimelineHashMap <Integer , TimelineHashSet <Long >> openTransactionsByPartition = openTransactionsByTopic .get (topic );
1024
+ if (openTransactionsByPartition != null ) {
1025
+ TimelineHashSet <Long > openTransactions = openTransactionsByPartition .get (partition );
1026
+ if (openTransactions != null ) {
1027
+ openTransactions .forEach (openProducerId -> {
1028
+ Offsets pendingOffsets = pendingTransactionalOffsets .get (openProducerId );
1029
+ if (pendingOffsets != null ) {
1030
+ pendingOffsets .remove (groupId , topic , partition );
1031
+ }
1032
+ });
1033
+
1034
+ openTransactionsByPartition .remove (partition );
1035
+ if (openTransactionsByPartition .isEmpty ()) {
1036
+ openTransactionsByTopic .remove (topic );
1037
+ }
1038
+ if (openTransactionsByTopic .isEmpty ()) {
1039
+ openTransactionsByGroupTopicAndPartition .remove (groupId );
1040
+ }
1021
1041
}
1022
- });
1042
+ }
1023
1043
}
1024
1044
}
1025
1045
}
@@ -1031,6 +1051,7 @@ public void replay(
1031
1051
* @param result The result of the transaction.
1032
1052
* @throws RuntimeException if the transaction can not be completed.
1033
1053
*/
1054
+ @ SuppressWarnings ("NPathComplexity" )
1034
1055
public void replayEndTransactionMarker (
1035
1056
long producerId ,
1036
1057
TransactionResult result
@@ -1043,14 +1064,39 @@ public void replayEndTransactionMarker(
1043
1064
return ;
1044
1065
}
1045
1066
1046
- pendingOffsets .offsetsByGroup .keySet (). forEach (groupId -> {
1047
- TimelineHashSet <Long > openTransactions = openTransactionsByGroup .get (groupId );
1048
- if (openTransactions != null ) {
1049
- openTransactions .remove (producerId );
1050
- if (openTransactions .isEmpty ()) {
1067
+ pendingOffsets .offsetsByGroup .forEach (( groupId , topicOffsets ) -> {
1068
+ TimelineHashSet <Long > groupTransactions = openTransactionsByGroup .get (groupId );
1069
+ if (groupTransactions != null ) {
1070
+ groupTransactions .remove (producerId );
1071
+ if (groupTransactions .isEmpty ()) {
1051
1072
openTransactionsByGroup .remove (groupId );
1052
1073
}
1053
1074
}
1075
+
1076
+ TimelineHashMap <String , TimelineHashMap <Integer , TimelineHashSet <Long >>> openTransactionsByTopic =
1077
+ openTransactionsByGroupTopicAndPartition .get (groupId );
1078
+ if (openTransactionsByTopic == null ) return ;
1079
+
1080
+ topicOffsets .forEach ((topic , partitionOffsets ) -> {
1081
+ TimelineHashMap <Integer , TimelineHashSet <Long >> openTransactionsByPartition = openTransactionsByTopic .get (topic );
1082
+ if (openTransactionsByPartition == null ) return ;
1083
+
1084
+ partitionOffsets .keySet ().forEach (partitionId -> {
1085
+ TimelineHashSet <Long > partitionTransactions = openTransactionsByPartition .get (partitionId );
1086
+ if (partitionTransactions != null ) {
1087
+ partitionTransactions .remove (producerId );
1088
+ if (partitionTransactions .isEmpty ()) {
1089
+ openTransactionsByPartition .remove (partitionId );
1090
+ }
1091
+ if (openTransactionsByPartition .isEmpty ()) {
1092
+ openTransactionsByTopic .remove (topic );
1093
+ }
1094
+ if (openTransactionsByTopic .isEmpty ()) {
1095
+ openTransactionsByGroupTopicAndPartition .remove (groupId );
1096
+ }
1097
+ }
1098
+ });
1099
+ });
1054
1100
});
1055
1101
1056
1102
if (result == TransactionResult .COMMIT ) {
0 commit comments