Skip to content

Commit 6e4e0df

Browse files
authored
KAFKA-18891: Add KIP-877 support to RemoteLogMetadataManager and RemoteStorageManager (apache#19286)
1. Remove `RemoteLogManager#startup` and `RemoteLogManager#onEndpointCreated` 2. Move endpoint creation to `BrokerServer` 3. Move `RemoteLogMetadataManager#configure` and `RemoteLogStorageManager#configure` to RemoteLogManager constructor Reviewers: Mickael Maison <[email protected]>, Ken Huang <[email protected]>, Jhen-Yung Hsu <[email protected]>
1 parent 2cd733c commit 6e4e0df

File tree

10 files changed

+261
-178
lines changed

10 files changed

+261
-178
lines changed

build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -902,6 +902,7 @@ project(':server') {
902902
implementation project(':transaction-coordinator')
903903
implementation project(':raft')
904904
implementation project(':share-coordinator')
905+
implementation project(':storage:storage-api')
905906
implementation libs.jacksonDatabind
906907
implementation libs.metrics
907908
implementation libs.slf4jApi
@@ -913,6 +914,7 @@ project(':server') {
913914
testImplementation testLog4j2Libs
914915
testImplementation project(':test-common:test-common-internal-api')
915916
testImplementation project(':test-common:test-common-runtime')
917+
testImplementation project(':storage:storage-api').sourceSets.test.output
916918

917919
testRuntimeOnly runtimeTestLibs
918920
}

core/src/main/scala/kafka/server/BrokerServer.scala

+21-23
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,6 @@ class BrokerServer(
220220
brokerTopicStats,
221221
logDirFailureChannel)
222222

223-
remoteLogManagerOpt = createRemoteLogManager()
224-
225223
lifecycleManager = new BrokerLifecycleManager(config,
226224
time,
227225
s"broker-${config.nodeId}-",
@@ -280,6 +278,8 @@ class BrokerServer(
280278
withWildcardHostnamesResolved().
281279
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
282280

281+
remoteLogManagerOpt = createRemoteLogManager(listenerInfo)
282+
283283
alterPartitionManager = AlterPartitionManager(
284284
config,
285285
scheduler = kafkaScheduler,
@@ -471,23 +471,6 @@ class BrokerServer(
471471
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
472472
config.numIoThreads, "RequestHandlerAvgIdlePercent")
473473

474-
// Start RemoteLogManager before initializing broker metadata publishers.
475-
remoteLogManagerOpt.foreach { rlm =>
476-
val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
477-
if (listenerName != null) {
478-
val endpoint = listenerInfo.listeners().values().stream
479-
.filter(e =>
480-
e.listenerName().isPresent &&
481-
ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName))
482-
)
483-
.findFirst()
484-
.orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
485-
listenerName, "Should be set as a listener name within valid broker listener name list: " + listenerInfo.listeners().values()))
486-
rlm.onEndPointCreated(endpoint)
487-
}
488-
rlm.startup()
489-
}
490-
491474
metadataPublishers.add(new MetadataVersionConfigValidator(config, sharedServer.metadataPublishingFaultHandler))
492475
brokerMetadataPublisher = new BrokerMetadataPublisher(config,
493476
metadataCache,
@@ -712,16 +695,31 @@ class BrokerServer(
712695
}
713696
}
714697

715-
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
716-
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
717-
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
698+
protected def createRemoteLogManager(listenerInfo: ListenerInfo): Option[RemoteLogManager] = {
699+
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled) {
700+
val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
701+
val endpoint = if (listenerName != null) {
702+
Some(listenerInfo.listeners().values().stream
703+
.filter(e =>
704+
e.listenerName().isPresent &&
705+
ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName))
706+
)
707+
.findFirst()
708+
.orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
709+
listenerName, "Should be set as a listener name within valid broker listener name list: " + listenerInfo.listeners().values())))
710+
} else {
711+
None
712+
}
713+
714+
val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
718715
(tp: TopicPartition) => logManager.getLog(tp).toJava,
719716
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
720717
logManager.getLog(tp).foreach { log =>
721718
log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset)
722719
}
723720
},
724-
brokerTopicStats, metrics))
721+
brokerTopicStats, metrics, endpoint.toJava)
722+
Some(rlm)
725723
} else {
726724
None
727725
}

