Commit c8a135f

Merge branch 'trunk' into KAFKA-15767-remove-thread-local-from-transaction-manager
2 parents 05c2e59 + 6e4e0df commit c8a135f

File tree: 360 files changed (+12648 / -8650 lines)


.asf.yaml

Lines changed: 2 additions & 2 deletions
@@ -29,16 +29,16 @@ notifications:
 # Read more here: https://github.com/apache/infrastructure-asfyaml
 github:
   collaborators:
-    - apoorvmittal10
     - brandboat
-    - frankvicky
     - FrankYang0529
     - gongxuanzhang
     - m1a2st
     - mingyen066
+    - ShivsundarR
     - smjn
     - TaiJuWu
     - xijiu
+    - Yunyung
   enabled_merge_buttons:
     squash: true
     squash_commit_message: PR_TITLE_AND_DESC

README.md

Lines changed: 14 additions & 0 deletions
@@ -99,6 +99,8 @@ fail due to code changes. You can just run:
 
     ./gradlew processMessages processTestMessages
 
+See [Apache Kafka Message Definitions](clients/src/main/resources/common/message/README.md) for details on Apache Kafka message protocol.
+
 ### Running a Kafka broker
 
 Using compiled files:
@@ -111,6 +113,8 @@ Using docker image:
 
     docker run -p 9092:9092 apache/kafka:latest
 
+See [docker/README.md](docker/README.md) for detailed information.
+
 ### Cleaning the build ###
     ./gradlew clean
 
@@ -263,10 +267,20 @@ default. See https://www.lightbend.com/blog/scala-inliner-optimizer for more det
 
 See [tests/README.md](tests/README.md).
 
+### Using Trogdor for testing ###
+
+We use Trogdor as a test framework for Apache Kafka. You can use it to run benchmarks and other workloads.
+
+See [trogdor/README.md](trogdor/README.md).
+
 ### Running in Vagrant ###
 
 See [vagrant/README.md](vagrant/README.md).
 
+### Kafka client examples ###
+
+See [examples/README.md](examples/README.md).
+
 ### Contribution ###
 
 Apache Kafka is interested in building the community; we would welcome any thoughts or [patches](https://issues.apache.org/jira/browse/KAFKA). You can reach us [on the Apache mailing lists](http://kafka.apache.org/contact.html).

bin/kafka-run-class.sh

Lines changed: 1 addition & 1 deletion
@@ -225,7 +225,7 @@ if [ -z "$KAFKA_LOG4J_OPTS" ]; then
   (( WINDOWS_OS_FORMAT )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
   KAFKA_LOG4J_OPTS="-Dlog4j2.configurationFile=${LOG4J_DIR}"
 else
-  if echo "$KAFKA_LOG4J_OPTS" | grep -E "log4j\.[^[:space:]]+(\.properties|\.xml)$"; then
+  if echo "$KAFKA_LOG4J_OPTS" | grep -E "log4j\.[^[:space:]]+(\.properties|\.xml)$" >/dev/null; then
     # Enable Log4j 1.x configuration compatibility mode for Log4j 2
     export LOG4J_COMPATIBILITY=true
     echo DEPRECATED: A Log4j 1.x configuration file has been detected, which is no longer recommended. >&2

build.gradle

Lines changed: 5 additions & 1 deletion
@@ -902,6 +902,7 @@ project(':server') {
     implementation project(':transaction-coordinator')
     implementation project(':raft')
     implementation project(':share-coordinator')
+    implementation project(':storage:storage-api')
     implementation libs.jacksonDatabind
     implementation libs.metrics
     implementation libs.slf4jApi
@@ -913,6 +914,7 @@ project(':server') {
     testImplementation testLog4j2Libs
     testImplementation project(':test-common:test-common-internal-api')
     testImplementation project(':test-common:test-common-runtime')
+    testImplementation project(':storage:storage-api').sourceSets.test.output
 
     testRuntimeOnly runtimeTestLibs
   }
@@ -2242,6 +2244,7 @@ project(':storage') {
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':core')
     testImplementation project(':core').sourceSets.test.output
+    testImplementation project(':storage:storage-api').sourceSets.test.output
     testImplementation project(':test-common:test-common-internal-api')
     testImplementation project(':test-common:test-common-runtime')
     testImplementation project(':test-common:test-common-util')
@@ -2415,6 +2418,7 @@ project(':tools') {
     implementation project(':group-coordinator')
     implementation project(':coordinator-common')
     implementation project(':share-coordinator')
+    implementation project(':raft')
     implementation libs.argparse4j
     implementation libs.jacksonDatabind
     implementation libs.jacksonDataformatCsv
@@ -3737,7 +3741,7 @@ project(':connect:mirror') {
     testImplementation project(':core')
     testImplementation project(':test-common:test-common-runtime')
     testImplementation project(':server')
-    testImplementation project(':server-common').sourceSets.test.output
+    testImplementation project(':server-common')
 
 
     testRuntimeOnly project(':connect:runtime')

checkstyle/import-control-clients-integration-tests.xml

Lines changed: 11 additions & 6 deletions
@@ -19,12 +19,17 @@
 -->
 
 <import-control pkg="org.apache.kafka">
-  <allow pkg="java" />
-  <allow pkg="org.junit" />
+    <allow pkg="java"/>
+    <allow pkg="org.junit"/>
+    <allow pkg="scala" />
 
-  <!-- These are tests, allow whatever -->
-  <allow pkg="org.apache.kafka"/>
-  <allow pkg="org.junit" />
-  <allow pkg="kafka"/>
+    <!-- These are tests, allow whatever -->
+    <allow pkg="org.apache.kafka"/>
+    <allow pkg="org.junit"/>
+    <allow pkg="kafka"/>
+
+    <subpackage name="clients.producer">
+        <allow pkg="org.opentest4j"/>
+    </subpackage>
 
 </import-control>

checkstyle/import-control-core.xml

Lines changed: 0 additions & 14 deletions
@@ -66,20 +66,6 @@
         <allow class="com.fasterxml.jackson.annotation.JsonIgnoreProperties" />
     </subpackage>
 
-    <subpackage name="log.remote">
-        <allow pkg="org.apache.kafka.server.common" />
-        <allow pkg="org.apache.kafka.server.log.remote" />
-        <allow pkg="org.apache.kafka.server.log.remote.quota" />
-        <allow pkg="org.apache.kafka.server.metrics" />
-        <allow pkg="org.apache.kafka.storage.internals" />
-        <allow pkg="org.apache.kafka.storage.log.metrics" />
-        <allow pkg="kafka.log" />
-        <allow pkg="kafka.cluster" />
-        <allow pkg="kafka.server" />
-        <allow pkg="org.mockito" />
-        <allow pkg="org.apache.kafka.test" />
-    </subpackage>
-
     <subpackage name="server">
         <allow pkg="kafka" />
         <allow pkg="org.apache.kafka" />

checkstyle/import-control-metadata.xml

Lines changed: 3 additions & 0 deletions
@@ -176,6 +176,9 @@
         <allow pkg="org.apache.kafka.controller" />
         <allow pkg="org.apache.kafka.metadata" />
         <allow pkg="org.apache.kafka.common.internals" />
+        <allow pkg="org.apache.kafka.common.metrics" />
+        <allow pkg="org.apache.kafka.common.metrics.internals" />
+        <allow pkg="org.apache.kafka.common.metrics.stats" />
     </subpackage>
     <subpackage name="bootstrap">
         <allow pkg="org.apache.kafka.snapshot" />

checkstyle/import-control-server-common.xml

Lines changed: 1 addition & 0 deletions
@@ -130,6 +130,7 @@
     </subpackage>
     <subpackage name="config">
         <allow pkg="org.apache.kafka.server"/>
+        <allow pkg="org.apache.kafka.clients"/>
     </subpackage>
 </subpackage>
 
checkstyle/import-control-server.xml

Lines changed: 3 additions & 0 deletions
@@ -82,8 +82,10 @@
     <allow pkg="org.apache.kafka.raft" />
 
     <subpackage name="server">
+        <allow pkg="javax.crypto" />
         <allow pkg="org.apache.kafka.server" />
         <allow pkg="org.apache.kafka.image" />
+        <allow pkg="org.apache.kafka.network.metrics" />
         <allow pkg="org.apache.kafka.storage.internals.log" />
         <allow pkg="org.apache.kafka.storage.internals.checkpoint" />
         <subpackage name="metrics">
@@ -103,6 +105,7 @@
     <subpackage name="security">
         <allow pkg="org.apache.kafka.common.resource" />
         <allow pkg="org.apache.kafka.network" />
+        <allow pkg="org.apache.kafka.server" />
         <allow pkg="org.apache.kafka.server.authorizer" />
     </subpackage>
 
checkstyle/import-control-storage.xml

Lines changed: 7 additions & 1 deletion
@@ -29,6 +29,7 @@
     <allow pkg="org.junit" />
     <allow pkg="org.hamcrest" />
     <allow pkg="org.mockito" />
+    <allow pkg="org.opentest4j" />
     <allow pkg="java.security" />
     <allow pkg="javax.net.ssl" />
     <allow pkg="javax.security" />
@@ -74,8 +75,12 @@
         </subpackage>
         <subpackage name="storage">
             <allow pkg="com.yammer.metrics.core" />
-            <allow pkg="org.apache.kafka.server.metrics" />
             <allow pkg="org.apache.kafka.common.test" />
+            <allow pkg="org.apache.kafka.server.metrics" />
+            <allow pkg="org.apache.kafka.server.purgatory" />
+            <allow pkg="org.apache.kafka.server.quota" />
+            <allow pkg="org.apache.kafka.server.storage.log" />
+            <allow pkg="org.apache.kafka.server.util" />
         </subpackage>
     </subpackage>
 </subpackage>
@@ -88,6 +93,7 @@
     <allow pkg="com.fasterxml.jackson" />
     <allow pkg="com.yammer.metrics.core" />
     <allow pkg="org.apache.kafka.common" />
+    <allow pkg="org.apache.kafka.config" />
     <allow pkg="org.apache.kafka.server"/>
     <allow pkg="org.apache.kafka.storage.internals"/>
     <allow pkg="org.apache.kafka.storage.log.metrics"/>

checkstyle/import-control.xml

Lines changed: 1 addition & 0 deletions
@@ -295,6 +295,7 @@
     <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.metadata.properties" />
     <allow pkg="org.apache.kafka.network" />
+    <allow pkg="org.apache.kafka.raft" />
     <allow pkg="org.apache.kafka.server.util" />
     <allow pkg="kafka.admin" />
     <allow pkg="kafka.server" />

checkstyle/suppressions.xml

Lines changed: 4 additions & 6 deletions
@@ -37,15 +37,11 @@
 
     <!-- core -->
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder|SharePartition|SharePartitionManager).java"/>
-    <suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling|JavaNCSS" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
-    <suppress checks="MethodLength" files="RemoteLogManager.java"/>
-    <suppress checks="MethodLength" files="RemoteLogManagerConfig.java"/>
-    <suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
     <suppress checks="NPathComplexity" files="TestKitNodes.java"/>
     <suppress checks="JavaNCSS"
-              files="(RemoteLogManagerTest|SharePartitionManagerTest|SharePartitionTest).java"/>
+              files="(SharePartitionManagerTest|SharePartitionTest).java"/>
     <suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>
     <suppress checks="CyclomaticComplexity" files="SharePartition.java"/>
 
@@ -364,7 +360,9 @@
     <suppress checks="ParameterNumber"
              files="(LogAppendInfo|LogLoader|RemoteLogManagerConfig|UnifiedLog).java"/>
     <suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
-              files="(UnifiedLog).java"/>
+              files="(UnifiedLog|RemoteLogManager|RemoteLogManagerTest).java"/>
+    <suppress checks="MethodLength" files="(RemoteLogManager|RemoteLogManagerConfig).java"/>
+    <suppress checks="JavaNCSS" files="RemoteLogManagerTest.java"/>
 
     <!-- benchmarks -->
     <suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
TransactionsWithMaxInFlightOneTest.java (new file, package org.apache.kafka.clients)

Lines changed: 126 additions & 0 deletions

@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.CO_KRAFT},
+    serverProperties = {
+        @ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
+        @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"),
+        @ClusterConfigProperty(key = "log.unclean.leader.election.enable", value = "false"),
+        @ClusterConfigProperty(key = ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
+        @ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
+        @ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200")
+    }
+)
+public class TransactionsWithMaxInFlightOneTest {
+    private static final String TOPIC1 = "topic1";
+    private static final String TOPIC2 = "topic2";
+    private static final String HEADER_KEY = "transactionStatus";
+    private static final byte[] ABORTED_VALUE = "aborted".getBytes();
+    private static final byte[] COMMITTED_VALUE = "committed".getBytes();
+
+    @ClusterTest
+    public void testTransactionalProducerSingleBrokerMaxInFlightOne(ClusterInstance clusterInstance) throws InterruptedException {
+        // We want to test with one broker to verify multiple requests queued on a connection
+        assertEquals(1, clusterInstance.brokers().size());
+
+        clusterInstance.createTopic(TOPIC1, 4, (short) 1);
+        clusterInstance.createTopic(TOPIC2, 4, (short) 1);
+
+        try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
+            ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-producer",
+            ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1
+        ))
+        ) {
+            producer.initTransactions();
+
+            producer.beginTransaction();
+            producer.send(new ProducerRecord<>(TOPIC2, null, "2".getBytes(), "2".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
+            producer.send(new ProducerRecord<>(TOPIC1, null, "4".getBytes(), "4".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
+            producer.flush();
+            producer.abortTransaction();
+
+            producer.beginTransaction();
+            producer.send(new ProducerRecord<>(TOPIC1, null, "1".getBytes(), "1".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
+            producer.send(new ProducerRecord<>(TOPIC2, null, "3".getBytes(), "3".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
+            producer.commitTransaction();
+
+            for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+                ArrayList<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
+                try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
+                    ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name(),
+                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
+                    ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"
+                    )
+                )) {
+                    consumer.subscribe(List.of(TOPIC1, TOPIC2));
+                    TestUtils.waitForCondition(() -> {
+                        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
+                        records.forEach(consumerRecords::add);
+                        return consumerRecords.size() == 2;
+                    }, 15_000, () -> "Consumer with protocol " + groupProtocol.name + " should consume 2 records, but get " + consumerRecords.size());
+                }
+                consumerRecords.forEach(record -> {
+                    Iterator<Header> headers = record.headers().headers(HEADER_KEY).iterator();
+                    assertTrue(headers.hasNext());
+                    Header header = headers.next();
+                    assertArrayEquals(COMMITTED_VALUE, header.value(), "Record does not have the expected header value");
+                });
+            }
+        }
+    }
+}
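The setting the new test exercises is a transactional producer with max.in.flight.requests.per.connection pinned to 1, so that requests queue one at a time on a single broker connection. Below is a minimal standalone sketch, not part of this commit, of that producer configuration using the public KafkaProducer API; the bootstrap address, topic name, and transactional id are placeholder assumptions, whereas the test above obtains its producer from the ClusterInstance test harness instead.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Properties;

public class MaxInFlightOneProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address; any reachable cluster would do.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Transactions require a transactional.id; "sketch-txn" is an arbitrary example value.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "sketch-txn");
        // The setting the new test pins: at most one in-flight request per broker connection.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();

            // Aborted transaction: the record is written but later marked aborted.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("topic1", "k1".getBytes(), "aborted".getBytes()));
            producer.abortTransaction();

            // Committed transaction: only this record is visible to read_committed consumers.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("topic1", "k2".getBytes(), "committed".getBytes()));
            producer.commitTransaction();
        }
    }
}
```

A consumer with isolation.level=read_committed, as configured in the test above, would then return only the record from the committed transaction.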
