From 474df2e832d6d2de209feba31c3e654abeb2d7fb Mon Sep 17 00:00:00 2001 From: Kirk True Date: Thu, 10 Apr 2025 11:38:28 -0700 Subject: [PATCH 1/8] KAFKA-15767: Refactor TransactionManager to avoid use of ThreadLocal Introduces a concrete subclass of KafkaThread named SenderThread. The poisoning of the TransactionManager can be achieved by looking at the type of the current thread. --- .../kafka/clients/producer/KafkaProducer.java | 7 +++--- .../clients/producer/internals/Sender.java | 10 +++++--- .../internals/TransactionManager.java | 25 +++++++++++++------ .../clients/producer/KafkaProducerTest.java | 5 ++-- .../internals/TransactionManagerTest.java | 15 ++++++++--- 5 files changed, 42 insertions(+), 20 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index aed5d75d70115..6945ded0cd530 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -75,7 +75,6 @@ import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; import org.apache.kafka.common.utils.AppInfoParser; -import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -256,7 +255,7 @@ public class KafkaProducer implements Producer { private final ProducerMetadata metadata; private final RecordAccumulator accumulator; private final Sender sender; - private final Thread ioThread; + private final Sender.SenderThread ioThread; private final Compression compression; private final Sensor errors; private final Time time; @@ -455,7 +454,7 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali this.errors = this.metrics.sensor("errors"); this.sender = newSender(logContext, kafkaClient, this.metadata); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; - this.ioThread = new KafkaThread(ioThreadName, this.sender, true); + this.ioThread = new Sender.SenderThread(ioThreadName, this.sender); this.ioThread.start(); config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); @@ -481,7 +480,7 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali ProducerInterceptors interceptors, Partitioner partitioner, Time time, - KafkaThread ioThread, + Sender.SenderThread ioThread, Optional clientTelemetryReporter) { this.producerConfig = config; this.time = time; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index fe4a97b12295b..0973d07194286 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -53,6 +53,7 @@ import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -240,9 +241,6 @@ private boolean hasPendingTransactionalRequests() { public void run() { log.debug("Starting Kafka producer I/O thread."); - if (transactionManager != null) - transactionManager.setPoisonStateOnInvalidTransition(true); - // main loop, runs until close is called while (running) { try { @@ -1078,4 +1076,10 @@ void recordBatchSplit() { } } + public static class SenderThread extends KafkaThread { + + public SenderThread(final String name, Runnable runnable) { + super(name, runnable, true); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index c78134c72ecf2..2391dc463c8ac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -171,7 +171,13 @@ public class TransactionManager { * * See KAFKA-14831 for more detail. */ - private final ThreadLocal shouldPoisonStateOnInvalidTransition; + public interface InvalidTransitionPolicy { + boolean shouldPoisonState(); + } + + public static final InvalidTransitionPolicy DEFAULT_INVALID_TRANSITION_POLICY = () -> Thread.currentThread() instanceof Sender.SenderThread; + private final InvalidTransitionPolicy invalidTransitionPolicy; + private PendingStateTransition pendingTransition; // This is used by the TxnRequestHandlers to control how long to back off before a given request is retried. @@ -256,6 +262,15 @@ public TransactionManager(final LogContext logContext, final int transactionTimeoutMs, final long retryBackoffMs, final ApiVersions apiVersions) { + this(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions, DEFAULT_INVALID_TRANSITION_POLICY); + } + + public TransactionManager(final LogContext logContext, + final String transactionalId, + final int transactionTimeoutMs, + final long retryBackoffMs, + final ApiVersions apiVersions, + final InvalidTransitionPolicy invalidTransitionPolicy) { this.producerIdAndEpoch = ProducerIdAndEpoch.NONE; this.transactionalId = transactionalId; this.log = logContext.logger(TransactionManager.class); @@ -265,7 +280,6 @@ public TransactionManager(final LogContext logContext, this.newPartitionsInTransaction = new HashSet<>(); this.pendingPartitionsInTransaction = new HashSet<>(); this.partitionsInTransaction = new HashSet<>(); - this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(() -> false); this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority)); this.pendingTxnOffsetCommits = new HashMap<>(); this.partitionsWithUnresolvedSequences = new HashMap<>(); @@ -273,10 +287,7 @@ public TransactionManager(final LogContext logContext, this.retryBackoffMs = retryBackoffMs; this.txnPartitionMap = new TxnPartitionMap(logContext); this.apiVersions = apiVersions; - } - - void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) { - shouldPoisonStateOnInvalidTransition.set(shouldPoisonState); + this.invalidTransitionPolicy = invalidTransitionPolicy; } public synchronized TransactionalRequestResult initializeTransactions() { @@ -1063,7 +1074,7 @@ private void transitionTo(State target, RuntimeException error) { String message = idString + "Invalid transition attempted from state " + currentState.name() + " to state " + target.name(); - if (shouldPoisonStateOnInvalidTransition.get()) { + if (invalidTransitionPolicy.shouldPoisonState()) { currentState = State.FATAL_ERROR; lastError = new IllegalStateException(message); throw lastError; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index fbb3484a03f7f..12782f5b1242d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -85,7 +85,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; -import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -2542,7 +2541,7 @@ private static class KafkaProducerTestContext { private final Map configs; private final Serializer serializer; private final Partitioner partitioner = mock(Partitioner.class); - private final KafkaThread ioThread = mock(KafkaThread.class); + private final Sender.SenderThread senderThread = mock(Sender.SenderThread.class); private final List> interceptors = new ArrayList<>(); private ProducerMetadata metadata = mock(ProducerMetadata.class); private RecordAccumulator accumulator = mock(RecordAccumulator.class); @@ -2623,7 +2622,7 @@ public KafkaProducer newKafkaProducer() { interceptors, partitioner, time, - ioThread, + senderThread, Optional.empty() ); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 0d582bf80168d..42947cf07de2e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -108,6 +108,7 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -163,6 +164,12 @@ public void setup() { } private void initializeTransactionManager(Optional transactionalId, boolean transactionV2Enabled) { + initializeTransactionManager(transactionalId, transactionV2Enabled, TransactionManager.DEFAULT_INVALID_TRANSITION_POLICY); + } + + private void initializeTransactionManager(Optional transactionalId, + boolean transactionV2Enabled, + TransactionManager.InvalidTransitionPolicy invalidTransitionPolicy) { Metrics metrics = new Metrics(time); apiVersions.update("0", new NodeApiVersions(Arrays.asList( @@ -189,7 +196,7 @@ private void initializeTransactionManager(Optional transactionalId, bool finalizedFeaturesEpoch)); finalizedFeaturesEpoch += 1; this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null), - transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions); + transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, invalidTransitionPolicy); int batchSize = 16 * 1024; int deliveryTimeoutMs = 3000; @@ -3799,11 +3806,13 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t @Test public void testBackgroundInvalidStateTransitionIsFatal() { + // Usually the policy is the poison the transaction manager only by the Sender thread, but for the + // test the policy is forced to true to mimic it being called from the Sender. + TransactionManager.InvalidTransitionPolicy policy = () -> true; + initializeTransactionManager(Optional.of(transactionalId), true, policy); doInitTransactions(); assertTrue(transactionManager.isTransactional()); - transactionManager.setPoisonStateOnInvalidTransition(true); - // Intentionally perform an operation that will cause an invalid state transition. The detection of this // will result in a poisoning of the transaction manager for all subsequent transactional operations since // it was performed in the background. From 3ede47b978a2304a2fcb9a7700a83433dc279d64 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Thu, 10 Apr 2025 11:39:08 -0700 Subject: [PATCH 2/8] Remove unnecessary import --- .../kafka/clients/producer/internals/TransactionManagerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 42947cf07de2e..e86a76a25b5b8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -108,7 +108,6 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; From 05c2e59d1d77729fc6bedc6cd1653f87153dce37 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Thu, 10 Apr 2025 14:27:48 -0700 Subject: [PATCH 3/8] Updated naming and added more documentation --- .../internals/TransactionManager.java | 28 +++++++++++++------ .../internals/TransactionManagerTest.java | 8 +++--- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 2391dc463c8ac..f7045683a9f5f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -171,12 +171,24 @@ public class TransactionManager { * * See KAFKA-14831 for more detail. */ - public interface InvalidTransitionPolicy { - boolean shouldPoisonState(); + public interface InvalidTransitionAttemptPolicy { + + /** + * Callback to determine if the transaction manager's {@link #currentState} should be set to + * {@link State#FATAL_ERROR} to prevent possible transaction corruption. + * + * @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception, + * {@code false} to throw an exception without first changing the state + */ + boolean isFatalState(); } - public static final InvalidTransitionPolicy DEFAULT_INVALID_TRANSITION_POLICY = () -> Thread.currentThread() instanceof Sender.SenderThread; - private final InvalidTransitionPolicy invalidTransitionPolicy; + /** + * The default policy is to poison the internal {@link #currentState state} if an invalid state transition is + * attempted on the {@link Sender.SenderThread sender's thread}. + */ + public static final InvalidTransitionAttemptPolicy DEFAULT_INVALID_TRANSITION_ATTEMPT_POLICY = () -> Thread.currentThread() instanceof Sender.SenderThread; + private final InvalidTransitionAttemptPolicy invalidTransitionAttemptPolicy; private PendingStateTransition pendingTransition; @@ -262,7 +274,7 @@ public TransactionManager(final LogContext logContext, final int transactionTimeoutMs, final long retryBackoffMs, final ApiVersions apiVersions) { - this(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions, DEFAULT_INVALID_TRANSITION_POLICY); + this(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions, DEFAULT_INVALID_TRANSITION_ATTEMPT_POLICY); } public TransactionManager(final LogContext logContext, @@ -270,7 +282,7 @@ public TransactionManager(final LogContext logContext, final int transactionTimeoutMs, final long retryBackoffMs, final ApiVersions apiVersions, - final InvalidTransitionPolicy invalidTransitionPolicy) { + final InvalidTransitionAttemptPolicy invalidTransitionAttemptPolicy) { this.producerIdAndEpoch = ProducerIdAndEpoch.NONE; this.transactionalId = transactionalId; this.log = logContext.logger(TransactionManager.class); @@ -287,7 +299,7 @@ public TransactionManager(final LogContext logContext, this.retryBackoffMs = retryBackoffMs; this.txnPartitionMap = new TxnPartitionMap(logContext); this.apiVersions = apiVersions; - this.invalidTransitionPolicy = invalidTransitionPolicy; + this.invalidTransitionAttemptPolicy = invalidTransitionAttemptPolicy; } public synchronized TransactionalRequestResult initializeTransactions() { @@ -1074,7 +1086,7 @@ private void transitionTo(State target, RuntimeException error) { String message = idString + "Invalid transition attempted from state " + currentState.name() + " to state " + target.name(); - if (invalidTransitionPolicy.shouldPoisonState()) { + if (invalidTransitionAttemptPolicy.isFatalState()) { currentState = State.FATAL_ERROR; lastError = new IllegalStateException(message); throw lastError; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index e86a76a25b5b8..6751487fc5386 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -163,12 +163,12 @@ public void setup() { } private void initializeTransactionManager(Optional transactionalId, boolean transactionV2Enabled) { - initializeTransactionManager(transactionalId, transactionV2Enabled, TransactionManager.DEFAULT_INVALID_TRANSITION_POLICY); + initializeTransactionManager(transactionalId, transactionV2Enabled, TransactionManager.DEFAULT_INVALID_TRANSITION_ATTEMPT_POLICY); } private void initializeTransactionManager(Optional transactionalId, boolean transactionV2Enabled, - TransactionManager.InvalidTransitionPolicy invalidTransitionPolicy) { + TransactionManager.InvalidTransitionAttemptPolicy invalidTransitionAttemptPolicy) { Metrics metrics = new Metrics(time); apiVersions.update("0", new NodeApiVersions(Arrays.asList( @@ -195,7 +195,7 @@ private void initializeTransactionManager(Optional transactionalId, finalizedFeaturesEpoch)); finalizedFeaturesEpoch += 1; this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null), - transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, invalidTransitionPolicy); + transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, invalidTransitionAttemptPolicy); int batchSize = 16 * 1024; int deliveryTimeoutMs = 3000; @@ -3807,7 +3807,7 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t public void testBackgroundInvalidStateTransitionIsFatal() { // Usually the policy is the poison the transaction manager only by the Sender thread, but for the // test the policy is forced to true to mimic it being called from the Sender. - TransactionManager.InvalidTransitionPolicy policy = () -> true; + TransactionManager.InvalidTransitionAttemptPolicy policy = () -> true; initializeTransactionManager(Optional.of(transactionalId), true, policy); doInitTransactions(); assertTrue(transactionManager.isTransactional()); From 6604b6815883dd1bff5e7f6c259d5f23552fc5de Mon Sep 17 00:00:00 2001 From: Kirk True Date: Fri, 18 Apr 2025 17:34:49 -0700 Subject: [PATCH 4/8] Minor updates to naming --- .../producer/internals/TransactionManager.java | 16 ++++++++-------- .../internals/TransactionManagerTest.java | 15 ++++++++------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index f7045683a9f5f..a9a05118085af 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -171,7 +171,7 @@ public class TransactionManager { * * See KAFKA-14831 for more detail. */ - public interface InvalidTransitionAttemptPolicy { + public interface InvalidStateTransitionAttemptHandler { /** * Callback to determine if the transaction manager's {@link #currentState} should be set to @@ -184,11 +184,11 @@ public interface InvalidTransitionAttemptPolicy { } /** - * The default policy is to poison the internal {@link #currentState state} if an invalid state transition is + * The default logic is to poison the internal {@link #currentState state} if an invalid state transition is * attempted on the {@link Sender.SenderThread sender's thread}. */ - public static final InvalidTransitionAttemptPolicy DEFAULT_INVALID_TRANSITION_ATTEMPT_POLICY = () -> Thread.currentThread() instanceof Sender.SenderThread; - private final InvalidTransitionAttemptPolicy invalidTransitionAttemptPolicy; + public static final InvalidStateTransitionAttemptHandler DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER = () -> Thread.currentThread() instanceof Sender.SenderThread; + private final InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler; private PendingStateTransition pendingTransition; @@ -274,7 +274,7 @@ public TransactionManager(final LogContext logContext, final int transactionTimeoutMs, final long retryBackoffMs, final ApiVersions apiVersions) { - this(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions, DEFAULT_INVALID_TRANSITION_ATTEMPT_POLICY); + this(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions, DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER); } public TransactionManager(final LogContext logContext, @@ -282,7 +282,7 @@ public TransactionManager(final LogContext logContext, final int transactionTimeoutMs, final long retryBackoffMs, final ApiVersions apiVersions, - final InvalidTransitionAttemptPolicy invalidTransitionAttemptPolicy) { + final InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler) { this.producerIdAndEpoch = ProducerIdAndEpoch.NONE; this.transactionalId = transactionalId; this.log = logContext.logger(TransactionManager.class); @@ -299,7 +299,7 @@ public TransactionManager(final LogContext logContext, this.retryBackoffMs = retryBackoffMs; this.txnPartitionMap = new TxnPartitionMap(logContext); this.apiVersions = apiVersions; - this.invalidTransitionAttemptPolicy = invalidTransitionAttemptPolicy; + this.invalidStateTransitionAttemptHandler = invalidStateTransitionAttemptHandler; } public synchronized TransactionalRequestResult initializeTransactions() { @@ -1086,7 +1086,7 @@ private void transitionTo(State target, RuntimeException error) { String message = idString + "Invalid transition attempted from state " + currentState.name() + " to state " + target.name(); - if (invalidTransitionAttemptPolicy.isFatalState()) { + if (invalidStateTransitionAttemptHandler.isFatalState()) { currentState = State.FATAL_ERROR; lastError = new IllegalStateException(message); throw lastError; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index ca6e7d51321f8..399f1600354ca 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -163,12 +163,12 @@ public void setup() { } private void initializeTransactionManager(Optional transactionalId, boolean transactionV2Enabled) { - initializeTransactionManager(transactionalId, transactionV2Enabled, TransactionManager.DEFAULT_INVALID_TRANSITION_ATTEMPT_POLICY); + initializeTransactionManager(transactionalId, transactionV2Enabled, TransactionManager.DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER); } private void initializeTransactionManager(Optional transactionalId, boolean transactionV2Enabled, - TransactionManager.InvalidTransitionAttemptPolicy invalidTransitionAttemptPolicy) { + TransactionManager.InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler) { Metrics metrics = new Metrics(time); apiVersions.update("0", new NodeApiVersions(Arrays.asList( @@ -195,7 +195,7 @@ private void initializeTransactionManager(Optional transactionalId, finalizedFeaturesEpoch)); finalizedFeaturesEpoch += 1; this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null), - transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, invalidTransitionAttemptPolicy); + transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, invalidStateTransitionAttemptHandler); int batchSize = 16 * 1024; int deliveryTimeoutMs = 3000; @@ -3805,10 +3805,11 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t @Test public void testBackgroundInvalidStateTransitionIsFatal() { - // Usually the policy is the poison the transaction manager only by the Sender thread, but for the - // test the policy is forced to true to mimic it being called from the Sender. - TransactionManager.InvalidTransitionAttemptPolicy policy = () -> true; - initializeTransactionManager(Optional.of(transactionalId), true, policy); + // The default logic is to poison the transaction manager on an invalid state transition attempt if that + // attempt happens on the Sender thread. Here the logic is altered to always poison on invalid attempts to + // mimic as though it's being invoked by the Sender. + TransactionManager.InvalidStateTransitionAttemptHandler testHandler = () -> true; + initializeTransactionManager(Optional.of(transactionalId), true, testHandler); doInitTransactions(); assertTrue(transactionManager.isTransactional()); From 72fd9c7cea3e185d70500d53a6bce023f1cc1058 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Fri, 18 Apr 2025 17:48:16 -0700 Subject: [PATCH 5/8] Update to use subclassing to avoid so much change --- .../internals/TransactionManager.java | 139 ++++++++---------- .../internals/TransactionManagerTest.java | 40 +++-- 2 files changed, 86 insertions(+), 93 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index a9a05118085af..a36eaf67038bc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -120,76 +120,6 @@ public class TransactionManager { private final Set newPartitionsInTransaction; private final Set pendingPartitionsInTransaction; private final Set partitionsInTransaction; - - /** - * During its normal course of operations, the transaction manager transitions through different internal - * states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions - * result from actions on one of the following classes of threads: - * - *
    - *
  • Application threads that invokes {@link Producer} API calls
  • - *
  • {@link Sender} thread operations
  • - *
- * - * When an invalid state transition is detected during execution on an application thread, the - * {@link #currentState} is not updated and an {@link IllegalStateException} is thrown. This gives the - * application the opportunity to fix the issue without permanently poisoning the state of the - * transaction manager. The {@link Producer} API calls that perform a state transition include: - * - *
    - *
  • {@link Producer#initTransactions()} calls {@link #initializeTransactions()}
  • - *
  • {@link Producer#beginTransaction()} calls {@link #beginTransaction()}
  • - *
  • {@link Producer#commitTransaction()}} calls {@link #beginCommit()}
  • - *
  • {@link Producer#abortTransaction()} calls {@link #beginAbort()} - *
  • - *
  • {@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls - * {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} - *
  • - *
  • {@link Producer#send(ProducerRecord)} (and its variants) calls - * {@link #maybeAddPartition(TopicPartition)} and - * {@link #maybeTransitionToErrorState(RuntimeException)} - *
  • - *
- * - *

- * - * The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the - * {@link Sender} thread. This includes record batching, network I/O, broker response handlers, etc. If an - * invalid state transition is detected in the {@link Sender} thread, in addition to throwing an - * {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its - * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover. - * - *

- * - * It's important to prevent possible corruption when the transaction manager has determined that it is in a - * fatal state. Subsequent transaction operations attempted via either the application or the - * {@link Sender} thread should fail. This is achieved when these operations invoke the - * {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated - * transactional guarantees are not violated. - * - *

- * - * See KAFKA-14831 for more detail. - */ - public interface InvalidStateTransitionAttemptHandler { - - /** - * Callback to determine if the transaction manager's {@link #currentState} should be set to - * {@link State#FATAL_ERROR} to prevent possible transaction corruption. - * - * @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception, - * {@code false} to throw an exception without first changing the state - */ - boolean isFatalState(); - } - - /** - * The default logic is to poison the internal {@link #currentState state} if an invalid state transition is - * attempted on the {@link Sender.SenderThread sender's thread}. - */ - public static final InvalidStateTransitionAttemptHandler DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER = () -> Thread.currentThread() instanceof Sender.SenderThread; - private final InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler; - private PendingStateTransition pendingTransition; // This is used by the TxnRequestHandlers to control how long to back off before a given request is retried. @@ -274,15 +204,6 @@ public TransactionManager(final LogContext logContext, final int transactionTimeoutMs, final long retryBackoffMs, final ApiVersions apiVersions) { - this(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions, DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER); - } - - public TransactionManager(final LogContext logContext, - final String transactionalId, - final int transactionTimeoutMs, - final long retryBackoffMs, - final ApiVersions apiVersions, - final InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler) { this.producerIdAndEpoch = ProducerIdAndEpoch.NONE; this.transactionalId = transactionalId; this.log = logContext.logger(TransactionManager.class); @@ -299,7 +220,6 @@ public TransactionManager(final LogContext logContext, this.retryBackoffMs = retryBackoffMs; this.txnPartitionMap = new TxnPartitionMap(logContext); this.apiVersions = apiVersions; - this.invalidStateTransitionAttemptHandler = invalidStateTransitionAttemptHandler; } public synchronized TransactionalRequestResult initializeTransactions() { @@ -1086,7 +1006,7 @@ private void transitionTo(State target, RuntimeException error) { String message = idString + "Invalid transition attempted from state " + currentState.name() + " to state " + target.name(); - if (invalidStateTransitionAttemptHandler.isFatalState()) { + if (shouldSetToFatalErrorState()) { currentState = State.FATAL_ERROR; lastError = new IllegalStateException(message); throw lastError; @@ -1109,6 +1029,63 @@ private void transitionTo(State target, RuntimeException error) { currentState = target; } + /** + * During its normal course of operations, the transaction manager transitions through different internal + * states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions + * result from actions on one of the following classes of threads: + * + *

    + *
  • Application threads that invokes {@link Producer} API calls
  • + *
  • {@link Sender} thread operations
  • + *
+ * + * When an invalid state transition is detected during execution on an application thread, the + * {@link #currentState} is not updated and an {@link IllegalStateException} is thrown. This gives the + * application the opportunity to fix the issue without permanently poisoning the state of the + * transaction manager. The {@link Producer} API calls that perform a state transition include: + * + *
    + *
  • {@link Producer#initTransactions()} calls {@link #initializeTransactions()}
  • + *
  • {@link Producer#beginTransaction()} calls {@link #beginTransaction()}
  • + *
  • {@link Producer#commitTransaction()}} calls {@link #beginCommit()}
  • + *
  • {@link Producer#abortTransaction()} calls {@link #beginAbort()} + *
  • + *
  • {@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls + * {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} + *
  • + *
  • {@link Producer#send(ProducerRecord)} (and its variants) calls + * {@link #maybeAddPartition(TopicPartition)} and + * {@link #maybeTransitionToErrorState(RuntimeException)} + *
  • + *
+ * + *

+ * + * The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the + * {@link Sender} thread. This includes record batching, network I/O, broker response handlers, etc. If an + * invalid state transition is detected in the {@link Sender} thread, in addition to throwing an + * {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its + * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover. + * + *

+ * + * It's important to prevent possible corruption when the transaction manager has determined that it is in a + * fatal state. Subsequent transaction operations attempted via either the application or the + * {@link Sender} thread should fail. This is achieved when these operations invoke the + * {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated + * transactional guarantees are not violated. + * + *

+ * + * See KAFKA-14831 for more detail. + * + * @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception, + * {@code false} to throw an exception without first changing the state + */ + protected boolean shouldSetToFatalErrorState() { + return Thread.currentThread() instanceof Sender.SenderThread; + } + private void ensureTransactional() { if (!isTransactional()) throw new IllegalStateException("Transactional method invoked on a non-transactional producer."); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 399f1600354ca..2e1008d514a85 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -149,7 +149,7 @@ public class TransactionManagerTest { private RecordAccumulator accumulator = null; private Sender sender = null; - private TransactionManager transactionManager = null; + private TestableTransactionManager transactionManager = null; private Node brokerNode = null; private long finalizedFeaturesEpoch = 0; @@ -163,12 +163,6 @@ public void setup() { } private void initializeTransactionManager(Optional transactionalId, boolean transactionV2Enabled) { - initializeTransactionManager(transactionalId, transactionV2Enabled, TransactionManager.DEFAULT_INVALID_STATE_TRANSITION_ATTEMPT_HANDLER); - } - - private void initializeTransactionManager(Optional transactionalId, - boolean transactionV2Enabled, - TransactionManager.InvalidStateTransitionAttemptHandler invalidStateTransitionAttemptHandler) { Metrics metrics = new Metrics(time); apiVersions.update("0", new NodeApiVersions(Arrays.asList( @@ -194,8 +188,8 @@ private void initializeTransactionManager(Optional transactionalId, .setMinVersionLevel(transactionV2Enabled ? (short) 2 : (short) 1)), finalizedFeaturesEpoch)); finalizedFeaturesEpoch += 1; - this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null), - transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, invalidStateTransitionAttemptHandler); + this.transactionManager = new TestableTransactionManager(logContext, transactionalId.orElse(null), + transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions); int batchSize = 16 * 1024; int deliveryTimeoutMs = 3000; @@ -1044,7 +1038,7 @@ public void testTransactionManagerDisablesV2() { .setMaxVersionLevel((short) 1) .setMinVersionLevel((short) 1)), 0)); - this.transactionManager = new TransactionManager(logContext, transactionalId, + this.transactionManager = new TestableTransactionManager(logContext, transactionalId, transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions); int batchSize = 16 * 1024; @@ -3808,8 +3802,8 @@ public void testBackgroundInvalidStateTransitionIsFatal() { // The default logic is to poison the transaction manager on an invalid state transition attempt if that // attempt happens on the Sender thread. Here the logic is altered to always poison on invalid attempts to // mimic as though it's being invoked by the Sender. - TransactionManager.InvalidStateTransitionAttemptHandler testHandler = () -> true; - initializeTransactionManager(Optional.of(transactionalId), true, testHandler); + initializeTransactionManager(Optional.of(transactionalId), true); + transactionManager.setShouldSetToFatalErrorStateOverride(true); doInitTransactions(); assertTrue(transactionManager.isTransactional()); @@ -4382,4 +4376,26 @@ private void runUntil(Supplier condition) { ProducerTestUtils.runUntil(sender, condition); } + private static class TestableTransactionManager extends TransactionManager { + + private Optional shouldSetToFatalErrorStateOverride; + + public TestableTransactionManager(LogContext logContext, + String transactionalId, + int transactionTimeoutMs, + long retryBackoffMs, + ApiVersions apiVersions) { + super(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions); + this.shouldSetToFatalErrorStateOverride = Optional.empty(); + } + + private void setShouldSetToFatalErrorStateOverride(boolean override) { + shouldSetToFatalErrorStateOverride = Optional.of(override); + } + + @Override + protected boolean shouldSetToFatalErrorState() { + return shouldSetToFatalErrorStateOverride.orElseGet(super::shouldSetToFatalErrorState); + } + } } From aa4817f1b2bbe49573a3444154bbe6534c663825 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Fri, 18 Apr 2025 18:03:53 -0700 Subject: [PATCH 6/8] Attempt to minimize diffs --- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../clients/producer/internals/Sender.java | 4 +- .../internals/TransactionManager.java | 116 +++++++++--------- .../internals/TransactionManagerTest.java | 10 +- 4 files changed, 66 insertions(+), 66 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index e9c3f927aaf0b..38dd7ad8abb0b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -453,7 +453,7 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali this.errors = this.metrics.sensor("errors"); this.sender = newSender(logContext, kafkaClient, this.metadata); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; - this.ioThread = new Sender.SenderThread(ioThreadName, this.sender); + this.ioThread = new Sender.SenderThread(ioThreadName, this.sender, true); this.ioThread.start(); config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 1a8ddf3d60df9..6739facfc3415 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -1072,8 +1072,8 @@ void recordBatchSplit() { public static class SenderThread extends KafkaThread { - public SenderThread(final String name, Runnable runnable) { - super(name, runnable, true); + public SenderThread(final String name, Runnable runnable, boolean daemon) { + super(name, runnable, daemon); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index a36eaf67038bc..b52d5d4836d6c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -222,6 +222,63 @@ public TransactionManager(final LogContext logContext, this.apiVersions = apiVersions; } + /** + * During its normal course of operations, the transaction manager transitions through different internal + * states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions + * result from actions on one of the following classes of threads: + * + *

    + *
  • Application threads that invokes {@link Producer} API calls
  • + *
  • {@link Sender} thread operations
  • + *
+ * + * When an invalid state transition is detected during execution on an application thread, the + * {@link #currentState} is not updated and an {@link IllegalStateException} is thrown. This gives the + * application the opportunity to fix the issue without permanently poisoning the state of the + * transaction manager. The {@link Producer} API calls that perform a state transition include: + * + *
    + *
  • {@link Producer#initTransactions()} calls {@link #initializeTransactions()}
  • + *
  • {@link Producer#beginTransaction()} calls {@link #beginTransaction()}
  • + *
  • {@link Producer#commitTransaction()}} calls {@link #beginCommit()}
  • + *
  • {@link Producer#abortTransaction()} calls {@link #beginAbort()} + *
  • + *
  • {@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls + * {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} + *
  • + *
  • {@link Producer#send(ProducerRecord)} (and its variants) calls + * {@link #maybeAddPartition(TopicPartition)} and + * {@link #maybeTransitionToErrorState(RuntimeException)} + *
  • + *
+ * + *

+ * + * The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the + * {@link Sender} thread. This includes record batching, network I/O, broker response handlers, etc. If an + * invalid state transition is detected in the {@link Sender} thread, in addition to throwing an + * {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its + * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover. + * + *

+ * + * It's important to prevent possible corruption when the transaction manager has determined that it is in a + * fatal state. Subsequent transaction operations attempted via either the application or the + * {@link Sender} thread should fail. This is achieved when these operations invoke the + * {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated + * transactional guarantees are not violated. + * + *

+ * + * See KAFKA-14831 for more detail. + * + * @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception, + * {@code false} to throw an exception without first changing the state + */ + protected boolean shouldPoisonStateOnInvalidTransition() { + return Thread.currentThread() instanceof Sender.SenderThread; + } + public synchronized TransactionalRequestResult initializeTransactions() { return initializeTransactions(ProducerIdAndEpoch.NONE); } @@ -1006,7 +1063,7 @@ private void transitionTo(State target, RuntimeException error) { String message = idString + "Invalid transition attempted from state " + currentState.name() + " to state " + target.name(); - if (shouldSetToFatalErrorState()) { + if (shouldPoisonStateOnInvalidTransition()) { currentState = State.FATAL_ERROR; lastError = new IllegalStateException(message); throw lastError; @@ -1029,63 +1086,6 @@ private void transitionTo(State target, RuntimeException error) { currentState = target; } - /** - * During its normal course of operations, the transaction manager transitions through different internal - * states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions - * result from actions on one of the following classes of threads: - * - *

    - *
  • Application threads that invokes {@link Producer} API calls
  • - *
  • {@link Sender} thread operations
  • - *
- * - * When an invalid state transition is detected during execution on an application thread, the - * {@link #currentState} is not updated and an {@link IllegalStateException} is thrown. This gives the - * application the opportunity to fix the issue without permanently poisoning the state of the - * transaction manager. The {@link Producer} API calls that perform a state transition include: - * - *
    - *
  • {@link Producer#initTransactions()} calls {@link #initializeTransactions()}
  • - *
  • {@link Producer#beginTransaction()} calls {@link #beginTransaction()}
  • - *
  • {@link Producer#commitTransaction()}} calls {@link #beginCommit()}
  • - *
  • {@link Producer#abortTransaction()} calls {@link #beginAbort()} - *
  • - *
  • {@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls - * {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} - *
  • - *
  • {@link Producer#send(ProducerRecord)} (and its variants) calls - * {@link #maybeAddPartition(TopicPartition)} and - * {@link #maybeTransitionToErrorState(RuntimeException)} - *
  • - *
- * - *

- * - * The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the - * {@link Sender} thread. This includes record batching, network I/O, broker response handlers, etc. If an - * invalid state transition is detected in the {@link Sender} thread, in addition to throwing an - * {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its - * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover. - * - *

- * - * It's important to prevent possible corruption when the transaction manager has determined that it is in a - * fatal state. Subsequent transaction operations attempted via either the application or the - * {@link Sender} thread should fail. This is achieved when these operations invoke the - * {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated - * transactional guarantees are not violated. - * - *

- * - * See KAFKA-14831 for more detail. - * - * @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception, - * {@code false} to throw an exception without first changing the state - */ - protected boolean shouldSetToFatalErrorState() { - return Thread.currentThread() instanceof Sender.SenderThread; - } - private void ensureTransactional() { if (!isTransactional()) throw new IllegalStateException("Transactional method invoked on a non-transactional producer."); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 2e1008d514a85..42787d600742e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -4378,7 +4378,7 @@ private void runUntil(Supplier condition) { private static class TestableTransactionManager extends TransactionManager { - private Optional shouldSetToFatalErrorStateOverride; + private Optional shouldPoisonStateOnInvalidTransitionOverride; public TestableTransactionManager(LogContext logContext, String transactionalId, @@ -4386,16 +4386,16 @@ public TestableTransactionManager(LogContext logContext, long retryBackoffMs, ApiVersions apiVersions) { super(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions); - this.shouldSetToFatalErrorStateOverride = Optional.empty(); + this.shouldPoisonStateOnInvalidTransitionOverride = Optional.empty(); } private void setShouldSetToFatalErrorStateOverride(boolean override) { - shouldSetToFatalErrorStateOverride = Optional.of(override); + shouldPoisonStateOnInvalidTransitionOverride = Optional.of(override); } @Override - protected boolean shouldSetToFatalErrorState() { - return shouldSetToFatalErrorStateOverride.orElseGet(super::shouldSetToFatalErrorState); + protected boolean shouldPoisonStateOnInvalidTransition() { + return shouldPoisonStateOnInvalidTransitionOverride.orElseGet(super::shouldPoisonStateOnInvalidTransition); } } } From d1558b4d3fa193a65e02615a430891af48c1083e Mon Sep 17 00:00:00 2001 From: Kirk True Date: Fri, 18 Apr 2025 18:07:52 -0700 Subject: [PATCH 7/8] More clean up --- .../producer/internals/TransactionManagerTest.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 42787d600742e..a6d866d77af9d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -3799,14 +3799,12 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t @Test public void testBackgroundInvalidStateTransitionIsFatal() { - // The default logic is to poison the transaction manager on an invalid state transition attempt if that - // attempt happens on the Sender thread. Here the logic is altered to always poison on invalid attempts to - // mimic as though it's being invoked by the Sender. initializeTransactionManager(Optional.of(transactionalId), true); - transactionManager.setShouldSetToFatalErrorStateOverride(true); doInitTransactions(); assertTrue(transactionManager.isTransactional()); + transactionManager.setShouldSetToFatalErrorStateOverride(true); + // Intentionally perform an operation that will cause an invalid state transition. The detection of this // will result in a poisoning of the transaction manager for all subsequent transactional operations since // it was performed in the background. @@ -4376,6 +4374,10 @@ private void runUntil(Supplier condition) { ProducerTestUtils.runUntil(sender, condition); } + /** + * This subclass exists only to optionally change the default behavior related to poisoning the state + * on invalid state transition attempts. + */ private static class TestableTransactionManager extends TransactionManager { private Optional shouldPoisonStateOnInvalidTransitionOverride; @@ -4395,6 +4397,7 @@ private void setShouldSetToFatalErrorStateOverride(boolean override) { @Override protected boolean shouldPoisonStateOnInvalidTransition() { + // If there's an override, use it, otherwise invoke the default (i.e. super class) logic. return shouldPoisonStateOnInvalidTransitionOverride.orElseGet(super::shouldPoisonStateOnInvalidTransition); } } From a6357726c913353627361da0824eb129fb5a4737 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Fri, 18 Apr 2025 18:09:50 -0700 Subject: [PATCH 8/8] Minor change in test method name --- .../clients/producer/internals/TransactionManagerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index a6d866d77af9d..628f7c8570e2b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -3803,7 +3803,7 @@ public void testBackgroundInvalidStateTransitionIsFatal() { doInitTransactions(); assertTrue(transactionManager.isTransactional()); - transactionManager.setShouldSetToFatalErrorStateOverride(true); + transactionManager.setShouldPoisonStateOnInvalidTransitionOverride(true); // Intentionally perform an operation that will cause an invalid state transition. The detection of this // will result in a poisoning of the transaction manager for all subsequent transactional operations since @@ -4391,7 +4391,7 @@ public TestableTransactionManager(LogContext logContext, this.shouldPoisonStateOnInvalidTransitionOverride = Optional.empty(); } - private void setShouldSetToFatalErrorStateOverride(boolean override) { + private void setShouldPoisonStateOnInvalidTransitionOverride(boolean override) { shouldPoisonStateOnInvalidTransitionOverride = Optional.of(override); }