core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -2236,7 +2236,8 @@ class UnifiedLogTest {
22362236
_ => Optional.empty[UnifiedLog](),
22372237
(_, _) => {},
22382238
brokerTopicStats,
2239-
new Metrics()))
2239+
new Metrics(),
2240+
Optional.empty))
22402241
remoteLogManager.setDelayedOperationPurgatory(purgatory)
22412242

22422243
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1,
@@ -2333,7 +2334,8 @@ class UnifiedLogTest {
23332334
_ => Optional.empty[UnifiedLog](),
23342335
(_, _) => {},
23352336
brokerTopicStats,
2336-
new Metrics()))
2337+
new Metrics(),
2338+
Optional.empty))
23372339
remoteLogManager.setDelayedOperationPurgatory(purgatory)
23382340

23392341
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1,

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -3792,8 +3792,8 @@ class ReplicaManagerTest {
37923792
_ => Optional.of(mockLog),
37933793
(TopicPartition, Long) => {},
37943794
brokerTopicStats,
3795-
metrics)
3796-
remoteLogManager.startup()
3795+
metrics,
3796+
Optional.empty)
37973797
val spyRLM = spy(remoteLogManager)
37983798

37993799
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM))
@@ -3903,8 +3903,8 @@ class ReplicaManagerTest {
39033903
_ => Optional.of(dummyLog),
39043904
(TopicPartition, Long) => {},
39053905
brokerTopicStats,
3906-
metrics)
3907-
remoteLogManager.startup()
3906+
metrics,
3907+
Optional.empty)
39083908
val spyRLM = spy(remoteLogManager)
39093909
val timer = new MockTimer(time)
39103910

server/src/test/java/org/apache/kafka/server/MonitorablePluginsIntegrationTest.java

+57-14
Original file line numberDiff line numberDiff line change
@@ -27,43 +27,41 @@
2727
import org.apache.kafka.common.test.api.ClusterTest;
2828
import org.apache.kafka.common.test.api.Type;
2929
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
30+
import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
31+
import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
3032

3133
import java.util.LinkedHashMap;
3234
import java.util.Map;
3335

3436
import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG;
3537
import static org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
38+
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
39+
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
40+
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
3641
import static org.junit.jupiter.api.Assertions.assertEquals;
3742

