Skip to content

Commit e0fa982

Browse files
committed
history: create group listeners and update crd
1 parent f590251 commit e0fa982

File tree

7 files changed

+124
-50
lines changed

7 files changed

+124
-50
lines changed

deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,17 +1118,8 @@ spec:
11181118
description: A Spark cluster history server component. This resource is managed by the Stackable operator for Apache Spark. Find more information on how to use it in the [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/history-server).
11191119
properties:
11201120
clusterConfig:
1121-
default:
1122-
listenerClass: cluster-internal
1121+
default: {}
11231122
description: Global Spark history server configuration that applies to all roles and role groups.
1124-
properties:
1125-
listenerClass:
1126-
default: cluster-internal
1127-
enum:
1128-
- cluster-internal
1129-
- external-unstable
1130-
- external-stable
1131-
type: string
11321123
type: object
11331124
image:
11341125
anyOf:
@@ -1385,6 +1376,9 @@ spec:
13851376
cleaner:
13861377
nullable: true
13871378
type: boolean
1379+
listenerClass:
1380+
nullable: true
1381+
type: string
13881382
logging:
13891383
default:
13901384
containers: {}
@@ -1631,6 +1625,9 @@ spec:
16311625
cleaner:
16321626
nullable: true
16331627
type: boolean
1628+
listenerClass:
1629+
nullable: true
1630+
type: string
16341631
logging:
16351632
default:
16361633
containers: {}

deploy/helm/spark-k8s-operator/templates/roles.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,14 @@ rules:
133133
- bind
134134
resourceNames:
135135
- {{ include "operator.name" . }}-clusterrole
136+
- apiGroups:
137+
- listeners.stackable.tech
138+
resources:
139+
- listeners
140+
verbs:
141+
- get
142+
- list
143+
- watch
144+
- patch
145+
- create
146+
- delete

docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ kind: SparkHistoryServer
1313
metadata:
1414
name: spark-history
1515
spec:
16-
clusterConfig:
17-
listenerClass: external-unstable # <1>
16+
nodes:
17+
config:
18+
listenerClass: external-unstable # <1>
1819
----
1920
<1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`).
2021

rust/operator-binary/src/crd/constants.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ pub const SPARK_ENV_SH_FILE_NAME: &str = "spark-env.sh";
8888
pub const SPARK_CLUSTER_ROLE: &str = "spark-k8s-clusterrole";
8989
pub const SPARK_UID: i64 = 1000;
9090
pub const METRICS_PORT: u16 = 18081;
91+
pub const HISTORY_UI_PORT: u16 = 18080;
9192

9293
pub const LISTENER_VOLUME_NAME: &str = "listener";
9394
pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener";

rust/operator-binary/src/crd/history.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@ use stackable_operator::{
3232
use strum::{Display, EnumIter};
3333

3434
use crate::{
35-
crd::{
36-
affinity::history_affinity, constants::*, listener::SupportedListenerClasses,
37-
logdir::ResolvedLogDir,
38-
},
35+
crd::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir},
3936
history::config::jvm::construct_history_jvm_args,
4037
};
4138

@@ -106,13 +103,17 @@ pub mod versioned {
106103

107104
#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
108105
#[serde(rename_all = "camelCase")]
109-
pub struct SparkHistoryServerClusterConfig {
110-
#[serde(default)]
111-
pub listener_class: SupportedListenerClasses,
112-
}
106+
pub struct SparkHistoryServerClusterConfig {}
113107
}
114108

115109
impl v1alpha1::SparkHistoryServer {
110+
/// The name of the group-listener provided for a specific role-group.
111+
/// History servers will use this group listener so that only one load balancer
112+
/// is needed (per role group).
113+
pub fn group_listener_name(&self, rolegroup: &RoleGroupRef<Self>) -> String {
114+
format!("{}-group", rolegroup.object_name())
115+
}
116+
116117
/// Returns a reference to the role. Raises an error if the role is not defined.
117118
pub fn role(&self) -> &Role<HistoryConfigFragment, GenericRoleConfig, JavaCommonConfig> {
118119
&self.spec.nodes
@@ -343,6 +344,9 @@ pub struct HistoryConfig {
343344
/// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate.
344345
#[fragment_attrs(serde(default))]
345346
pub requested_secret_lifetime: Option<Duration>,
347+
348+
#[serde(default)]
349+
pub listener_class: String,
346350
}
347351

348352
impl HistoryConfig {
@@ -366,6 +370,7 @@ impl HistoryConfig {
366370
logging: product_logging::spec::default_logging(),
367371
affinity: history_affinity(cluster_name),
368372
requested_secret_lifetime: Some(Self::DEFAULT_HISTORY_SECRET_LIFETIME),
373+
listener_class: Some(default_listener_class()),
369374
}
370375
}
371376
}
@@ -402,6 +407,10 @@ impl Configuration for HistoryConfigFragment {
402407
}
403408
}
404409

410+
fn default_listener_class() -> String {
411+
"cluster-internal".to_string()
412+
}
413+
405414
#[cfg(test)]
406415
mod test {
407416
use indoc::indoc;

rust/operator-binary/src/history/history_controller.rs

Lines changed: 84 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ use stackable_operator::{
2121
},
2222
},
2323
cluster_resources::{ClusterResourceApplyStrategy, ClusterResources},
24-
commons::product_image_selection::ResolvedProductImage,
24+
commons::{
25+
listener::{Listener, ListenerPort, ListenerSpec},
26+
product_image_selection::ResolvedProductImage,
27+
},
2528
k8s_openapi::{
2629
DeepMerge,
2730
api::{
@@ -56,7 +59,7 @@ use crate::{
5659
Ctx,
5760
crd::{
5861
constants::{
59-
ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME,
62+
ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, HISTORY_UI_PORT,
6063
JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME,
6164
MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, SECRET_ACCESS_KEY,
6265
SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME,
@@ -76,6 +79,11 @@ use crate::{
7679
#[strum_discriminants(derive(IntoStaticStr))]
7780
#[allow(clippy::enum_variant_names)]
7881
pub enum Error {
82+
#[snafu(display("failed to build object meta data"))]
83+
ObjectMeta {
84+
source: stackable_operator::builder::meta::Error,
85+
},
86+
7987
#[snafu(display("failed to build listener volume"))]
8088
BuildListenerVolume {
8189
source: ListenerOperatorVolumeSourceBuilderError,
@@ -216,9 +224,12 @@ pub enum Error {
216224

217225
#[snafu(display("failed to merge environment config and/or overrides"))]
218226
MergeEnv { source: crate::crd::history::Error },
219-
}
220227

221-
type Result<T, E = Error> = std::result::Result<T, E>;
228+
#[snafu(display("failed to apply group listener"))]
229+
ApplyGroupListener {
230+
source: stackable_operator::cluster_resources::Error,
231+
},
232+
}
222233

223234
impl ReconcilerError for Error {
224235
fn category(&self) -> &'static str {
@@ -229,7 +240,7 @@ impl ReconcilerError for Error {
229240
pub async fn reconcile(
230241
shs: Arc<DeserializeGuard<v1alpha1::SparkHistoryServer>>,
231242
ctx: Arc<Ctx>,
232-
) -> Result<Action> {
243+
) -> Result<Action, Error> {
233244
tracing::info!("Starting reconcile history server");
234245

235246
let shs = shs
@@ -322,6 +333,17 @@ pub async fn reconcile(
322333
.add(client, sts)
323334
.await
324335
.context(ApplyDeploymentSnafu)?;
336+
337+
let rg_group_listener = build_group_listener(
338+
shs,
339+
&resolved_product_image,
340+
&rgr,
341+
merged_config.listener_class.to_string(),
342+
)?;
343+
cluster_resources
344+
.add(client, rg_group_listener)
345+
.await
346+
.context(ApplyGroupListenerSnafu)?;
325347
}
326348

327349
let role_config = &shs.spec.nodes.role_config;
@@ -343,6 +365,50 @@ pub async fn reconcile(
343365
Ok(Action::await_change())
344366
}
345367

368+
#[allow(clippy::result_large_err)]
369+
fn build_group_listener(
370+
shs: &v1alpha1::SparkHistoryServer,
371+
resolved_product_image: &ResolvedProductImage,
372+
rolegroup: &RoleGroupRef<v1alpha1::SparkHistoryServer>,
373+
listener_class: String,
374+
) -> Result<Listener, Error> {
375+
Ok(Listener {
376+
metadata: ObjectMetaBuilder::new()
377+
.name_and_namespace(shs)
378+
.name(shs.group_listener_name(rolegroup))
379+
.ownerreference_from_resource(shs, None, Some(true))
380+
.context(ObjectMissingMetadataForOwnerRefSnafu)?
381+
.with_recommended_labels(labels(
382+
shs,
383+
&resolved_product_image.app_version_label,
384+
&rolegroup.role_group,
385+
))
386+
.context(ObjectMetaSnafu)?
387+
.build(),
388+
spec: ListenerSpec {
389+
class_name: Some(listener_class),
390+
ports: Some(listener_ports()),
391+
..ListenerSpec::default()
392+
},
393+
status: None,
394+
})
395+
}
396+
397+
fn listener_ports() -> Vec<ListenerPort> {
398+
vec![
399+
ListenerPort {
400+
name: "metrics".to_string(),
401+
port: METRICS_PORT.into(),
402+
protocol: Some("TCP".to_string()),
403+
},
404+
ListenerPort {
405+
name: "http".to_string(),
406+
port: HISTORY_UI_PORT.into(),
407+
protocol: Some("TCP".to_string()),
408+
},
409+
]
410+
}
411+
346412
pub fn error_policy(
347413
_obj: Arc<DeserializeGuard<v1alpha1::SparkHistoryServer>>,
348414
error: &Error,
@@ -528,7 +594,7 @@ fn build_stateful_set(
528594
"-c".to_string(),
529595
])
530596
.args(command_args(log_dir))
531-
.add_container_port("http", 18080)
597+
.add_container_port("http", HISTORY_UI_PORT.into())
532598
.add_container_port("metrics", METRICS_PORT.into())
533599
.add_env_vars(merged_env)
534600
.add_volume_mounts(log_dir.volume_mounts())
@@ -544,28 +610,19 @@ fn build_stateful_set(
544610
.build();
545611

546612
// Add listener volume
547-
let listener_class = &shs.spec.cluster_config.listener_class;
548-
let pvcs = if listener_class.discoverable() {
549-
// externally reachable listener endpoints will use persistent volumes
550-
// so that load balancers can hard-code the target addresses
551-
let pvc = ListenerOperatorVolumeSourceBuilder::new(
552-
&ListenerReference::ListenerClass(listener_class.to_string()),
613+
// Listener endpoints for the Webserver role will use persistent volumes
614+
// so that load balancers can hard-code the target addresses. This will
615+
// be the case even when no class is set (and the value defaults to
616+
// cluster-internal) as the address should still be consistent.
617+
let pvcs = Some(vec![
618+
ListenerOperatorVolumeSourceBuilder::new(
619+
&ListenerReference::ListenerClass(merged_config.listener_class.to_string()),
553620
&recommended_labels,
554621
)
555622
.context(BuildListenerVolumeSnafu)?
556623
.build_pvc(LISTENER_VOLUME_NAME.to_string())
557-
.context(BuildListenerVolumeSnafu)?;
558-
Some(vec![pvc])
559-
} else {
560-
// non-reachable endpoints use ephemeral volumes
561-
pb.add_listener_volume_by_listener_class(
562-
LISTENER_VOLUME_NAME,
563-
&listener_class.to_string(),
564-
&recommended_labels,
565-
)
566-
.context(AddVolumeSnafu)?;
567-
None
568-
};
624+
.context(BuildListenerVolumeSnafu)?,
625+
]);
569626

570627
pb.add_container(container);
571628

@@ -642,7 +699,7 @@ fn build_stateful_set(
642699
fn build_history_role_serviceaccount(
643700
shs: &v1alpha1::SparkHistoryServer,
644701
app_version_label: &str,
645-
) -> Result<(ServiceAccount, RoleBinding)> {
702+
) -> Result<(ServiceAccount, RoleBinding), Error> {
646703
let sa = ServiceAccount {
647704
metadata: ObjectMetaBuilder::new()
648705
.name_and_namespace(shs)
@@ -784,11 +841,11 @@ fn build_rolegroup_service(
784841
shs: &v1alpha1::SparkHistoryServer,
785842
app_version_label: &str,
786843
group: &RoleGroupRef<v1alpha1::SparkHistoryServer>,
787-
) -> Result<Service> {
844+
) -> Result<Service, Error> {
788845
let ports = Some(vec![
789846
ServicePort {
790847
name: Some(String::from("http")),
791-
port: 18080,
848+
port: HISTORY_UI_PORT.into(),
792849
..ServicePort::default()
793850
},
794851
ServicePort {

tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ spec:
3636
vectorAggregatorConfigMapName: vector-aggregator-discovery
3737
{% endif %}
3838

39-
clusterConfig:
40-
listenerClass: external-unstable
41-
4239
logFileDirectory:
4340
s3:
4441
prefix: eventlogs/
@@ -48,6 +45,7 @@ spec:
4845
#sparkConf:
4946
nodes:
5047
config:
48+
listenerClass: external-unstable
5149
logging:
5250
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
5351
containers:

0 commit comments

Comments
 (0)