Skip to content

KAFKA-15767: Refactor TransactionManager to avoid use of ThreadLocal #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
Expand Down Expand Up @@ -256,7 +255,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final ProducerMetadata metadata;
private final RecordAccumulator accumulator;
private final Sender sender;
private final Thread ioThread;
private final Sender.SenderThread ioThread;
private final Compression compression;
private final Sensor errors;
private final Time time;
Expand Down Expand Up @@ -454,7 +453,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
this.errors = this.metrics.sensor("errors");
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread = new Sender.SenderThread(ioThreadName, this.sender, true);
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
Expand All @@ -480,7 +479,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
ProducerInterceptors<K, V> interceptors,
Partitioner partitioner,
Time time,
KafkaThread ioThread,
Sender.SenderThread ioThread,
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
this.producerConfig = config;
this.time = time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

Expand Down Expand Up @@ -234,9 +235,6 @@ private boolean hasPendingTransactionalRequests() {
public void run() {
log.debug("Starting Kafka producer I/O thread.");

if (transactionManager != null)
transactionManager.setPoisonStateOnInvalidTransition(true);

// main loop, runs until close is called
while (running) {
try {
Expand Down Expand Up @@ -1072,4 +1070,10 @@ void recordBatchSplit() {
}
}

/**
 * Dedicated thread type for the thread that runs the producer's {@link Sender}.
 *
 * <p>
 * This class adds no behavior on top of {@link KafkaThread}; it exists so that code can recognize that it is
 * executing on the sender's thread by checking
 * {@code Thread.currentThread() instanceof Sender.SenderThread}.
 */
public static class SenderThread extends KafkaThread {

    /**
     * Creates the sender thread.
     *
     * @param name     name given to the thread
     * @param runnable the work to run on the thread
     * @param daemon   whether the thread should be a daemon thread
     */
    public SenderThread(String name, Runnable runnable, boolean daemon) {
        super(name, runnable, daemon);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,58 +120,6 @@ public class TransactionManager {
private final Set<TopicPartition> newPartitionsInTransaction;
private final Set<TopicPartition> pendingPartitionsInTransaction;
private final Set<TopicPartition> partitionsInTransaction;

/**
* During its normal course of operations, the transaction manager transitions through different internal
* states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions
* result from actions on one of the following classes of threads:
*
* <ul>
* <li><em>Application</em> threads that invoke {@link Producer} API calls</li>
* <li><em>{@link Sender}</em> thread operations</li>
* </ul>
*
* When an invalid state transition is detected during execution on an <em>application</em> thread, the
* {@link #currentState} is <em>not updated</em> and an {@link IllegalStateException} is thrown. This gives the
* application the opportunity to fix the issue without permanently poisoning the state of the
* transaction manager. The {@link Producer} API calls that perform a state transition include:
*
* <ul>
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
* <li>{@link Producer#commitTransaction()} calls {@link #beginCommit()}</li>
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
* </li>
* <li>{@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls
* {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
* </li>
* <li>{@link Producer#send(ProducerRecord)} (and its variants) calls
* {@link #maybeAddPartition(TopicPartition)} and
* {@link #maybeTransitionToErrorState(RuntimeException)}
* </li>
* </ul>
*
* <p/>
*
* The {@link Producer} is implemented such that much of its work is delegated to and performed asynchronously on the
* <em>{@link Sender}</em> thread. This includes record batching, network I/O, broker response handlers, etc. If an
* invalid state transition is detected in the <em>{@link Sender}</em> thread, in addition to throwing an
* {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its
* {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover.
*
* <p/>
*
* It's important to prevent possible corruption when the transaction manager has determined that it is in a
* fatal state. Subsequent transaction operations attempted via either the <em>application</em> or the
* <em>{@link Sender}</em> thread should fail. This is achieved when these operations invoke the
* {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated
* transactional guarantees are not violated.
*
* <p/>
*
* See KAFKA-14831 for more detail.
*/
private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition;
private PendingStateTransition pendingTransition;

// This is used by the TxnRequestHandlers to control how long to back off before a given request is retried.
Expand Down Expand Up @@ -265,7 +213,6 @@ public TransactionManager(final LogContext logContext,
this.newPartitionsInTransaction = new HashSet<>();
this.pendingPartitionsInTransaction = new HashSet<>();
this.partitionsInTransaction = new HashSet<>();
this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(() -> false);
this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority));
this.pendingTxnOffsetCommits = new HashMap<>();
this.partitionsWithUnresolvedSequences = new HashMap<>();
Expand All @@ -275,8 +222,61 @@ public TransactionManager(final LogContext logContext,
this.apiVersions = apiVersions;
}

void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
/**
 * Determines whether an invalid state transition should "poison" the transaction manager, which is the case
 * exactly when the calling thread is a {@link Sender.SenderThread}.
 *
 * <p>
 * During its normal course of operations, the transaction manager moves between the internal states defined
 * in {@link State} (i.e. by updating {@link #currentState}). These transitions are triggered from one of two
 * classes of threads:
 *
 * <ul>
 *     <li><em>Application</em> threads that invoke {@link Producer} API calls</li>
 *     <li><em>{@link Sender}</em> thread operations</li>
 * </ul>
 *
 * <p>
 * When an invalid state transition is detected while executing on an <em>application</em> thread, the
 * {@link #currentState} is <em>not updated</em> and an {@link IllegalStateException} is thrown. This gives the
 * application the opportunity to fix the issue without permanently poisoning the state of the transaction
 * manager. The {@link Producer} API calls that perform a state transition include:
 *
 * <ul>
 *     <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
 *     <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
 *     <li>{@link Producer#commitTransaction()} calls {@link #beginCommit()}</li>
 *     <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}</li>
 *     <li>
 *         {@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls
 *         {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
 *     </li>
 *     <li>
 *         {@link Producer#send(ProducerRecord)} (and its variants) calls
 *         {@link #maybeAddPartition(TopicPartition)} and
 *         {@link #maybeTransitionToErrorState(RuntimeException)}
 *     </li>
 * </ul>
 *
 * <p>
 * The {@link Producer} is implemented such that much of its work is delegated to and performed asynchronously
 * on the <em>{@link Sender}</em> thread. This includes record batching, network I/O, broker response handlers,
 * etc. If an invalid state transition is detected on the <em>{@link Sender}</em> thread, in addition to
 * throwing an {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting
 * its {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover.
 *
 * <p>
 * It's important to prevent possible corruption once the transaction manager has determined that it is in a
 * fatal state. Subsequent transaction operations attempted via either the <em>application</em> or the
 * <em>{@link Sender}</em> thread should fail. This is achieved when these operations invoke the
 * {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated
 * transactional guarantees are not violated.
 *
 * <p>
 * See KAFKA-14831 for more detail.
 *
 * @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception,
 *         {@code false} to throw an exception without first changing the state
 */
protected boolean shouldPoisonStateOnInvalidTransition() {
    // Only the Sender's own thread type triggers poisoning; every other caller counts as an application thread.
    final Thread callingThread = Thread.currentThread();
    return callingThread instanceof Sender.SenderThread;
}

public synchronized TransactionalRequestResult initializeTransactions() {
Expand Down Expand Up @@ -1063,7 +1063,7 @@ private void transitionTo(State target, RuntimeException error) {
String message = idString + "Invalid transition attempted from state "
+ currentState.name() + " to state " + target.name();

if (shouldPoisonStateOnInvalidTransition.get()) {
if (shouldPoisonStateOnInvalidTransition()) {
currentState = State.FATAL_ERROR;
lastError = new IllegalStateException(message);
throw lastError;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
Expand Down Expand Up @@ -2542,7 +2541,7 @@ private static class KafkaProducerTestContext<T> {
private final Map<String, Object> configs;
private final Serializer<T> serializer;
private final Partitioner partitioner = mock(Partitioner.class);
private final KafkaThread ioThread = mock(KafkaThread.class);
private final Sender.SenderThread senderThread = mock(Sender.SenderThread.class);
private final List<ProducerInterceptor<T, T>> interceptors = new ArrayList<>();
private ProducerMetadata metadata = mock(ProducerMetadata.class);
private RecordAccumulator accumulator = mock(RecordAccumulator.class);
Expand Down Expand Up @@ -2623,7 +2622,7 @@ public KafkaProducer<T, T> newKafkaProducer() {
interceptors,
partitioner,
time,
ioThread,
senderThread,
Optional.empty()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public class TransactionManagerTest {

private RecordAccumulator accumulator = null;
private Sender sender = null;
private TransactionManager transactionManager = null;
private TestableTransactionManager transactionManager = null;
private Node brokerNode = null;
private long finalizedFeaturesEpoch = 0;

Expand Down Expand Up @@ -188,7 +188,7 @@ private void initializeTransactionManager(Optional<String> transactionalId, bool
.setMinVersionLevel(transactionV2Enabled ? (short) 2 : (short) 1)),
finalizedFeaturesEpoch));
finalizedFeaturesEpoch += 1;
this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null),
this.transactionManager = new TestableTransactionManager(logContext, transactionalId.orElse(null),
transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);

int batchSize = 16 * 1024;
Expand Down Expand Up @@ -1038,7 +1038,7 @@ public void testTransactionManagerDisablesV2() {
.setMaxVersionLevel((short) 1)
.setMinVersionLevel((short) 1)),
0));
this.transactionManager = new TransactionManager(logContext, transactionalId,
this.transactionManager = new TestableTransactionManager(logContext, transactionalId,
transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);

int batchSize = 16 * 1024;
Expand Down Expand Up @@ -3799,10 +3799,11 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t

@Test
public void testBackgroundInvalidStateTransitionIsFatal() {
initializeTransactionManager(Optional.of(transactionalId), true);
doInitTransactions();
assertTrue(transactionManager.isTransactional());

transactionManager.setPoisonStateOnInvalidTransition(true);
transactionManager.setShouldPoisonStateOnInvalidTransitionOverride(true);

// Intentionally perform an operation that will cause an invalid state transition. The detection of this
// will result in a poisoning of the transaction manager for all subsequent transactional operations since
Expand Down Expand Up @@ -4373,4 +4374,31 @@ private void runUntil(Supplier<Boolean> condition) {
ProducerTestUtils.runUntil(sender, condition);
}

/**
 * Test-only {@link TransactionManager} that lets a test force the answer to
 * {@link #shouldPoisonStateOnInvalidTransition()}. Until an override is set, the superclass logic is used
 * unchanged.
 */
private static class TestableTransactionManager extends TransactionManager {

    // Empty means "no override": defer to the superclass decision.
    private Optional<Boolean> shouldPoisonStateOnInvalidTransitionOverride = Optional.empty();

    public TestableTransactionManager(LogContext logContext,
                                      String transactionalId,
                                      int transactionTimeoutMs,
                                      long retryBackoffMs,
                                      ApiVersions apiVersions) {
        super(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions);
    }

    /**
     * Forces {@link #shouldPoisonStateOnInvalidTransition()} to return the given value from now on.
     *
     * @param forcedValue the value to report instead of the superclass result
     */
    private void setShouldPoisonStateOnInvalidTransitionOverride(boolean forcedValue) {
        this.shouldPoisonStateOnInvalidTransitionOverride = Optional.of(forcedValue);
    }

    /**
     * @return the overridden value if one has been set, otherwise the result of the superclass implementation
     */
    @Override
    protected boolean shouldPoisonStateOnInvalidTransition() {
        // An explicit override always wins; without one, fall back to the default (superclass) logic.
        if (shouldPoisonStateOnInvalidTransitionOverride.isPresent()) {
            return shouldPoisonStateOnInvalidTransitionOverride.get();
        }
        return super.shouldPoisonStateOnInvalidTransition();
    }
}
}