3843
public class MonitorablePluginsIntegrationTest {
39-
4044
private static int controllerId(Type type) {
4145
return type == Type.KRAFT ? 3000 : 0;
4246
}
4347

44-
private static Map<String, String> expectedTags(String config, String clazz) {
45-
return expectedTags(config, clazz, Map.of());
46-
}
47-
48-
private static Map<String, String> expectedTags(String config, String clazz, Map<String, String> extraTags) {
49-
Map<String, String> tags = new LinkedHashMap<>();
50-
tags.put("config", config);
51-
tags.put("class", clazz);
52-
tags.putAll(extraTags);
53-
return tags;
54-
}
55-
5648
@ClusterTest(
5749
types = {Type.KRAFT, Type.CO_KRAFT},
5850
serverProperties = {
5951
@ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
6052
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"),
61-
@ClusterConfigProperty(key = REPLICA_SELECTOR_CLASS_CONFIG, value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableReplicaSelector")
53+
@ClusterConfigProperty(key = REPLICA_SELECTOR_CLASS_CONFIG, value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableReplicaSelector"),
54+
@ClusterConfigProperty(key = REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, value = "true"),
55+
@ClusterConfigProperty(key = REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
56+
value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableNoOpRemoteLogMetadataManager"),
57+
@ClusterConfigProperty(key = REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
58+
value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableNoOpRemoteStorageManager")
6259
}
6360
)
6461
public void testMonitorableServerPlugins(ClusterInstance clusterInstance) {
6562
assertAuthorizerMetrics(clusterInstance);
6663
assertReplicaSelectorMetrics(clusterInstance);
64+
assertRemoteLogManagerMetrics(clusterInstance);
6765
}
6866

6967
private void assertAuthorizerMetrics(ClusterInstance clusterInstance) {
@@ -78,6 +76,17 @@ private void assertAuthorizerMetrics(ClusterInstance clusterInstance) {
7876
expectedTags(AUTHORIZER_CLASS_NAME_CONFIG, "StandardAuthorizer", Map.of("role", "controller")));
7977
}
8078

79+
private void assertRemoteLogManagerMetrics(ClusterInstance clusterInstance) {
80+
assertMetrics(
81+
clusterInstance.brokers().get(0).metrics(),
82+
MonitorableNoOpRemoteLogMetadataManager.METRICS_COUNT,
83+
expectedTags(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, MonitorableNoOpRemoteLogMetadataManager.class.getSimpleName()));
84+
assertMetrics(
85+
clusterInstance.brokers().get(0).metrics(),
86+
MonitorableNoOpRemoteStorageManager.METRICS_COUNT,
87+
expectedTags(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, MonitorableNoOpRemoteStorageManager.class.getSimpleName()));
88+
}
89+
8190
private void assertReplicaSelectorMetrics(ClusterInstance clusterInstance) {
8291
assertMetrics(
8392
clusterInstance.brokers().get(0).metrics(),
@@ -98,6 +107,17 @@ private void assertMetrics(Metrics metrics, int expected, Map<String, String> ex
98107
assertEquals(expected, found);
99108
}
100109

110+
public static class MonitorableNoOpRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager implements Monitorable {
111+
112+
private static final int METRICS_COUNT = 1;
113+
114+
@Override
115+
public void withPluginMetrics(PluginMetrics metrics) {
116+
MetricName name = metrics.metricName("name", "description", Map.of());
117+
metrics.addMetric(name, (Measurable) (config, now) -> 123);
118+
}
119+
}
120+
101121
public static class MonitorableReplicaSelector extends RackAwareReplicaSelector implements Monitorable {
102122

103123
private static final int METRICS_COUNT = 1;
@@ -108,4 +128,27 @@ public void withPluginMetrics(PluginMetrics metrics) {
108128
metrics.addMetric(name, (Measurable) (config, now) -> 123);
109129
}
110130
}
131+
132+
public static class MonitorableNoOpRemoteStorageManager extends NoOpRemoteStorageManager implements Monitorable {
133+
134+
private static final int METRICS_COUNT = 1;
135+
136+
@Override
137+
public void withPluginMetrics(PluginMetrics metrics) {
138+
MetricName name = metrics.metricName("name", "description", Map.of());
139+
metrics.addMetric(name, (Measurable) (config, now) -> 123);
140+
}
141+
}
142+
143+
private static Map<String, String> expectedTags(String config, String clazz) {
144+
return expectedTags(config, clazz, Map.of());
145+
}
146+
147+
private static Map<String, String> expectedTags(String config, String clazz, Map<String, String> extraTags) {
148+
Map<String, String> tags = new LinkedHashMap<>();
149+
tags.put("config", config);
150+
tags.put("class", clazz);
151+
tags.putAll(extraTags);
152+
return tags;
153+
}
111154
}

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java

+4
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@
4747
* "cluster.id", "broker.id" and all other properties prefixed with the config: "remote.log.metadata.manager.impl.prefix"
4848
* (default value is "rlmm.config.") are passed when {@link #configure(Map)} is invoked on this instance.
4949
* <p>
50+
*
51+
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the manager to register metrics.
52+
* The following tags are automatically added to all metrics registered: <code>config</code> set to
53+
* <code>remote.log.metadata.manager.class.name</code>, and <code>class</code> set to the RemoteLogMetadataManager class name.
5054
*/
5155
public interface RemoteLogMetadataManager extends Configurable, Closeable {
5256

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
* <p>
4040
* All properties prefixed with the config: "remote.log.storage.manager.impl.prefix"
4141
* (default value is "rsm.config.") are passed when {@link #configure(Map)} is invoked on this instance.
42+
*
43+
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the manager to register metrics.
44+
* The following tags are automatically added to all metrics registered: <code>config</code> set to
45+
* <code>remote.log.storage.manager.class.name</code>, and <code>class</code> set to the RemoteStorageManager class name.
4246
*/
4347
public interface RemoteStorageManager extends Configurable, Closeable {
4448

0 commit comments

Comments
 (0)