Skip to content

Commit 4a08713

Browse files
committed
address comments
1 parent 4618b63 commit 4a08713

File tree

3 files changed

+11
-33
lines changed

3 files changed

+11
-33
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

+8-15
Original file line numberDiff line numberDiff line change
@@ -647,27 +647,20 @@ private TransactionManager configureTransactionState(ProducerConfig config,
647647
* @throws InterruptException if the thread is interrupted while blocked
648648
*/
649649
public void initTransactions() {
650-
throwIfNoTransactionManager();
651-
throwIfProducerClosed();
652-
long now = time.nanoseconds();
653-
TransactionalRequestResult result = transactionManager.initializeTransactions();
654-
sender.wakeup();
655-
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
656-
producerMetrics.recordInit(time.nanoseconds() - now);
657-
transactionManager.maybeUpdateTransactionV2Enabled(true);
650+
initTransactions(false);
658651
}
659652

660653
/**
661654
* Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
662-
* with additional handling for two-phase commit (2PC). Must be called before any send operations
663-
* that require a {@code transactionalId}.
655+
* with additional capabilities to keep a previously prepared transaction.
656+
* Must be called before any send operations that require a {@code transactionalId}.
664657
* <p>
665658
* Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to
666-
* {@code true}, the producer does <em>not</em> automatically abort existing transactions
667-
* in the “prepare” phase. Instead, it enters a recovery mode allowing only finalization
668-
* of those previously prepared transactions. This behavior is crucial for 2PC scenarios,
669-
* where transactions should remain intact until the external transaction manager decides
670-
* whether to commit or abort.
659+
* {@code true}, the producer does <em>not</em> automatically abort existing transactions.
660+
* Instead, it enters a recovery mode allowing only finalization of those previously prepared transactions.
661+
*
662+
* This behavior is especially crucial for 2PC scenarios, where transactions should remain intact
663+
* until the external transaction manager decides whether to commit or abort.
671664
* <p>
672665
* When {@code keepPreparedTxn} is {@code false}, this behaves like the normal transactional
673666
* initialization, aborting any unfinished transactions and resetting the producer for

clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java

-17
Original file line numberDiff line numberDiff line change
@@ -141,23 +141,6 @@ public MockProducer() {
141141
this(Cluster.empty(), false, null, null, null);
142142
}
143143

144-
@Override
145-
public void initTransactions() {
146-
verifyNotClosed();
147-
verifyNotFenced();
148-
if (this.transactionInitialized) {
149-
throw new IllegalStateException("MockProducer has already been initialized for transactions.");
150-
}
151-
if (this.initTransactionException != null) {
152-
throw this.initTransactionException;
153-
}
154-
this.transactionInitialized = true;
155-
this.transactionInFlight = false;
156-
this.transactionCommitted = false;
157-
this.transactionAborted = false;
158-
this.sentOffsets = false;
159-
}
160-
161144
@Override
162145
public void initTransactions(boolean keepPreparedTxn) {
163146
verifyNotClosed();

clients/src/main/java/org/apache/kafka/clients/producer/Producer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ public interface Producer<K, V> extends Closeable {
4242
/**
4343
* See {@link KafkaProducer#initTransactions()}
4444
*/
45-
void initTransactions();
45+
default void initTransactions() {
46+
initTransactions(false);
47+
}
4648

4749
/**
4850
* See {@link KafkaProducer#initTransactions(boolean)}

0 commit comments

Comments
 (0)