Skip to content

Commit 72fd9c7

Browse files
committed
Update to use subclassing to avoid so much change
1 parent 6604b68 commit 72fd9c7

File tree

2 files changed

+86
-93
lines changed

2 files changed

+86
-93
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

+58-81
Original file line numberDiff line numberDiff line change
@@ -120,76 +120,6 @@ public class TransactionManager {
120120
private final Set<TopicPartition> newPartitionsInTransaction;
121121
private final Set<TopicPartition> pendingPartitionsInTransaction;
122122
private final Set<TopicPartition> partitionsInTransaction;
123-
124-
/**
125-
* During its normal course of operations, the transaction manager transitions through different internal
126-
* states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions
127-
* result from actions on one of the following classes of threads:
128-
*
129-
* <ul>
130-
* <li><em>Application</em> threads that invokes {@link Producer} API calls</li>
131-
* <li><em>{@link Sender}</em> thread operations</li>
132-
* </ul>
133-
*
134-
* When an invalid state transition is detected during execution on an <em>application</em> thread, the
135-
* {@link #currentState} is <em>not updated</em> and an {@link IllegalStateException} is thrown. This gives the
136-
* application the opportunity to fix the issue without permanently poisoning the state of the
137-
* transaction manager. The {@link Producer} API calls that perform a state transition include:
138-
*
139-
* <ul>
140-
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
141-
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
142-
* <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li>
143-
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
144-
* </li>
145-
* <li>{@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls
146-
* {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
147-
* </li>
148-
* <li>{@link Producer#send(ProducerRecord)} (and its variants) calls
149-
* {@link #maybeAddPartition(TopicPartition)} and
150-
* {@link #maybeTransitionToErrorState(RuntimeException)}
151-
* </li>
152-
* </ul>
153-
*
154-
* <p/>
155-
*
156-
* The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the
157-
* <em>{@link Sender}</em> thread. This includes record batching, network I/O, broker response handlers, etc. If an
158-
* invalid state transition is detected in the <em>{@link Sender}</em> thread, in addition to throwing an
159-
* {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its
160-
* {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover.
161-
*
162-
* <p/>
163-
*
164-
* It's important to prevent possible corruption when the transaction manager has determined that it is in a
165-
* fatal state. Subsequent transaction operations attempted via either the <em>application</em> or the
166-
* <em>{@link Sender}</em> thread should fail. This is achieved when these operations invoke the
167-
* {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated
168-
* transactional guarantees are not violated.
169-
*
170-
* <p/>
171-
*
172-
* See KAFKA-14831 for more detail.
173-
*/
174-
public interface InvalidStateTransitionAttemptHandler {
175-
176-
/**
177-
* Callback to determine if the transaction manager's {@link #currentState} should be set to
178-
* {@link State#FATAL_ERROR} to prevent possible transaction corruption.
179-
*
180-
* @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception,
181-
* {@code false} to throw an exception without first changing the state
182-
*/
183-
boolean isFatalState();
184-
}
185-
186-
/**
187-
* The default logic is to poison the internal {@link #currentState state} if an invalid state transition is
188-
* attempted on the {@link Sender.SenderThread sender's thread}.
189-
*/
190-
public static final InvalidStateTransitionAttemptHandler DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER = () -> Thread.currentThread() instanceof Sender.SenderThread;
191-
private final InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler;
192-
193123
private PendingStateTransition pendingTransition;
194124

195125
// This is used by the TxnRequestHandlers to control how long to back off before a given request is retried.
@@ -274,15 +204,6 @@ public TransactionManager(final LogContext logContext,
274204
final int transactionTimeoutMs,
275205
final long retryBackoffMs,
276206
final ApiVersions apiVersions) {
277-
this(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions, DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER);
278-
}
279-
280-
public TransactionManager(final LogContext logContext,
281-
final String transactionalId,
282-
final int transactionTimeoutMs,
283-
final long retryBackoffMs,
284-
final ApiVersions apiVersions,
285-
final InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler) {
286207
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
287208
this.transactionalId = transactionalId;
288209
this.log = logContext.logger(TransactionManager.class);
@@ -299,7 +220,6 @@ public TransactionManager(final LogContext logContext,
299220
this.retryBackoffMs = retryBackoffMs;
300221
this.txnPartitionMap = new TxnPartitionMap(logContext);
301222
this.apiVersions = apiVersions;
302-
this.invalidStateTransitionAttemptHandler = invalidStateTransitionAttemptHandler;
303223
}
304224

305225
public synchronized TransactionalRequestResult initializeTransactions() {
@@ -1086,7 +1006,7 @@ private void transitionTo(State target, RuntimeException error) {
10861006
String message = idString + "Invalid transition attempted from state "
10871007
+ currentState.name() + " to state " + target.name();
10881008

1089-
if (invalidStateTransitionAttemptHandler.isFatalState()) {
1009+
if (shouldSetToFatalErrorState()) {
10901010
currentState = State.FATAL_ERROR;
10911011
lastError = new IllegalStateException(message);
10921012
throw lastError;
@@ -1109,6 +1029,63 @@ private void transitionTo(State target, RuntimeException error) {
11091029
currentState = target;
11101030
}
11111031

1032+
/**
1033+
* During its normal course of operations, the transaction manager transitions through different internal
1034+
* states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions
1035+
* result from actions on one of the following classes of threads:
1036+
*
1037+
* <ul>
1038+
* <li><em>Application</em> threads that invokes {@link Producer} API calls</li>
1039+
* <li><em>{@link Sender}</em> thread operations</li>
1040+
* </ul>
1041+
*
1042+
* When an invalid state transition is detected during execution on an <em>application</em> thread, the
1043+
* {@link #currentState} is <em>not updated</em> and an {@link IllegalStateException} is thrown. This gives the
1044+
* application the opportunity to fix the issue without permanently poisoning the state of the
1045+
* transaction manager. The {@link Producer} API calls that perform a state transition include:
1046+
*
1047+
* <ul>
1048+
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
1049+
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
1050+
* <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li>
1051+
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
1052+
* </li>
1053+
* <li>{@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls
1054+
* {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
1055+
* </li>
1056+
* <li>{@link Producer#send(ProducerRecord)} (and its variants) calls
1057+
* {@link #maybeAddPartition(TopicPartition)} and
1058+
* {@link #maybeTransitionToErrorState(RuntimeException)}
1059+
* </li>
1060+
* </ul>
1061+
*
1062+
* <p/>
1063+
*
1064+
* The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the
1065+
* <em>{@link Sender}</em> thread. This includes record batching, network I/O, broker response handlers, etc. If an
1066+
* invalid state transition is detected in the <em>{@link Sender}</em> thread, in addition to throwing an
1067+
* {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its
1068+
* {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover.
1069+
*
1070+
* <p/>
1071+
*
1072+
* It's important to prevent possible corruption when the transaction manager has determined that it is in a
1073+
* fatal state. Subsequent transaction operations attempted via either the <em>application</em> or the
1074+
* <em>{@link Sender}</em> thread should fail. This is achieved when these operations invoke the
1075+
* {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated
1076+
* transactional guarantees are not violated.
1077+
*
1078+
* <p/>
1079+
*
1080+
* See KAFKA-14831 for more detail.
1081+
*
1082+
* @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception,
1083+
* {@code false} to throw an exception without first changing the state
1084+
*/
1085+
protected boolean shouldSetToFatalErrorState() {
1086+
return Thread.currentThread() instanceof Sender.SenderThread;
1087+
}
1088+
11121089
private void ensureTransactional() {
11131090
if (!isTransactional())
11141091
throw new IllegalStateException("Transactional method invoked on a non-transactional producer.");

clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

+28-12
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public class TransactionManagerTest {
149149

150150
private RecordAccumulator accumulator = null;
151151
private Sender sender = null;
152-
private TransactionManager transactionManager = null;
152+
private TestableTransactionManager transactionManager = null;
153153
private Node brokerNode = null;
154154
private long finalizedFeaturesEpoch = 0;
155155

@@ -163,12 +163,6 @@ public void setup() {
163163
}
164164

165165
private void initializeTransactionManager(Optional<String> transactionalId, boolean transactionV2Enabled) {
166-
initializeTransactionManager(transactionalId, transactionV2Enabled, TransactionManager.DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER);
167-
}
168-
169-
private void initializeTransactionManager(Optional<String> transactionalId,
170-
boolean transactionV2Enabled,
171-
TransactionManager.InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler) {
172166
Metrics metrics = new Metrics(time);
173167

174168
apiVersions.update("0", new NodeApiVersions(Arrays.asList(
@@ -194,8 +188,8 @@ private void initializeTransactionManager(Optional<String> transactionalId,
194188
.setMinVersionLevel(transactionV2Enabled ? (short) 2 : (short) 1)),
195189
finalizedFeaturesEpoch));
196190
finalizedFeaturesEpoch += 1;
197-
this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null),
198-
transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, invalidStateTransitionAttemptHandler);
191+
this.transactionManager = new TestableTransactionManager(logContext, transactionalId.orElse(null),
192+
transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
199193

200194
int batchSize = 16 * 1024;
201195
int deliveryTimeoutMs = 3000;
@@ -1044,7 +1038,7 @@ public void testTransactionManagerDisablesV2() {
10441038
.setMaxVersionLevel((short) 1)
10451039
.setMinVersionLevel((short) 1)),
10461040
0));
1047-
this.transactionManager = new TransactionManager(logContext, transactionalId,
1041+
this.transactionManager = new TestableTransactionManager(logContext, transactionalId,
10481042
transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
10491043

10501044
int batchSize = 16 * 1024;
@@ -3808,8 +3802,8 @@ public void testBackgroundInvalidStateTransitionIsFatal() {
38083802
// The default logic is to poison the transaction manager on an invalid state transition attempt if that
38093803
// attempt happens on the Sender thread. Here the logic is altered to always poison on invalid attempts to
38103804
// mimic as though it's being invoked by the Sender.
3811-
TransactionManager.InvalidStateTransitionAttemptHandler testHandler = () -> true;
3812-
initializeTransactionManager(Optional.of(transactionalId), true, testHandler);
3805+
initializeTransactionManager(Optional.of(transactionalId), true);
3806+
transactionManager.setShouldSetToFatalErrorStateOverride(true);
38133807
doInitTransactions();
38143808
assertTrue(transactionManager.isTransactional());
38153809

@@ -4382,4 +4376,26 @@ private void runUntil(Supplier<Boolean> condition) {
43824376
ProducerTestUtils.runUntil(sender, condition);
43834377
}
43844378

4379+
private static class TestableTransactionManager extends TransactionManager {
4380+
4381+
private Optional<Boolean> shouldSetToFatalErrorStateOverride;
4382+
4383+
public TestableTransactionManager(LogContext logContext,
4384+
String transactionalId,
4385+
int transactionTimeoutMs,
4386+
long retryBackoffMs,
4387+
ApiVersions apiVersions) {
4388+
super(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions);
4389+
this.shouldSetToFatalErrorStateOverride = Optional.empty();
4390+
}
4391+
4392+
private void setShouldSetToFatalErrorStateOverride(boolean override) {
4393+
shouldSetToFatalErrorStateOverride = Optional.of(override);
4394+
}
4395+
4396+
@Override
4397+
protected boolean shouldSetToFatalErrorState() {
4398+
return shouldSetToFatalErrorStateOverride.orElseGet(super::shouldSetToFatalErrorState);
4399+
}
4400+
}
43854401
}

0 commit comments

Comments
 (0)