Skip to content

Commit aa4817f

Browse files
committed
Attempt to minimize diffs
1 parent 72fd9c7 commit aa4817f

File tree

4 files changed

+66
-66
lines changed

4 files changed

+66
-66
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
453453
this.errors = this.metrics.sensor("errors");
454454
this.sender = newSender(logContext, kafkaClient, this.metadata);
455455
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
456-
this.ioThread = new Sender.SenderThread(ioThreadName, this.sender);
456+
this.ioThread = new Sender.SenderThread(ioThreadName, this.sender, true);
457457
this.ioThread.start();
458458
config.logUnused();
459459
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1072,8 +1072,8 @@ void recordBatchSplit() {
10721072

10731073
public static class SenderThread extends KafkaThread {
10741074

1075-
public SenderThread(final String name, Runnable runnable) {
1076-
super(name, runnable, true);
1075+
public SenderThread(final String name, Runnable runnable, boolean daemon) {
1076+
super(name, runnable, daemon);
10771077
}
10781078
}
10791079
}

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

+58-58
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,63 @@ public TransactionManager(final LogContext logContext,
222222
this.apiVersions = apiVersions;
223223
}
224224

225+
/**
226+
* During its normal course of operations, the transaction manager transitions through different internal
227+
* states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions
228+
* result from actions on one of the following classes of threads:
229+
*
230+
* <ul>
231+
* <li><em>Application</em> threads that invokes {@link Producer} API calls</li>
232+
* <li><em>{@link Sender}</em> thread operations</li>
233+
* </ul>
234+
*
235+
* When an invalid state transition is detected during execution on an <em>application</em> thread, the
236+
* {@link #currentState} is <em>not updated</em> and an {@link IllegalStateException} is thrown. This gives the
237+
* application the opportunity to fix the issue without permanently poisoning the state of the
238+
* transaction manager. The {@link Producer} API calls that perform a state transition include:
239+
*
240+
* <ul>
241+
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
242+
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
243+
* <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li>
244+
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
245+
* </li>
246+
* <li>{@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls
247+
* {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
248+
* </li>
249+
* <li>{@link Producer#send(ProducerRecord)} (and its variants) calls
250+
* {@link #maybeAddPartition(TopicPartition)} and
251+
* {@link #maybeTransitionToErrorState(RuntimeException)}
252+
* </li>
253+
* </ul>
254+
*
255+
* <p/>
256+
*
257+
* The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the
258+
* <em>{@link Sender}</em> thread. This includes record batching, network I/O, broker response handlers, etc. If an
259+
* invalid state transition is detected in the <em>{@link Sender}</em> thread, in addition to throwing an
260+
* {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its
261+
* {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover.
262+
*
263+
* <p/>
264+
*
265+
* It's important to prevent possible corruption when the transaction manager has determined that it is in a
266+
* fatal state. Subsequent transaction operations attempted via either the <em>application</em> or the
267+
* <em>{@link Sender}</em> thread should fail. This is achieved when these operations invoke the
268+
* {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated
269+
* transactional guarantees are not violated.
270+
*
271+
* <p/>
272+
*
273+
* See KAFKA-14831 for more detail.
274+
*
275+
* @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception,
276+
* {@code false} to throw an exception without first changing the state
277+
*/
278+
protected boolean shouldPoisonStateOnInvalidTransition() {
279+
return Thread.currentThread() instanceof Sender.SenderThread;
280+
}
281+
225282
public synchronized TransactionalRequestResult initializeTransactions() {
226283
return initializeTransactions(ProducerIdAndEpoch.NONE);
227284
}
@@ -1006,7 +1063,7 @@ private void transitionTo(State target, RuntimeException error) {
10061063
String message = idString + "Invalid transition attempted from state "
10071064
+ currentState.name() + " to state " + target.name();
10081065

1009-
if (shouldSetToFatalErrorState()) {
1066+
if (shouldPoisonStateOnInvalidTransition()) {
10101067
currentState = State.FATAL_ERROR;
10111068
lastError = new IllegalStateException(message);
10121069
throw lastError;
@@ -1029,63 +1086,6 @@ private void transitionTo(State target, RuntimeException error) {
10291086
currentState = target;
10301087
}
10311088

1032-
/**
1033-
* During its normal course of operations, the transaction manager transitions through different internal
1034-
* states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions
1035-
* result from actions on one of the following classes of threads:
1036-
*
1037-
* <ul>
1038-
* <li><em>Application</em> threads that invokes {@link Producer} API calls</li>
1039-
* <li><em>{@link Sender}</em> thread operations</li>
1040-
* </ul>
1041-
*
1042-
* When an invalid state transition is detected during execution on an <em>application</em> thread, the
1043-
* {@link #currentState} is <em>not updated</em> and an {@link IllegalStateException} is thrown. This gives the
1044-
* application the opportunity to fix the issue without permanently poisoning the state of the
1045-
* transaction manager. The {@link Producer} API calls that perform a state transition include:
1046-
*
1047-
* <ul>
1048-
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
1049-
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
1050-
* <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li>
1051-
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
1052-
* </li>
1053-
* <li>{@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls
1054-
* {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
1055-
* </li>
1056-
* <li>{@link Producer#send(ProducerRecord)} (and its variants) calls
1057-
* {@link #maybeAddPartition(TopicPartition)} and
1058-
* {@link #maybeTransitionToErrorState(RuntimeException)}
1059-
* </li>
1060-
* </ul>
1061-
*
1062-
* <p/>
1063-
*
1064-
* The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the
1065-
* <em>{@link Sender}</em> thread. This includes record batching, network I/O, broker response handlers, etc. If an
1066-
* invalid state transition is detected in the <em>{@link Sender}</em> thread, in addition to throwing an
1067-
* {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its
1068-
* {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover.
1069-
*
1070-
* <p/>
1071-
*
1072-
* It's important to prevent possible corruption when the transaction manager has determined that it is in a
1073-
* fatal state. Subsequent transaction operations attempted via either the <em>application</em> or the
1074-
* <em>{@link Sender}</em> thread should fail. This is achieved when these operations invoke the
1075-
* {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated
1076-
* transactional guarantees are not violated.
1077-
*
1078-
* <p/>
1079-
*
1080-
* See KAFKA-14831 for more detail.
1081-
*
1082-
* @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception,
1083-
* {@code false} to throw an exception without first changing the state
1084-
*/
1085-
protected boolean shouldSetToFatalErrorState() {
1086-
return Thread.currentThread() instanceof Sender.SenderThread;
1087-
}
1088-
10891089
private void ensureTransactional() {
10901090
if (!isTransactional())
10911091
throw new IllegalStateException("Transactional method invoked on a non-transactional producer.");

clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -4378,24 +4378,24 @@ private void runUntil(Supplier<Boolean> condition) {
43784378

43794379
private static class TestableTransactionManager extends TransactionManager {
43804380

4381-
private Optional<Boolean> shouldSetToFatalErrorStateOverride;
4381+
private Optional<Boolean> shouldPoisonStateOnInvalidTransitionOverride;
43824382

43834383
public TestableTransactionManager(LogContext logContext,
43844384
String transactionalId,
43854385
int transactionTimeoutMs,
43864386
long retryBackoffMs,
43874387
ApiVersions apiVersions) {
43884388
super(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions);
4389-
this.shouldSetToFatalErrorStateOverride = Optional.empty();
4389+
this.shouldPoisonStateOnInvalidTransitionOverride = Optional.empty();
43904390
}
43914391

43924392
private void setShouldSetToFatalErrorStateOverride(boolean override) {
4393-
shouldSetToFatalErrorStateOverride = Optional.of(override);
4393+
shouldPoisonStateOnInvalidTransitionOverride = Optional.of(override);
43944394
}
43954395

43964396
@Override
4397-
protected boolean shouldSetToFatalErrorState() {
4398-
return shouldSetToFatalErrorStateOverride.orElseGet(super::shouldSetToFatalErrorState);
4397+
protected boolean shouldPoisonStateOnInvalidTransition() {
4398+
return shouldPoisonStateOnInvalidTransitionOverride.orElseGet(super::shouldPoisonStateOnInvalidTransition);
43994399
}
44004400
}
44014401
}

0 commit comments

Comments
 (0)