diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 64a3af83e6571..4819b8232b9d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -596,14 +596,17 @@ private TransactionManager configureTransactionState(ProducerConfig config,
if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+ final boolean enable2PC = config.getBoolean(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
+
transactionManager = new TransactionManager(
logContext,
transactionalId,
transactionTimeoutMs,
retryBackoffMs,
- apiVersions
+ apiVersions,
+ enable2PC
);
if (transactionManager.isTransactional())
@@ -644,10 +647,43 @@ private TransactionManager configureTransactionState(ProducerConfig config,
* @throws InterruptException if the thread is interrupted while blocked
*/
public void initTransactions() {
+ initTransactions(false);
+ }
+
+ /**
+ * Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
+ * with additional capabilities to keep a previously prepared transaction.
+ * Must be called before any send operations that require a {@code transactionalId}.
+ *
+ * Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to
+ * {@code true}, the producer does not automatically abort existing transactions.
+ * Instead, it enters a recovery mode allowing only finalization of those previously prepared transactions.
+ *
+ * This behavior is especially crucial for 2PC scenarios, where transactions should remain intact
+ * until the external transaction manager decides whether to commit or abort.
+ *
+ * When {@code keepPreparedTxn} is {@code false}, this behaves like the normal transactional
+ * initialization, aborting any unfinished transactions and resetting the producer for
+ * new writes.
+ *
+ * @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC
+ * recovery), false to abort existing transactions and behave like
+ * the standard initTransactions
+ *
+ * @throws IllegalStateException if no {@code transactional.id} is configured
+ * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not
+ * support transactions (i.e. if its version is lower than 0.11.0.0)
+ * @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured
+ * {@code transactional.id} is unauthorized either for normal transaction writes or 2PC.
+ * @throws KafkaException if the producer encounters a fatal error or any other unexpected error
+ * @throws TimeoutException if the time taken for initialize the transaction has surpassed max.block.ms
.
+ * @throws InterruptException if the thread is interrupted while blocked
+ */
+ public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
- TransactionalRequestResult result = transactionManager.initializeTransactions();
+ TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordInit(time.nanoseconds() - now);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index a4aac86df09fc..e3c5a23ca5195 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -142,7 +142,7 @@ public MockProducer() {
}
@Override
- public void initTransactions() {
+ public void initTransactions(boolean keepPreparedTxn) {
verifyNotClosed();
verifyNotFenced();
if (this.transactionInitialized) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 798034dda6de2..a5cd92295ff96 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -42,7 +42,14 @@ public interface Producer extends Closeable {
/**
* See {@link KafkaProducer#initTransactions()}
*/
- void initTransactions();
+ default void initTransactions() {
+ initTransactions(false);
+ }
+
+ /**
+ * See {@link KafkaProducer#initTransactions(boolean)}
+ */
+ void initTransactions(boolean keepPreparedTxn);
/**
* See {@link KafkaProducer#beginTransaction()}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 949c6c167ba8e..362d205e8c1aa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -355,6 +355,11 @@ public class ProducerConfig extends AbstractConfig {
"By default the TransactionId is not configured, which means transactions cannot be used. " +
"Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting transaction.state.log.replication.factor
.";
+ /** transaction.two.phase.commit.enable
*/
+ public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG = "transaction.two.phase.commit.enable";
+ private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If set to true, then the broker is informed that the client is participating in " +
+ "two phase commit protocol and transactions that this client starts never expire.";
+
/**
* security.providers
*/
@@ -526,6 +531,11 @@ public class ProducerConfig extends AbstractConfig {
new ConfigDef.NonEmptyString(),
Importance.LOW,
TRANSACTIONAL_ID_DOC)
+ .define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG,
+ Type.BOOLEAN,
+ false,
+ Importance.LOW,
+ TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC)
.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
@@ -609,6 +619,20 @@ private void postProcessAndValidateIdempotenceConfigs(final Map
if (!idempotenceEnabled && userConfiguredTransactions) {
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
}
+
+ // Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true
+ // In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any
+ // transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator
+ // decides when to finalize, so broker-side timeouts don't apply. Disallow using both.
+ boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
+ boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
+ if (enable2PC && userConfiguredTransactionTimeout) {
+ throw new ConfigException(
+ "Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
+ " when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
+ " is set to true. Transactions will not expire with two-phase commit enabled."
+ );
+ }
}
private static String parseAcks(String acksString) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index c78134c72ecf2..317acc529db5a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -138,6 +138,7 @@ public class TransactionManager {
*
*
* - {@link Producer#initTransactions()} calls {@link #initializeTransactions()}
+ * - {@link Producer#initTransactions(boolean)} calls {@link #initializeTransactions(boolean)}
* - {@link Producer#beginTransaction()} calls {@link #beginTransaction()}
* - {@link Producer#commitTransaction()}} calls {@link #beginCommit()}
* - {@link Producer#abortTransaction()} calls {@link #beginAbort()}
@@ -195,6 +196,7 @@ public class TransactionManager {
private volatile boolean clientSideEpochBumpRequired = false;
private volatile long latestFinalizedFeaturesEpoch = -1;
private volatile boolean isTransactionV2Enabled = false;
+ private final boolean enable2PC;
private enum State {
UNINITIALIZED,
@@ -255,7 +257,8 @@ public TransactionManager(final LogContext logContext,
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
- final ApiVersions apiVersions) {
+ final ApiVersions apiVersions,
+ final boolean enable2PC) {
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.transactionalId = transactionalId;
this.log = logContext.logger(TransactionManager.class);
@@ -273,6 +276,7 @@ public TransactionManager(final LogContext logContext,
this.retryBackoffMs = retryBackoffMs;
this.txnPartitionMap = new TxnPartitionMap(logContext);
this.apiVersions = apiVersions;
+ this.enable2PC = enable2PC;
}
void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
@@ -280,10 +284,21 @@ void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
}
public synchronized TransactionalRequestResult initializeTransactions() {
- return initializeTransactions(ProducerIdAndEpoch.NONE);
+ return initializeTransactions(ProducerIdAndEpoch.NONE, false);
}
synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
+ return initializeTransactions(producerIdAndEpoch, false);
+ }
+
+ public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) {
+ return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn);
+ }
+
+ synchronized TransactionalRequestResult initializeTransactions(
+ ProducerIdAndEpoch producerIdAndEpoch,
+ boolean keepPreparedTxn
+ ) {
maybeFailWithError();
boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
@@ -292,6 +307,9 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
if (!isEpochBump) {
transitionTo(State.INITIALIZING);
log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
+ if (keepPreparedTxn) {
+ log.info("Invoking InitProducerId with keepPreparedTxn set to true for 2PC transactions");
+ }
} else {
log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
}
@@ -299,7 +317,10 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
.setTransactionalId(transactionalId)
.setTransactionTimeoutMs(transactionTimeoutMs)
.setProducerId(producerIdAndEpoch.producerId)
- .setProducerEpoch(producerIdAndEpoch.epoch);
+ .setProducerEpoch(producerIdAndEpoch.epoch)
+ .setEnable2Pc(enable2PC)
+ .setKeepPreparedTxn(keepPreparedTxn);
+
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
isEpochBump);
enqueueRequest(handler);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index fbb3484a03f7f..a87223b331948 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -73,6 +73,7 @@
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.MetadataResponse;
@@ -102,6 +103,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
@@ -1289,7 +1291,7 @@ public void testInitTransactionsResponseAfterTimeout() throws Exception {
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
- Future> future = executor.submit(producer::initTransactions);
+ Future> future = executor.submit(() -> producer.initTransactions());
TestUtils.waitForCondition(client::hasInFlightRequests,
"Timed out while waiting for expected `InitProducerId` request to be sent");
@@ -1364,6 +1366,59 @@ public void testInitTransactionWhileThrottled() {
}
}
+ @ParameterizedTest
+ @CsvSource({
+ "true, false",
+ "true, true",
+ "false, true"
+ })
+ public void testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean keepPreparedTxn, boolean enable2PC) {
+ Map configs = new HashMap<>();
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ if (enable2PC) {
+ configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
+ }
+
+ Time time = new MockTime(1);
+ MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
+ MockClient client = new MockClient(time, metadata);
+ client.updateMetadata(initialUpdateResponse);
+
+ // Capture flags from the InitProducerIdRequest
+ boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, enable2Pc]
+
+ client.prepareResponse(
+ request -> request instanceof FindCoordinatorRequest &&
+ ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
+ FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-txn-id", NODE));
+
+ client.prepareResponse(
+ request -> {
+ if (request instanceof InitProducerIdRequest) {
+ InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
+ requestFlags[0] = initRequest.data().keepPreparedTxn();
+ requestFlags[1] = initRequest.data().enable2Pc();
+ return true;
+ }
+ return false;
+ },
+ initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+ try (Producer producer = kafkaProducer(configs, new StringSerializer(),
+ new StringSerializer(), metadata, client, null, time)) {
+ producer.initTransactions(keepPreparedTxn);
+
+ // Verify request flags match expected values
+ assertEquals(keepPreparedTxn, requestFlags[0],
+ "keepPreparedTxn flag should match input parameter");
+ assertEquals(enable2PC, requestFlags[1],
+ "enable2Pc flag should match producer configuration");
+ }
+ }
+
@Test
public void testClusterAuthorizationFailure() throws Exception {
int maxBlockMs = 500;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index 830711c0e5449..207bac6476fc1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -145,4 +145,27 @@ void testUpperboundCheckOfEnableIdempotence() {
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
assertDoesNotThrow(() -> new ProducerConfig(configs));
}
+
+ @Test
+ void testTwoPhaseCommitIncompatibleWithTransactionTimeout() {
+ Map configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+ configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
+ configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
+ configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
+
+ ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
+ assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
+ assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG));
+
+ // Verify that setting one but not the other is valid
+ configs.remove(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
+ assertDoesNotThrow(() -> new ProducerConfig(configs));
+
+ configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
+ configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, false);
+ assertDoesNotThrow(() -> new ProducerConfig(configs));
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index e67be76eb9baf..eb4eb7df3f0fe 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -490,7 +490,7 @@ public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn()
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -616,7 +616,7 @@ public void testInitProducerIdWithMaxInFlightOne() {
// Initialize transaction manager. InitProducerId will be queued up until metadata response
// is processed and FindCoordinator can be sent to `leastLoadedNode`.
TransactionManager transactionManager = new TransactionManager(new LogContext(), "testInitProducerIdWithPendingMetadataRequest",
- 60000, 100L, new ApiVersions());
+ 60000, 100L, new ApiVersions(), false);
setupWithTransactionState(transactionManager, false, null, false);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, (short) 0);
transactionManager.initializeTransactions();
@@ -668,7 +668,7 @@ public void testNodeNotReady() {
client = new MockClient(time, metadata);
TransactionManager transactionManager = new TransactionManager(new LogContext(), "testNodeNotReady",
- 60000, 100L, new ApiVersions());
+ 60000, 100L, new ApiVersions(), false);
setupWithTransactionState(transactionManager, false, null, true);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, (short) 0);
transactionManager.initializeTransactions();
@@ -1510,7 +1510,7 @@ public void testExpiryOfFirstBatchShouldCauseEpochBumpIfFutureBatchesFail() thro
public void testUnresolvedSequencesAreNotFatal() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -1795,7 +1795,7 @@ public void testCorrectHandlingOfDuplicateSequenceError() throws Exception {
@Test
public void testTransactionalUnknownProducerHandlingWhenRetentionLimitReached() throws Exception {
final long producerId = 343434L;
- TransactionManager transactionManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
+ TransactionManager transactionManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
setupWithTransactionState(transactionManager);
doInitTransactions(transactionManager, new ProducerIdAndEpoch(producerId, (short) 0));
@@ -2352,7 +2352,7 @@ public void testIdempotentSplitBatchAndSend() throws Exception {
public void testTransactionalSplitBatchAndSend() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
- TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2694,7 +2694,7 @@ public void testTransactionalRequestsSentOnShutdown() {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
- TransactionManager txnManager = new TransactionManager(logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions, false);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager);
@@ -2727,7 +2727,7 @@ public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws Except
int lingerMs = 50;
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
- TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions, false);
setupWithTransactionState(txnManager, lingerMs);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
@@ -2784,7 +2784,7 @@ public void testAwaitPendingRecordsBeforeCommittingTransaction() throws Exceptio
try (Metrics m = new Metrics()) {
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
- TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
@@ -2855,7 +2855,7 @@ public void testIncompleteTransactionAbortOnShutdown() {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
- TransactionManager txnManager = new TransactionManager(logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions, false);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager);
@@ -2889,7 +2889,7 @@ public void testForceShutdownWithIncompleteTransaction() {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
- TransactionManager txnManager = new TransactionManager(logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions, false);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager);
@@ -2919,7 +2919,7 @@ public void testForceShutdownWithIncompleteTransaction() {
@Test
public void testTransactionAbortedExceptionOnAbortWithoutError() throws InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
- TransactionManager txnManager = new TransactionManager(logContext, "testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager, false, null);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2945,7 +2945,7 @@ public void testTransactionAbortedExceptionOnAbortWithoutError() throws Interrup
public void testDoNotPollWhenNoRequestSent() {
client = spy(new MockClient(time, metadata));
- TransactionManager txnManager = new TransactionManager(logContext, "testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions, false);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2957,7 +2957,7 @@ public void testDoNotPollWhenNoRequestSent() {
@Test
public void testTooLargeBatchesAreSafelyRemoved() throws InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
- TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager, false, null);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3026,7 +3026,7 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() {
public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3076,7 +3076,7 @@ public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
public void testInvalidTxnStateIsAnAbortableError() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext, "testInvalidTxnState", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "testInvalidTxnState", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3115,7 +3115,7 @@ public void testInvalidTxnStateIsAnAbortableError() throws Exception {
public void testTransactionAbortableExceptionIsAnAbortableError() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext, "textTransactionAbortableException", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "textTransactionAbortableException", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3617,7 +3617,7 @@ private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors e
}
private TransactionManager createTransactionManager() {
- return new TransactionManager(new LogContext(), null, 0, RETRY_BACKOFF_MS, new ApiVersions());
+ return new TransactionManager(new LogContext(), null, 0, RETRY_BACKOFF_MS, new ApiVersions(), false);
}
private void setupWithTransactionState(TransactionManager transactionManager) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 8b4decfb9598c..8a9bdd41ba8b5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -159,17 +159,28 @@ public void setup() {
this.client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, singletonMap("test", 2)));
this.brokerNode = new Node(0, "localhost", 2211);
- initializeTransactionManager(Optional.of(transactionalId), false);
+ initializeTransactionManager(Optional.of(transactionalId), false, false);
+ }
+
+ private void initializeTransactionManager(
+ Optional transactionalId,
+ boolean transactionV2Enabled
+ ) {
+ initializeTransactionManager(transactionalId, transactionV2Enabled, false);
}
- private void initializeTransactionManager(Optional transactionalId, boolean transactionV2Enabled) {
+ private void initializeTransactionManager(
+ Optional transactionalId,
+ boolean transactionV2Enabled,
+ boolean enable2pc
+ ) {
Metrics metrics = new Metrics(time);
apiVersions.update("0", new NodeApiVersions(Arrays.asList(
new ApiVersion()
.setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
.setMinVersion((short) 0)
- .setMaxVersion((short) 3),
+ .setMaxVersion((short) 6),
new ApiVersion()
.setApiKey(ApiKeys.PRODUCE.id)
.setMinVersion((short) 0)
@@ -189,7 +200,7 @@ private void initializeTransactionManager(Optional transactionalId, bool
finalizedFeaturesEpoch));
finalizedFeaturesEpoch += 1;
this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null),
- transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
+ transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, enable2pc);
int batchSize = 16 * 1024;
int deliveryTimeoutMs = 3000;
@@ -1039,7 +1050,7 @@ public void testTransactionManagerDisablesV2() {
.setMinVersionLevel((short) 1)),
0));
this.transactionManager = new TransactionManager(logContext, transactionalId,
- transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
+ transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, false);
int batchSize = 16 * 1024;
int deliveryTimeoutMs = 3000;
@@ -4035,16 +4046,39 @@ private void prepareFindCoordinatorResponse(Errors error, boolean shouldDisconne
}, FindCoordinatorResponse.prepareResponse(error, coordinatorKey, brokerNode), shouldDisconnect);
}
- private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) {
+ private void prepareInitPidResponse(
+ Errors error,
+ boolean shouldDisconnect,
+ long producerId,
+ short producerEpoch
+ ) {
+ prepareInitPidResponse(error, shouldDisconnect, producerId, producerEpoch, false, false, (long) -1, (short) -1);
+ }
+
+ private void prepareInitPidResponse(
+ Errors error,
+ boolean shouldDisconnect,
+ long producerId,
+ short producerEpoch,
+ boolean keepPreparedTxn,
+ boolean enable2Pc,
+ long ongoingProducerId,
+ short ongoingProducerEpoch
+ ) {
InitProducerIdResponseData responseData = new InitProducerIdResponseData()
- .setErrorCode(error.code())
- .setProducerEpoch(producerEpoch)
- .setProducerId(producerId)
- .setThrottleTimeMs(0);
+ .setErrorCode(error.code())
+ .setProducerEpoch(producerEpoch)
+ .setProducerId(producerId)
+ .setThrottleTimeMs(0)
+ .setOngoingTxnProducerId(ongoingProducerId)
+ .setOngoingTxnProducerEpoch(ongoingProducerEpoch);
+
client.prepareResponse(body -> {
InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
assertEquals(transactionalId, initProducerIdRequest.data().transactionalId());
assertEquals(transactionTimeoutMs, initProducerIdRequest.data().transactionTimeoutMs());
+ assertEquals(keepPreparedTxn, initProducerIdRequest.data().keepPreparedTxn());
+ assertEquals(enable2Pc, initProducerIdRequest.data().enable2Pc());
return true;
}, new InitProducerIdResponse(responseData), shouldDisconnect);
}
@@ -4373,4 +4407,36 @@ private void runUntil(Supplier condition) {
ProducerTestUtils.runUntil(sender, condition);
}
+ @Test
+ public void testInitializeTransactionsWithKeepPreparedTxn() {
+ initializeTransactionManager(Optional.of(transactionalId), true, true);
+
+ client.prepareResponse(
+ FindCoordinatorResponse.prepareResponse(Errors.NONE, transactionalId, brokerNode)
+ );
+
+ // Simulate an ongoing prepared transaction (ongoingProducerId != -1).
+ long ongoingProducerId = 999L;
+ short ongoingEpoch = 10;
+ short bumpedEpoch = 11;
+
+ prepareInitPidResponse(
+ Errors.NONE,
+ false,
+ ongoingProducerId,
+ bumpedEpoch,
+ true,
+ true,
+ ongoingProducerId,
+ ongoingEpoch
+ );
+
+ transactionManager.initializeTransactions(true);
+ runUntil(transactionManager::hasProducerId);
+
+ assertTrue(transactionManager.hasProducerId());
+ assertFalse(transactionManager.hasOngoingTransaction());
+ assertEquals(ongoingProducerId, transactionManager.producerIdAndEpoch().producerId);
+ assertEquals(bumpedEpoch, transactionManager.producerIdAndEpoch().epoch);
+ }
}