Skip to content

Commit a842e0e

Browse files
authored
KAFKA-12866: Avoid root access to Zookeeper (apache#10795)
The broker shouldn't assume create access to the chroot. There are deployement scenarios where the chroot is already created is the only znode which the broker can access. Reviewers: Manikumar Reddy <[email protected]>, Ron Dagostino <[email protected]>
1 parent cfe642e commit a842e0e

File tree

2 files changed

+66
-1
lines changed

2 files changed

+66
-1
lines changed

core/src/main/scala/kafka/zk/KafkaZkClient.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -1954,7 +1954,9 @@ object KafkaZkClient {
19541954
connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, name, zkClientConfig)
19551955
try {
19561956
val chroot = connectString.substring(chrootIndex)
1957-
zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
1957+
if (!zkClientForChrootCreation.pathExists(chroot)) {
1958+
zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
1959+
}
19581960
} finally {
19591961
zkClientForChrootCreation.close()
19601962
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package kafka.zk
18+
19+
import java.security.MessageDigest
20+
import java.util.Base64
21+
22+
import org.apache.kafka.common.security.JaasUtils
23+
import org.apache.kafka.common.utils.Time
24+
import org.apache.zookeeper.ZooDefs
25+
import org.apache.zookeeper.data.{ACL, Id}
26+
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
27+
28+
import scala.collection.Seq
29+
import scala.jdk.CollectionConverters._
30+
31+
class ZkClientAclTest extends ZooKeeperTestHarness {
32+
33+
@BeforeEach
34+
override def setUp(): Unit = {
35+
super.setUp()
36+
}
37+
38+
@AfterEach
39+
override def tearDown(): Unit = {
40+
super.tearDown()
41+
}
42+
43+
@Test
44+
def testChrootExistsAndRootIsLocked(): Unit = {
45+
// chroot is accessible
46+
val chroot = "/chroot"
47+
zkClient.makeSurePersistentPathExists(chroot)
48+
zkClient.setAcl(chroot, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala)
49+
50+
// root is inaccessible
51+
val scheme = "digest"
52+
val id = "test"
53+
val pwd = "12345"
54+
val digest = Base64.getEncoder.encode(MessageDigest.getInstance("SHA1").digest(s"$id:$pwd".getBytes()))
55+
zkClient.currentZooKeeper.addAuthInfo(scheme, digest)
56+
zkClient.setAcl("/", Seq(new ACL(ZooDefs.Perms.ALL, new Id(scheme, s"$id:$digest"))))
57+
58+
// this client won't have access to the root, but the chroot already exists
59+
val chrootClient = KafkaZkClient(zkConnect + chroot, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
60+
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, createChrootIfNecessary = true)
61+
chrootClient.close()
62+
}
63+
}

0 commit comments

Comments
 (0)