Skip to content

KAFKA-19082: [1/4] Add client config for enable2PC and overloaded initProducerId (KIP-939) #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -596,14 +596,17 @@ private TransactionManager configureTransactionState(ProducerConfig config,

if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
final boolean enable2PC = config.getBoolean(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

transactionManager = new TransactionManager(
logContext,
transactionalId,
transactionTimeoutMs,
retryBackoffMs,
apiVersions
apiVersions,
enable2PC
);

if (transactionManager.isTransactional())
Expand Down Expand Up @@ -644,10 +647,43 @@ private TransactionManager configureTransactionState(ProducerConfig config,
* @throws InterruptException if the thread is interrupted while blocked
*/
// Backward-compatible entry point: delegates to the overload with
// keepPreparedTxn = false, i.e. the classic behavior that aborts any
// unfinished transaction from a previous producer session.
public void initTransactions() {
    this.initTransactions(false);
}

/**
* Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
* with additional capabilities to keep a previously prepared transaction.
* Must be called before any send operations that require a {@code transactionalId}.
* <p>
* Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to
* {@code true}, the producer does <em>not</em> automatically abort existing transactions.
* Instead, it enters a recovery mode allowing only finalization of those previously prepared transactions.
*
* This behavior is especially crucial for 2PC scenarios, where transactions should remain intact
* until the external transaction manager decides whether to commit or abort.
* <p>
* When {@code keepPreparedTxn} is {@code false}, this behaves like the normal transactional
* initialization, aborting any unfinished transactions and resetting the producer for
* new writes.
*
* @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC
* recovery), false to abort existing transactions and behave like
* the standard initTransactions
*
* @throws IllegalStateException if no {@code transactional.id} is configured
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not
* support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured
* {@code transactional.id} is unauthorized either for normal transaction writes or 2PC.
* @throws KafkaException if the producer encounters a fatal error or any other unexpected error
* @throws TimeoutException if the time taken to initialize the transaction has surpassed <code>max.block.ms</code>.
* @throws InterruptException if the thread is interrupted while blocked
*/
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordInit(time.nanoseconds() - now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public MockProducer() {
}

@Override
public void initTransactions() {
public void initTransactions(boolean keepPreparedTxn) {
verifyNotClosed();
verifyNotFenced();
if (this.transactionInitialized) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ public interface Producer<K, V> extends Closeable {
/**
* See {@link KafkaProducer#initTransactions()}
*/
void initTransactions();
// Backward-compatible default: existing implementations of this interface keep
// working; the call is forwarded to the new overload with keepPreparedTxn = false.
default void initTransactions() {
initTransactions(false);
}

/**
 * Initializes transactional state, optionally keeping a previously prepared
 * transaction (two-phase commit recovery, KIP-939).
 * See {@link KafkaProducer#initTransactions(boolean)}.
 *
 * @param keepPreparedTxn true to retain a previously prepared transaction,
 *                        false to behave like {@link #initTransactions()}
 */
void initTransactions(boolean keepPreparedTxn);

/**
* See {@link KafkaProducer#beginTransaction()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ public class ProducerConfig extends AbstractConfig {
"By default the TransactionId is not configured, which means transactions cannot be used. " +
"Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting <code>transaction.state.log.replication.factor</code>.";

/**
 * <code>transaction.two.phase.commit.enable</code>
 * <p>
 * Client-side opt-in for participating in an externally coordinated two-phase
 * commit (KIP-939). Per the doc string below, enabling it tells the broker that
 * transactions started by this client never expire.
 */
public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG = "transaction.two.phase.commit.enable";
private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If set to true, then the broker is informed that the client is participating in " +
"two phase commit protocol and transactions that this client starts never expire.";

/**
* <code>security.providers</code>
*/
Expand Down Expand Up @@ -526,6 +531,11 @@ public class ProducerConfig extends AbstractConfig {
new ConfigDef.NonEmptyString(),
Importance.LOW,
TRANSACTIONAL_ID_DOC)
.define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC)
.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
Expand Down Expand Up @@ -609,6 +619,20 @@ private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object>
if (!idempotenceEnabled && userConfiguredTransactions) {
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
}

// Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true
// In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any
// transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator
// decides when to finalize, so broker-side timeouts don't apply. Disallow using both.
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
" is set to true. Transactions will not expire with two-phase commit enabled."
);
}
}

private static String parseAcks(String acksString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class TransactionManager {
*
* <ul>
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
* <li>{@link Producer#initTransactions(boolean)} calls {@link #initializeTransactions(boolean)}</li>
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
* <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li>
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
Expand Down Expand Up @@ -195,6 +196,7 @@ public class TransactionManager {
private volatile boolean clientSideEpochBumpRequired = false;
private volatile long latestFinalizedFeaturesEpoch = -1;
private volatile boolean isTransactionV2Enabled = false;
private final boolean enable2PC;

private enum State {
UNINITIALIZED,
Expand Down Expand Up @@ -255,7 +257,8 @@ public TransactionManager(final LogContext logContext,
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
final ApiVersions apiVersions) {
final ApiVersions apiVersions,
final boolean enable2PC) {
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.transactionalId = transactionalId;
this.log = logContext.logger(TransactionManager.class);
Expand All @@ -273,17 +276,29 @@ public TransactionManager(final LogContext logContext,
this.retryBackoffMs = retryBackoffMs;
this.txnPartitionMap = new TxnPartitionMap(logContext);
this.apiVersions = apiVersions;
this.enable2PC = enable2PC;
}

// Toggles the shouldPoisonStateOnInvalidTransition flag, which controls whether
// an invalid state transition leaves this manager in a poisoned state.
// NOTE(review): package-private — appears to be a test-only hook; confirm with callers.
void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
}

/**
 * Initializes transactional state with neither an epoch bump nor prepared-transaction
 * recovery: the default path backing {@code Producer#initTransactions()}.
 *
 * @return the pending request result the caller awaits
 */
public synchronized TransactionalRequestResult initializeTransactions() {
    // Fix: the block contained two consecutive return statements (a stale call to
    // the old two-arg overload followed by the updated one), leaving the second
    // unreachable. Keep only the updated delegation.
    return initializeTransactions(ProducerIdAndEpoch.NONE, false);
}

// Internal epoch-bump variant: re-initializes using the supplied producer
// ID/epoch (a non-NONE value triggers the epoch-bump path downstream) and never
// keeps a prepared transaction on this path.
synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
return initializeTransactions(producerIdAndEpoch, false);
}

/**
 * Entry point backing {@code Producer#initTransactions(boolean)}: acquires a
 * producer ID without an epoch bump, optionally keeping a previously prepared
 * transaction (two-phase commit recovery, KIP-939).
 *
 * @param keepPreparedTxn true to retain a previously prepared transaction
 *                        instead of aborting it
 */
public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) {
return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn);
}

synchronized TransactionalRequestResult initializeTransactions(
ProducerIdAndEpoch producerIdAndEpoch,
boolean keepPreparedTxn
) {
maybeFailWithError();

boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
Expand All @@ -292,14 +307,20 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
if (!isEpochBump) {
transitionTo(State.INITIALIZING);
log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
if (keepPreparedTxn) {
log.info("Invoking InitProducerId with keepPreparedTxn set to true for 2PC transactions");
}
} else {
log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
}
InitProducerIdRequestData requestData = new InitProducerIdRequestData()
.setTransactionalId(transactionalId)
.setTransactionTimeoutMs(transactionTimeoutMs)
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch);
.setProducerEpoch(producerIdAndEpoch.epoch)
.setEnable2Pc(enable2PC)
.setKeepPreparedTxn(keepPreparedTxn);

InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
isEpochBump);
enqueueRequest(handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.MetadataResponse;
Expand Down Expand Up @@ -102,6 +103,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
Expand Down Expand Up @@ -1289,7 +1291,7 @@ public void testInitTransactionsResponseAfterTimeout() throws Exception {
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));

Future<?> future = executor.submit(producer::initTransactions);
Future<?> future = executor.submit(() -> producer.initTransactions());
TestUtils.waitForCondition(client::hasInFlightRequests,
"Timed out while waiting for expected `InitProducerId` request to be sent");

Expand Down Expand Up @@ -1364,6 +1366,59 @@ public void testInitTransactionWhileThrottled() {
}
}

@ParameterizedTest
@CsvSource({
    "true, false",
    "true, true",
    "false, true"
})
public void testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean keepPreparedTxn, boolean enable2PC) {
    // Build the producer configuration; 2PC is only set when requested so the
    // default (disabled) value is also exercised.
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
    configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
    if (enable2PC) {
        configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
    }

    Time time = new MockTime(1);
    MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
    ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
    MockClient client = new MockClient(time, metadata);
    client.updateMetadata(initialUpdateResponse);

    // Flags observed on the outgoing InitProducerIdRequest:
    // [0] = keepPreparedTxn, [1] = enable2Pc.
    boolean[] observedFlags = new boolean[2];

    client.prepareResponse(
        request -> request instanceof FindCoordinatorRequest &&
            ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
        FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-txn-id", NODE));

    client.prepareResponse(
        request -> {
            if (!(request instanceof InitProducerIdRequest)) {
                return false;
            }
            InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
            observedFlags[0] = initRequest.data().keepPreparedTxn();
            observedFlags[1] = initRequest.data().enable2Pc();
            return true;
        },
        initProducerIdResponse(1L, (short) 5, Errors.NONE));

    try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
            new StringSerializer(), metadata, client, null, time)) {
        producer.initTransactions(keepPreparedTxn);

        // The request must carry exactly the flags implied by the test parameters.
        assertEquals(keepPreparedTxn, observedFlags[0],
            "keepPreparedTxn flag should match input parameter");
        assertEquals(enable2PC, observedFlags[1],
            "enable2Pc flag should match producer configuration");
    }
}

@Test
public void testClusterAuthorizationFailure() throws Exception {
int maxBlockMs = 500;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,27 @@ void testUpperboundCheckOfEnableIdempotence() {
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
assertDoesNotThrow(() -> new ProducerConfig(configs));
}

@Test
void testTwoPhaseCommitIncompatibleWithTransactionTimeout() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
    configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
    configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
    configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);

    // Enabling 2PC together with an explicit transaction timeout must be rejected,
    // and the error message should name both offending configs.
    ConfigException exception = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
    assertTrue(exception.getMessage().contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
    assertTrue(exception.getMessage().contains(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG));

    // 2PC alone is valid once the explicit timeout is removed...
    configs.remove(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
    assertDoesNotThrow(() -> new ProducerConfig(configs));

    // ...and an explicit timeout is valid as long as 2PC stays disabled.
    configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
    configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, false);
    assertDoesNotThrow(() -> new ProducerConfig(configs));
}
}
Loading