Skip to content

Commit 6604b68

Browse files
committed
Minor updates to naming
1 parent c8a135f commit 6604b68

File tree

2 files changed

+16
-15
lines changed

2 files changed

+16
-15
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public class TransactionManager {
171171
*
172172
* See KAFKA-14831 for more detail.
173173
*/
174-
public interface InvalidTransitionAttemptPolicy {
174+
public interface InvalidStateTransitionAttemptHandler {
175175

176176
/**
177177
* Callback to determine if the transaction manager's {@link #currentState} should be set to
@@ -184,11 +184,11 @@ public interface InvalidTransitionAttemptPolicy {
184184
}
185185

186186
/**
187-
* The default policy is to poison the internal {@link #currentState state} if an invalid state transition is
187+
* The default logic is to poison the internal {@link #currentState state} if an invalid state transition is
188188
* attempted on the {@link Sender.SenderThread sender's thread}.
189189
*/
190-
public static final InvalidTransitionAttemptPolicy DEFAULT_INVALID_TRANSITION_ATTEMPT_POLICY = () -> Thread.currentThread() instanceof Sender.SenderThread;
191-
private final InvalidTransitionAttemptPolicy invalidTransitionAttemptPolicy;
190+
public static final InvalidStateTransitionAttemptHandler DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER = () -> Thread.currentThread() instanceof Sender.SenderThread;
191+
private final InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler;
192192

193193
private PendingStateTransition pendingTransition;
194194

@@ -274,15 +274,15 @@ public TransactionManager(final LogContext logContext,
274274
final int transactionTimeoutMs,
275275
final long retryBackoffMs,
276276
final ApiVersions apiVersions) {
277-
this(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions, DEFAULT_INVALID_TRANSITION_ATTEMPT_POLICY);
277+
this(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions, DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER);
278278
}
279279

280280
public TransactionManager(final LogContext logContext,
281281
final String transactionalId,
282282
final int transactionTimeoutMs,
283283
final long retryBackoffMs,
284284
final ApiVersions apiVersions,
285-
final InvalidTransitionAttemptPolicy invalidTransitionAttemptPolicy) {
285+
final InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler) {
286286
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
287287
this.transactionalId = transactionalId;
288288
this.log = logContext.logger(TransactionManager.class);
@@ -299,7 +299,7 @@ public TransactionManager(final LogContext logContext,
299299
this.retryBackoffMs = retryBackoffMs;
300300
this.txnPartitionMap = new TxnPartitionMap(logContext);
301301
this.apiVersions = apiVersions;
302-
this.invalidTransitionAttemptPolicy = invalidTransitionAttemptPolicy;
302+
this.invalidStateTransitionAttemptHandler = invalidStateTransitionAttemptHandler;
303303
}
304304

305305
public synchronized TransactionalRequestResult initializeTransactions() {
@@ -1086,7 +1086,7 @@ private void transitionTo(State target, RuntimeException error) {
10861086
String message = idString + "Invalid transition attempted from state "
10871087
+ currentState.name() + " to state " + target.name();
10881088

1089-
if (invalidTransitionAttemptPolicy.isFatalState()) {
1089+
if (invalidStateTransitionAttemptHandler.isFatalState()) {
10901090
currentState = State.FATAL_ERROR;
10911091
lastError = new IllegalStateException(message);
10921092
throw lastError;

clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,12 @@ public void setup() {
163163
}
164164

165165
private void initializeTransactionManager(Optional<String> transactionalId, boolean transactionV2Enabled) {
166-
initializeTransactionManager(transactionalId, transactionV2Enabled, TransactionManager.DEFAULT_INVALID_TRANSITION_ATTEMPT_POLICY);
166+
initializeTransactionManager(transactionalId, transactionV2Enabled, TransactionManager.DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER);
167167
}
168168

169169
private void initializeTransactionManager(Optional<String> transactionalId,
170170
boolean transactionV2Enabled,
171-
TransactionManager.InvalidTransitionAttemptPolicy invalidTransitionAttemptPolicy) {
171+
TransactionManager.InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler) {
172172
Metrics metrics = new Metrics(time);
173173

174174
apiVersions.update("0", new NodeApiVersions(Arrays.asList(
@@ -195,7 +195,7 @@ private void initializeTransactionManager(Optional<String> transactionalId,
195195
finalizedFeaturesEpoch));
196196
finalizedFeaturesEpoch += 1;
197197
this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null),
198-
transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, invalidTransitionAttemptPolicy);
198+
transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, invalidStateTransitionAttemptHandler);
199199

200200
int batchSize = 16 * 1024;
201201
int deliveryTimeoutMs = 3000;
@@ -3805,10 +3805,11 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
38053805

38063806
@Test
38073807
public void testBackgroundInvalidStateTransitionIsFatal() {
3808-
// Usually the policy is the poison the transaction manager only by the Sender thread, but for the
3809-
// test the policy is forced to true to mimic it being called from the Sender.
3810-
TransactionManager.InvalidTransitionAttemptPolicy policy = () -> true;
3811-
initializeTransactionManager(Optional.of(transactionalId), true, policy);
3808+
// The default logic is to poison the transaction manager on an invalid state transition attempt if that
3809+
// attempt happens on the Sender thread. Here the logic is altered to always poison on invalid attempts to
3810+
// mimic as though it's being invoked by the Sender.
3811+
TransactionManager.InvalidStateTransitionAttemptHandler testHandler = () -> true;
3812+
initializeTransactionManager(Optional.of(transactionalId), true, testHandler);
38123813
doInitTransactions();
38133814
assertTrue(transactionManager.isTransactional());
38143815

0 commit comments

Comments
 (0)