diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 64a3af83e6571..4819b8232b9d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -596,14 +596,17 @@ private TransactionManager configureTransactionState(ProducerConfig config, if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) { final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG); + final boolean enable2PC = config.getBoolean(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG); final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); + transactionManager = new TransactionManager( logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, - apiVersions + apiVersions, + enable2PC ); if (transactionManager.isTransactional()) @@ -644,10 +647,43 @@ private TransactionManager configureTransactionState(ProducerConfig config, * @throws InterruptException if the thread is interrupted while blocked */ public void initTransactions() { + initTransactions(false); + } + + /** + * Initialize the transactional state for this producer, similar to {@link #initTransactions()} but + * with additional capabilities to keep a previously prepared transaction. + * Must be called before any send operations that require a {@code transactionalId}. + *

+ * Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to + * {@code true}, the producer does not automatically abort existing transactions. + * Instead, it enters a recovery mode allowing only finalization of those previously prepared transactions. + * + * This behavior is especially crucial for 2PC scenarios, where transactions should remain intact + * until the external transaction manager decides whether to commit or abort. + *

+ * When {@code keepPreparedTxn} is {@code false}, this behaves like the normal transactional + * initialization, aborting any unfinished transactions and resetting the producer for + * new writes. + * + * @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC + * recovery), false to abort existing transactions and behave like + * the standard initTransactions + * + * @throws IllegalStateException if no {@code transactional.id} is configured + * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not + * support transactions (i.e. if its version is lower than 0.11.0.0) + * @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured + * {@code transactional.id} is unauthorized either for normal transaction writes or 2PC. + * @throws KafkaException if the producer encounters a fatal error or any other unexpected error + * @throws TimeoutException if the time taken for initialize the transaction has surpassed max.block.ms. + * @throws InterruptException if the thread is interrupted while blocked + */ + public void initTransactions(boolean keepPreparedTxn) { throwIfNoTransactionManager(); throwIfProducerClosed(); long now = time.nanoseconds(); - TransactionalRequestResult result = transactionManager.initializeTransactions(); + TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn); sender.wakeup(); result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); producerMetrics.recordInit(time.nanoseconds() - now); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index a4aac86df09fc..e3c5a23ca5195 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -142,7 +142,7 @@ public MockProducer() { } @Override - public void initTransactions() { + public void initTransactions(boolean keepPreparedTxn) { verifyNotClosed(); verifyNotFenced(); if (this.transactionInitialized) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 798034dda6de2..a5cd92295ff96 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -42,7 +42,14 @@ public interface Producer extends Closeable { /** * See {@link KafkaProducer#initTransactions()} */ - void initTransactions(); + default void initTransactions() { + initTransactions(false); + } + + /** + * See {@link KafkaProducer#initTransactions(boolean)} + */ + void initTransactions(boolean keepPreparedTxn); /** * See {@link KafkaProducer#beginTransaction()} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 949c6c167ba8e..362d205e8c1aa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -355,6 +355,11 @@ public class ProducerConfig extends AbstractConfig { "By default the TransactionId is not configured, which means transactions cannot be used. " + "Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting transaction.state.log.replication.factor."; + /** transaction.two.phase.commit.enable */ + public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG = "transaction.two.phase.commit.enable"; + private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If set to true, then the broker is informed that the client is participating in " + + "two phase commit protocol and transactions that this client starts never expire."; + /** * security.providers */ @@ -526,6 +531,11 @@ public class ProducerConfig extends AbstractConfig { new ConfigDef.NonEmptyString(), Importance.LOW, TRANSACTIONAL_ID_DOC) + .define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, + Type.BOOLEAN, + false, + Importance.LOW, + TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC) .define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY, @@ -609,6 +619,20 @@ private void postProcessAndValidateIdempotenceConfigs(final Map if (!idempotenceEnabled && userConfiguredTransactions) { throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence."); } + + // Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true + // In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any + // transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator + // decides when to finalize, so broker-side timeouts don't apply. Disallow using both. + boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG); + boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG); + if (enable2PC && userConfiguredTransactionTimeout) { + throw new ConfigException( + "Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG + + " when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG + + " is set to true. Transactions will not expire with two-phase commit enabled." + ); + } } private static String parseAcks(String acksString) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index c78134c72ecf2..317acc529db5a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -138,6 +138,7 @@ public class TransactionManager { * *