From 29d2cd38cfa9964426e326ca063b64763a0edd43 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 23 Apr 2025 12:01:02 +0200 Subject: [PATCH 01/19] add listener.rs and replace usages of CurrentlySupportedListenerClasses --- deploy/helm/spark-k8s-operator/crds/crds.yaml | 25 +--- .../operator-binary/src/connect/controller.rs | 16 +- rust/operator-binary/src/connect/crd.rs | 39 +---- rust/operator-binary/src/connect/server.rs | 39 ++--- rust/operator-binary/src/crd/history.rs | 42 +----- rust/operator-binary/src/crd/listener.rs | 25 ++++ rust/operator-binary/src/crd/mod.rs | 1 + .../src/history/history_controller.rs | 141 +++++++----------- 8 files changed, 107 insertions(+), 221 deletions(-) create mode 100644 rust/operator-binary/src/crd/listener.rs diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index ab67da5b..2cc0d369 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -1124,16 +1124,6 @@ spec: properties: listenerClass: default: cluster-internal - description: |- - This field controls which type of Service the Operator creates for this HistoryServer: - - * cluster-internal: Use a ClusterIP service - - * external-unstable: Use a NodePort service - - * external-stable: Use a LoadBalancer service - - This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. enum: - cluster-internal - external-unstable @@ -1880,21 +1870,12 @@ spec: type: array clusterConfig: default: - listenerClass: external-unstable + listenerClass: cluster-internal description: Global Spark Connect server configuration that applies to all roles. properties: listenerClass: - default: external-unstable - description: |- - This field controls which type of Service the Operator creates for this ConnectServer: - - * cluster-internal: Use a ClusterIP service - - * external-unstable: Use a NodePort service - - * external-stable: Use a LoadBalancer service - - This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. + default: cluster-internal + description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services. 
enum: - cluster-internal - external-unstable diff --git a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs index e1031a17..d1d49959 100644 --- a/rust/operator-binary/src/connect/controller.rs +++ b/rust/operator-binary/src/connect/controller.rs @@ -192,21 +192,9 @@ pub async fn reconcile( .await .context(ApplyRoleBindingSnafu)?; - // Expose connect server to the outside world - let service = server::build_service(scs, &resolved_product_image.app_version_label, None) - .context(BuildServiceSnafu)?; - cluster_resources - .add(client, service.clone()) - .await - .context(ApplyServiceSnafu)?; - // Headless service used by executors connect back to the driver - let service = server::build_service( - scs, - &resolved_product_image.app_version_label, - Some("None".to_string()), - ) - .context(BuildServiceSnafu)?; + let service = server::build_internal_service(scs, &resolved_product_image.app_version_label) + .context(BuildServiceSnafu)?; cluster_resources .add(client, service.clone()) diff --git a/rust/operator-binary/src/connect/crd.rs b/rust/operator-binary/src/connect/crd.rs index a491e952..127bdb40 100644 --- a/rust/operator-binary/src/connect/crd.rs +++ b/rust/operator-binary/src/connect/crd.rs @@ -33,7 +33,7 @@ use stackable_operator::{ use strum::{Display, EnumIter}; use super::common::SparkConnectRole; -use crate::crd::constants::APP_NAME; +use crate::crd::{constants::APP_NAME, listener::SupportedListenerClasses}; pub const CONNECT_CONTROLLER_NAME: &str = "connect"; pub const CONNECT_FULL_CONTROLLER_NAME: &str = concatcp!( @@ -107,19 +107,9 @@ pub mod versioned { #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct SparkConnectServerClusterConfig { - /// This field controls which type of Service the Operator creates for this ConnectServer: - /// - /// * cluster-internal: Use a ClusterIP service - /// - /// * external-unstable: Use a NodePort service - /// - /// * external-stable: Use a LoadBalancer service - /// - /// This is a temporary solution with the goal to keep yaml manifests forward compatible. - /// In the future, this setting will control which ListenerClass - /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. + /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services. 
#[serde(default)] - pub listener_class: CurrentlySupportedListenerClasses, + pub listener_class: SupportedListenerClasses, } #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] @@ -178,29 +168,6 @@ pub mod versioned { } } -// TODO: Temporary solution until listener-operator is finished -#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "PascalCase")] -pub(crate) enum CurrentlySupportedListenerClasses { - #[serde(rename = "cluster-internal")] - ClusterInternal, - #[default] - #[serde(rename = "external-unstable")] - ExternalUnstable, - #[serde(rename = "external-stable")] - ExternalStable, -} - -impl CurrentlySupportedListenerClasses { - pub fn k8s_service_type(&self) -> String { - match self { - CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(), - CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(), - CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(), - } - } -} - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] #[fragment_attrs( diff --git a/rust/operator-binary/src/connect/server.rs b/rust/operator-binary/src/connect/server.rs index 18944430..fb647079 100644 --- a/rust/operator-binary/src/connect/server.rs +++ b/rust/operator-binary/src/connect/server.rs @@ -349,35 +349,13 @@ pub(crate) fn build_deployment( }) } -#[allow(clippy::result_large_err)] -pub(crate) fn build_service( +// This is the headless driver service used for the internal +// communication with the executors as recommended by the Spark docs. +pub(crate) fn build_internal_service( scs: &v1alpha1::SparkConnectServer, app_version_label: &str, - service_cluster_ip: Option, ) -> Result { - let (service_name, service_type, publish_not_ready_addresses) = match service_cluster_ip.clone() - { - Some(_) => ( - // These are the properties of the headless driver service used for the internal - // communication with the executors as recommended by the Spark docs. - // - // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness - // probes. Without it, the driver runs into a deadlock beacuse the Pod cannot become - // "ready" until the Service is "ready" and vice versa. - object_name(&scs.name_any(), SparkConnectRole::Server), - "ClusterIP".to_string(), - Some(true), - ), - None => ( - format!( - "{}-{}", - object_name(&scs.name_any(), SparkConnectRole::Server), - SparkConnectRole::Server - ), - scs.spec.cluster_config.listener_class.k8s_service_type(), - Some(false), - ), - }; + let service_name = object_name(&scs.name_any(), SparkConnectRole::Server); let selector = Labels::role_selector(scs, APP_NAME, &SparkConnectRole::Server.to_string()) .context(LabelBuildSnafu)? @@ -398,8 +376,8 @@ pub(crate) fn build_service( .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) .build(), spec: Some(ServiceSpec { - type_: Some(service_type), - cluster_ip: service_cluster_ip, + type_: Some("ClusterIP".to_string()), + cluster_ip: Some("None".to_string()), ports: Some(vec![ ServicePort { name: Some(String::from(GRPC)), @@ -413,7 +391,10 @@ pub(crate) fn build_service( }, ]), selector: Some(selector), - publish_not_ready_addresses, + // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness + // probes. Without it, the driver runs into a deadlock beacuse the Pod cannot become + // "ready" until the Service is "ready" and vice versa. 
+ publish_not_ready_addresses: Some(true), ..ServiceSpec::default() }), status: None, diff --git a/rust/operator-binary/src/crd/history.rs b/rust/operator-binary/src/crd/history.rs index df01178a..4144acbc 100644 --- a/rust/operator-binary/src/crd/history.rs +++ b/rust/operator-binary/src/crd/history.rs @@ -32,7 +32,10 @@ use stackable_operator::{ use strum::{Display, EnumIter}; use crate::{ - crd::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir}, + crd::{ + affinity::history_affinity, constants::*, listener::SupportedListenerClasses, + logdir::ResolvedLogDir, + }, history::config::jvm::construct_history_jvm_args, }; @@ -62,6 +65,7 @@ pub enum Error { #[versioned(version(name = "v1alpha1"))] pub mod versioned { + /// A Spark cluster history server component. This resource is managed by the Stackable operator /// for Apache Spark. Find more information on how to use it in the /// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/history-server). @@ -103,42 +107,8 @@ pub mod versioned { #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct SparkHistoryServerClusterConfig { - /// This field controls which type of Service the Operator creates for this HistoryServer: - /// - /// * cluster-internal: Use a ClusterIP service - /// - /// * external-unstable: Use a NodePort service - /// - /// * external-stable: Use a LoadBalancer service - /// - /// This is a temporary solution with the goal to keep yaml manifests forward compatible. - /// In the future, this setting will control which ListenerClass - /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. #[serde(default)] - pub listener_class: CurrentlySupportedListenerClasses, - } -} - -// TODO: Temporary solution until listener-operator is finished -#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "PascalCase")] -pub enum CurrentlySupportedListenerClasses { - #[default] - #[serde(rename = "cluster-internal")] - ClusterInternal, - #[serde(rename = "external-unstable")] - ExternalUnstable, - #[serde(rename = "external-stable")] - ExternalStable, -} - -impl CurrentlySupportedListenerClasses { - pub fn k8s_service_type(&self) -> String { - match self { - CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(), - CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(), - CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(), - } + pub listener_class: SupportedListenerClasses, } } diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs new file mode 100644 index 00000000..c4b82b79 --- /dev/null +++ b/rust/operator-binary/src/crd/listener.rs @@ -0,0 +1,25 @@ +use serde::{Deserialize, Serialize}; +use stackable_operator::{ + config::merge::Atomic, + schemars::{self, JsonSchema}, +}; +use strum::Display; + +impl Atomic for SupportedListenerClasses {} + +#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "PascalCase")] +pub enum SupportedListenerClasses { + #[default] + #[serde(rename = "cluster-internal")] + #[strum(serialize = "cluster-internal")] + ClusterInternal, + + #[serde(rename = "external-unstable")] + #[strum(serialize = "external-unstable")] + ExternalUnstable, + + #[serde(rename = "external-stable")] + #[strum(serialize = 
"external-stable")] + ExternalStable, +} diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 340ff5fa..d0d82691 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -55,6 +55,7 @@ use crate::{ pub mod affinity; pub mod constants; pub mod history; +pub mod listener; pub mod logdir; pub mod roles; pub mod tlscerts; diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index 2bc679cb..0fa05ce1 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -268,17 +268,6 @@ pub async fn reconcile( .context(ProductConfigValidationSnafu)? .iter() { - let service = build_service( - shs, - &resolved_product_image.app_version_label, - role_name, - None, - )?; - cluster_resources - .add(client, service) - .await - .context(ApplyServiceSnafu)?; - for (rolegroup_name, rolegroup_config) in role_config.iter() { let rgr = RoleGroupRef { cluster: ObjectRef::from_obj(shs), @@ -290,12 +279,8 @@ pub async fn reconcile( .merged_config(&rgr) .context(FailedToResolveConfigSnafu)?; - let service = build_service( - shs, - &resolved_product_image.app_version_label, - role_name, - Some(&rgr), - )?; + let service = + build_rolegroup_service(shs, &resolved_product_image.app_version_label, &rgr)?; cluster_resources .add(client, service) .await @@ -610,73 +595,6 @@ fn build_stateful_set( }) } -#[allow(clippy::result_large_err)] -fn build_service( - shs: &v1alpha1::SparkHistoryServer, - app_version_label: &str, - role: &str, - group: Option<&RoleGroupRef>, -) -> Result { - let group_name = match group { - Some(rgr) => rgr.role_group.clone(), - None => "global".to_owned(), - }; - - let (service_name, service_type, service_cluster_ip) = match group { - Some(rgr) => ( - rgr.object_name(), - "ClusterIP".to_string(), - Some("None".to_string()), - ), - None => ( - // TODO (@NickLarsenNZ): Explain this unwrap. Either convert to expect, or gracefully handle the error. - format!("{}-{}", shs.metadata.name.as_ref().unwrap(), role), - shs.spec.cluster_config.listener_class.k8s_service_type(), - None, - ), - }; - - let selector = match group { - Some(rgr) => Labels::role_group_selector(shs, APP_NAME, &rgr.role, &rgr.role_group) - .context(LabelBuildSnafu)? - .into(), - None => Labels::role_selector(shs, APP_NAME, role) - .context(LabelBuildSnafu)? - .into(), - }; - - Ok(Service { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(shs) - .name(service_name) - .ownerreference_from_resource(shs, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(labels(shs, app_version_label, &group_name)) - .context(MetadataBuildSnafu)? - .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) - .build(), - spec: Some(ServiceSpec { - type_: Some(service_type), - cluster_ip: service_cluster_ip, - ports: Some(vec![ - ServicePort { - name: Some(String::from("http")), - port: 18080, - ..ServicePort::default() - }, - ServicePort { - name: Some(String::from("metrics")), - port: METRICS_PORT.into(), - ..ServicePort::default() - }, - ]), - selector: Some(selector), - ..ServiceSpec::default() - }), - status: None, - }) -} - // TODO: This function should be replaced with operator-rs build_rbac_resources. 
// See: https://github.com/stackabletech/spark-k8s-operator/issues/499 #[allow(clippy::result_large_err)] @@ -816,3 +734,58 @@ fn cleaner_config( Ok(result) } + +/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup +/// +/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. +#[allow(clippy::result_large_err)] +fn build_rolegroup_service( + shs: &v1alpha1::SparkHistoryServer, + app_version_label: &str, + group: &RoleGroupRef, +) -> Result { + let ports = Some(vec![ + ServicePort { + name: Some(String::from("http")), + port: 18080, + ..ServicePort::default() + }, + ServicePort { + name: Some(String::from("metrics")), + port: METRICS_PORT.into(), + ..ServicePort::default() + }, + ]); + + let metadata = ObjectMetaBuilder::new() + .name_and_namespace(shs) + .name(group.object_name()) + .ownerreference_from_resource(shs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(labels(shs, app_version_label, &group.role_group)) + .context(MetadataBuildSnafu)? + .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) + .build(); + + let selector = Some( + Labels::role_group_selector(shs, APP_NAME, &group.role, &group.role_group) + .context(LabelBuildSnafu)? + .into(), + ); + + let service_spec = ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_string()), + cluster_ip: Some("None".to_string()), + ports, + selector, + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }; + + Ok(Service { + metadata, + spec: Some(service_spec), + status: None, + }) +} From c3b98b24d09b6fdb1d019e6f79003ceca8379856 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 23 Apr 2025 15:18:21 +0200 Subject: [PATCH 02/19] expose history pods via listener classes --- .../pages/usage-guide/listenerclass.adoc | 23 +++---- rust/operator-binary/src/crd/constants.rs | 3 + .../src/history/history_controller.rs | 66 ++++++++++++------- .../06-deploy-history-server.yaml.j2 | 4 ++ 4 files changed, 61 insertions(+), 35 deletions(-) diff --git a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc index 7df222d0..06e19059 100644 --- a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc +++ b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc @@ -1,18 +1,19 @@ -= Service exposition with ListenerClasses += History service exposition with listener classes +:description: Configure the Spark history service exposure with listener classes: cluster-internal, external-unstable, or external-stable. -The Spark operator deploys SparkApplications, and does not offer a UI or other API, so no services are exposed. -However, the operator can also deploy HistoryServers, which do offer a UI and API. -The operator deploys a service called `-historyserver` (where `` is the name of the spark application) through which the HistoryServer can be reached. - -This service can have three different types: `cluster-internal`, `external-unstable` and `external-stable`. -Read more about the types in the xref:concepts:service-exposition.adoc[service exposition] documentation at platform level. - -This is how the ListenerClass is configured: +The operator deploys a xref:listener-operator:listener.adoc[Listener] for each spark history pod. 
+The default is to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.clusterConfig.listenerClass`: [source,yaml] ---- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkHistoryServer +metadata: + name: spark-history spec: clusterConfig: - listenerClass: cluster-internal # <1> + listenerClass: external-unstable # <1> ---- -<1> The default `cluster-internal` setting. +<1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`). + +For the example above, the listener operator creates a service named `spark-history-node-default-0-listener` where `spark-history` is the name of the SparkHistoryServer, `node` is the service role (the only service role available for history servers) and `default` is the role group. diff --git a/rust/operator-binary/src/crd/constants.rs b/rust/operator-binary/src/crd/constants.rs index f25eb254..acbd5909 100644 --- a/rust/operator-binary/src/crd/constants.rs +++ b/rust/operator-binary/src/crd/constants.rs @@ -88,3 +88,6 @@ pub const SPARK_ENV_SH_FILE_NAME: &str = "spark-env.sh"; pub const SPARK_CLUSTER_ROLE: &str = "spark-k8s-clusterrole"; pub const SPARK_UID: i64 = 1000; pub const METRICS_PORT: u16 = 18081; + +pub const LISTENER_VOLUME_NAME: &str = "listener"; +pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener"; diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index 0fa05ce1..3fec9236 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -52,11 +52,12 @@ use crate::{ crd::{ constants::{ ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, - JVM_SECURITY_PROPERTIES_FILE, MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, - SECRET_ACCESS_KEY, SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, - SPARK_ENV_SH_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID, STACKABLE_TRUST_STORE, - VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, - VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, + JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, + MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, SECRET_ACCESS_KEY, + SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, + SPARK_IMAGE_BASE_NAME, SPARK_UID, STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, + VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, + VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, }, history::{self, HistoryConfig, SparkHistoryServerContainer, v1alpha1}, logdir::ResolvedLogDir, @@ -437,12 +438,16 @@ fn build_stateful_set( rolegroupref.object_name() }; - let metadata = ObjectMetaBuilder::new() - .with_recommended_labels(labels( - shs, - &resolved_product_image.app_version_label, - &rolegroupref.role_group, - )) + let recommended_object_labels = labels( + shs, + &resolved_product_image.app_version_label, + rolegroupref.role_group.as_ref(), + ); + let recommended_labels = + Labels::recommended(recommended_object_labels.clone()).context(LabelBuildSnafu)?; + + let pb_metadata = ObjectMetaBuilder::new() + .with_recommended_labels(recommended_object_labels.clone()) .context(MetadataBuildSnafu)? 
.build(); @@ -452,7 +457,7 @@ fn build_stateful_set( .requested_secret_lifetime .context(MissingSecretLifetimeSnafu)?; pb.service_account_name(serviceaccount.name_unchecked()) - .metadata(metadata) + .metadata(pb_metadata) .image_pull_secrets_from_product_image(resolved_product_image) .add_volume( VolumeBuilder::new(VOLUME_MOUNT_NAME_CONFIG) @@ -524,7 +529,22 @@ fn build_stateful_set( .context(AddVolumeMountSnafu)? .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG) .context(AddVolumeMountSnafu)? + .add_volume_mount(LISTENER_VOLUME_NAME, LISTENER_VOLUME_DIR) + .context(AddVolumeMountSnafu)? .build(); + + // Add listener volume + let listener_class = &shs.spec.cluster_config.listener_class; + // all listeners will use ephemeral volumes as they can/should + // be removed when the pods are *terminated* (ephemeral PVCs will + // survive re-starts) + pb.add_listener_volume_by_listener_class( + LISTENER_VOLUME_NAME, + &listener_class.to_string(), + &recommended_labels.clone(), + ) + .context(AddVolumeSnafu)?; + pb.add_container(container); if merged_config.logging.enable_vector_agent { @@ -560,19 +580,17 @@ fn build_stateful_set( pod_template.merge_from(shs.role().config.pod_overrides.clone()); pod_template.merge_from(role_group.config.pod_overrides); + let sts_metadata = ObjectMetaBuilder::new() + .name_and_namespace(shs) + .name(rolegroupref.object_name()) + .ownerreference_from_resource(shs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(recommended_object_labels) + .context(MetadataBuildSnafu)? + .build(); + Ok(StatefulSet { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(shs) - .name(rolegroupref.object_name()) - .ownerreference_from_resource(shs, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(labels( - shs, - &resolved_product_image.app_version_label, - rolegroupref.role_group.as_ref(), - )) - .context(MetadataBuildSnafu)? 
- .build(), + metadata: sts_metadata, spec: Some(StatefulSetSpec { template: pod_template, replicas: shs.replicas(rolegroupref), diff --git a/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 b/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 index 967cdd7e..769bb97f 100644 --- a/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 +++ b/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 @@ -35,6 +35,10 @@ spec: {% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} + + clusterConfig: + listenerClass: external-unstable + logFileDirectory: s3: prefix: eventlogs/ From 9ffb187840bcb13bb164c72c6681f5d4d5277552 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 23 Apr 2025 17:56:16 +0200 Subject: [PATCH 03/19] expose spark connect pods via listener classes --- .../pages/usage-guide/listenerclass.adoc | 10 +++++-- rust/operator-binary/src/connect/server.rs | 30 +++++++++++++------ 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc index 06e19059..3936e3e2 100644 --- a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc +++ b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc @@ -1,5 +1,7 @@ -= History service exposition with listener classes -:description: Configure the Spark history service exposure with listener classes: cluster-internal, external-unstable, or external-stable. += Service exposition with listener classes +:description: Configure the Spark connect and history services exposure with listener classes: cluster-internal, external-unstable, or external-stable. + +== History services The operator deploys a xref:listener-operator:listener.adoc[Listener] for each spark history pod. The default is to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.clusterConfig.listenerClass`: @@ -17,3 +19,7 @@ spec: <1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`). For the example above, the listener operator creates a service named `spark-history-node-default-0-listener` where `spark-history` is the name of the SparkHistoryServer, `node` is the service role (the only service role available for history servers) and `default` is the role group. + +== Connect services + +Connect pods can be exposed using listener classes in exactly tha same fashion as history servers. 
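A minimal sketch of what that looks like, assuming the connect listener class is still configured under `.spec.clusterConfig.listenerClass` as the CRD defines it at this commit (the resource name `spark-connect` and the abbreviated spec are illustrative, not taken from the patch):

[source,yaml]
----
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect  # hypothetical name
spec:
  clusterConfig:
    listenerClass: external-unstable  # defaults to cluster-internal when omitted
----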
diff --git a/rust/operator-binary/src/connect/server.rs b/rust/operator-binary/src/connect/server.rs index fb647079..f1914f1e 100644 --- a/rust/operator-binary/src/connect/server.rs +++ b/rust/operator-binary/src/connect/server.rs @@ -38,10 +38,11 @@ use crate::{ }, }, crd::constants::{ - APP_NAME, JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, - METRICS_PROPERTIES_FILE, POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME, SPARK_UID, - VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, - VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, + APP_NAME, JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, + LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, METRICS_PROPERTIES_FILE, POD_TEMPLATE_FILE, + SPARK_DEFAULTS_FILE_NAME, SPARK_UID, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, + VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, + VOLUME_MOUNT_PATH_LOG_CONFIG, }, product_logging, }; @@ -196,12 +197,15 @@ pub(crate) fn build_deployment( config_map: &ConfigMap, args: Vec, ) -> Result { + let server_role = SparkConnectRole::Server.to_string(); + let recommended_object_labels = + common::labels(scs, &resolved_product_image.app_version_label, &server_role); + + let recommended_labels = + Labels::recommended(recommended_object_labels.clone()).context(LabelBuildSnafu)?; + let metadata = ObjectMetaBuilder::new() - .with_recommended_labels(common::labels( - scs, - &resolved_product_image.app_version_label, - &SparkConnectRole::Server.to_string(), - )) + .with_recommended_labels(recommended_object_labels) .context(MetadataBuildSnafu)? .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) .build(); @@ -226,6 +230,12 @@ pub(crate) fn build_deployment( .build(), ) .context(AddVolumeSnafu)? + .add_listener_volume_by_listener_class( + LISTENER_VOLUME_NAME, + &scs.spec.cluster_config.listener_class.to_string(), + &recommended_labels.clone(), + ) + .context(AddVolumeSnafu)? .security_context(PodSecurityContext { run_as_user: Some(SPARK_UID), run_as_group: Some(0), @@ -260,6 +270,8 @@ pub(crate) fn build_deployment( .context(AddVolumeMountSnafu)? .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG) .context(AddVolumeMountSnafu)? + .add_volume_mount(LISTENER_VOLUME_NAME, LISTENER_VOLUME_DIR) + .context(AddVolumeMountSnafu)? .readiness_probe(probe()) .liveness_probe(probe()); From 4c1c18f6329274560efccfd0ae6d682721744312 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 23 Apr 2025 17:58:36 +0200 Subject: [PATCH 04/19] update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c0168c5..75b0bd91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file. - Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept. - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation. - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`. +- Expose history and connect services via listener classes ([#562]) ### Changed @@ -35,6 +36,7 @@ All notable changes to this project will be documented in this file. 
[#554]: https://github.com/stackabletech/spark-k8s-operator/pull/554 [#559]: https://github.com/stackabletech/spark-k8s-operator/pull/559 [#560]: https://github.com/stackabletech/spark-k8s-operator/pull/560 +[#562]: https://github.com/stackabletech/spark-k8s-operator/pull/562 ## [25.3.0] - 2025-03-21 From 3f79af6bcee40bafd01951236f15deb73632eb76 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Mon, 28 Apr 2025 12:36:44 +0200 Subject: [PATCH 05/19] connect: use persistent volumes for listener and move from deplument to stateful set Decided to move to stateful set because deployments don't support volume claim templates. --- .../operator-binary/src/connect/controller.rs | 8 +-- rust/operator-binary/src/connect/server.rs | 60 ++++++++++++++----- rust/operator-binary/src/crd/listener.rs | 10 ++++ .../kuttl/spark-connect/10-assert.yaml | 3 +- 4 files changed, 60 insertions(+), 21 deletions(-) diff --git a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs index d1d49959..1234f096 100644 --- a/rust/operator-binary/src/connect/controller.rs +++ b/rust/operator-binary/src/connect/controller.rs @@ -11,8 +11,8 @@ use stackable_operator::{ }, logging::controller::ReconcilerError, status::condition::{ - compute_conditions, deployment::DeploymentConditionBuilder, - operations::ClusterOperationsConditionBuilder, + compute_conditions, operations::ClusterOperationsConditionBuilder, + statefulset::StatefulSetConditionBuilder, }, time::Duration, }; @@ -264,7 +264,7 @@ pub async fn reconcile( })?; let args = server::command_args(&scs.spec.args); - let deployment = server::build_deployment( + let deployment = server::build_stateful_set( scs, &server_config, &resolved_product_image, @@ -274,7 +274,7 @@ pub async fn reconcile( ) .context(BuildServerDeploymentSnafu)?; - let mut ss_cond_builder = DeploymentConditionBuilder::default(); + let mut ss_cond_builder = StatefulSetConditionBuilder::default(); ss_cond_builder.add( cluster_resources diff --git a/rust/operator-binary/src/connect/server.rs b/rust/operator-binary/src/connect/server.rs index f1914f1e..f445839e 100644 --- a/rust/operator-binary/src/connect/server.rs +++ b/rust/operator-binary/src/connect/server.rs @@ -7,15 +7,20 @@ use stackable_operator::{ configmap::ConfigMapBuilder, meta::ObjectMetaBuilder, pod::{ - PodBuilder, container::ContainerBuilder, resources::ResourceRequirementsBuilder, - volume::VolumeBuilder, + PodBuilder, + container::ContainerBuilder, + resources::ResourceRequirementsBuilder, + volume::{ + ListenerOperatorVolumeSourceBuilder, ListenerOperatorVolumeSourceBuilderError, + ListenerReference, VolumeBuilder, + }, }, }, commons::product_image_selection::ResolvedProductImage, k8s_openapi::{ DeepMerge, api::{ - apps::v1::{Deployment, DeploymentSpec}, + apps::v1::{StatefulSet, StatefulSetSpec}, core::v1::{ ConfigMap, EnvVar, HTTPGetAction, PodSecurityContext, Probe, Service, ServiceAccount, ServicePort, ServiceSpec, @@ -53,6 +58,11 @@ const HTTP: &str = "http"; #[derive(Snafu, Debug)] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("failed to build listener volume"))] + BuildListenerVolume { + source: ListenerOperatorVolumeSourceBuilderError, + }, + #[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))] VectorAggregatorConfigMapMissing, @@ -188,15 +198,14 @@ pub(crate) fn server_config_map( .context(InvalidConfigMapSnafu { name: cm_name }) } 
-#[allow(clippy::result_large_err)] -pub(crate) fn build_deployment( +pub(crate) fn build_stateful_set( scs: &v1alpha1::SparkConnectServer, config: &v1alpha1::ServerConfig, resolved_product_image: &ResolvedProductImage, service_account: &ServiceAccount, config_map: &ConfigMap, args: Vec, -) -> Result { +) -> Result { let server_role = SparkConnectRole::Server.to_string(); let recommended_object_labels = common::labels(scs, &resolved_product_image.app_version_label, &server_role); @@ -230,12 +239,6 @@ pub(crate) fn build_deployment( .build(), ) .context(AddVolumeSnafu)? - .add_listener_volume_by_listener_class( - LISTENER_VOLUME_NAME, - &scs.spec.cluster_config.listener_class.to_string(), - &recommended_labels.clone(), - ) - .context(AddVolumeSnafu)? .security_context(PodSecurityContext { run_as_user: Some(SPARK_UID), run_as_group: Some(0), @@ -320,13 +323,37 @@ pub(crate) fn build_deployment( } } + // Add listener volume + let listener_class = &scs.spec.cluster_config.listener_class; + let pvcs = if listener_class.discoverable() { + // externally reachable listener endpoints will use persistent volumes + // so that load balancers can hard-code the target addresses + let pvc = ListenerOperatorVolumeSourceBuilder::new( + &ListenerReference::ListenerClass(listener_class.to_string()), + &recommended_labels, + ) + .context(BuildListenerVolumeSnafu)? + .build_pvc(LISTENER_VOLUME_NAME.to_string()) + .context(BuildListenerVolumeSnafu)?; + Some(vec![pvc]) + } else { + // non-reachable endpoints use ephemeral volumes + pb.add_listener_volume_by_listener_class( + LISTENER_VOLUME_NAME, + &listener_class.to_string(), + &recommended_labels, + ) + .context(AddVolumeSnafu)?; + None + }; + // Merge user defined pod template if available let mut pod_template = pb.build_template(); if let Some(pod_overrides_spec) = scs.spec.server.as_ref().map(|s| s.pod_overrides.clone()) { pod_template.merge_from(pod_overrides_spec); } - Ok(Deployment { + Ok(StatefulSet { metadata: ObjectMetaBuilder::new() .name_and_namespace(scs) .name(object_name(&scs.name_any(), SparkConnectRole::Server)) @@ -339,9 +366,10 @@ pub(crate) fn build_deployment( )) .context(MetadataBuildSnafu)? 
.build(), - spec: Some(DeploymentSpec { + spec: Some(StatefulSetSpec { template: pod_template, replicas: Some(1), + volume_claim_templates: pvcs, selector: LabelSelector { match_labels: Some( Labels::role_group_selector( @@ -355,9 +383,9 @@ pub(crate) fn build_deployment( ), ..LabelSelector::default() }, - ..DeploymentSpec::default() + ..StatefulSetSpec::default() }), - ..Deployment::default() + ..StatefulSet::default() }) } diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index c4b82b79..1f3331ec 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -23,3 +23,13 @@ pub enum SupportedListenerClasses { #[strum(serialize = "external-stable")] ExternalStable, } + +impl SupportedListenerClasses { + pub fn discoverable(&self) -> bool { + match self { + SupportedListenerClasses::ClusterInternal => false, + SupportedListenerClasses::ExternalUnstable => true, + SupportedListenerClasses::ExternalStable => true, + } + } +} diff --git a/tests/templates/kuttl/spark-connect/10-assert.yaml b/tests/templates/kuttl/spark-connect/10-assert.yaml index 5b75cf2c..8a5162de 100644 --- a/tests/templates/kuttl/spark-connect/10-assert.yaml +++ b/tests/templates/kuttl/spark-connect/10-assert.yaml @@ -4,7 +4,7 @@ kind: TestAssert timeout: 300 --- apiVersion: apps/v1 -kind: Deployment +kind: StatefulSet metadata: name: spark-connect-server status: @@ -18,6 +18,7 @@ commands: # Sleep to prevent the following spark connect app from failing # while the spark-connect server is busy setting up the executors. - script: | + set -xou pipefail sleep 10 EXECUTOR_COUNT=$(kubectl get pods -n $NAMESPACE --selector 'spark-app-name=spark-connect-server' --field-selector='status.phase=Running' -o NAME|wc -l) test 1 -eq "$EXECUTOR_COUNT" From 2c9488fac4d9dfe38cf0a76672eb1d34e7145c2c Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Mon, 28 Apr 2025 14:32:45 +0200 Subject: [PATCH 06/19] history: use persistent columes for listener volumes --- .../src/history/history_controller.rs | 45 ++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index 3fec9236..8df21f1b 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -11,8 +11,13 @@ use stackable_operator::{ configmap::ConfigMapBuilder, meta::ObjectMetaBuilder, pod::{ - PodBuilder, container::ContainerBuilder, resources::ResourceRequirementsBuilder, - volume::VolumeBuilder, + PodBuilder, + container::ContainerBuilder, + resources::ResourceRequirementsBuilder, + volume::{ + ListenerOperatorVolumeSourceBuilder, ListenerOperatorVolumeSourceBuilderError, + ListenerReference, VolumeBuilder, + }, }, }, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, @@ -71,6 +76,11 @@ use crate::{ #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("failed to build listener volume"))] + BuildListenerVolume { + source: ListenerOperatorVolumeSourceBuilderError, + }, + #[snafu(display("missing secret lifetime"))] MissingSecretLifetime, @@ -535,15 +545,27 @@ fn build_stateful_set( // Add listener volume let listener_class = &shs.spec.cluster_config.listener_class; - // all listeners will use ephemeral volumes as they can/should - // be removed when the pods are 
*terminated* (ephemeral PVCs will - // survive re-starts) - pb.add_listener_volume_by_listener_class( - LISTENER_VOLUME_NAME, - &listener_class.to_string(), - &recommended_labels.clone(), - ) - .context(AddVolumeSnafu)?; + let pvcs = if listener_class.discoverable() { + // externally reachable listener endpoints will use persistent volumes + // so that load balancers can hard-code the target addresses + let pvc = ListenerOperatorVolumeSourceBuilder::new( + &ListenerReference::ListenerClass(listener_class.to_string()), + &recommended_labels, + ) + .context(BuildListenerVolumeSnafu)? + .build_pvc(LISTENER_VOLUME_NAME.to_string()) + .context(BuildListenerVolumeSnafu)?; + Some(vec![pvc]) + } else { + // non-reachable endpoints use ephemeral volumes + pb.add_listener_volume_by_listener_class( + LISTENER_VOLUME_NAME, + &listener_class.to_string(), + &recommended_labels, + ) + .context(AddVolumeSnafu)?; + None + }; pb.add_container(container); @@ -593,6 +615,7 @@ fn build_stateful_set( metadata: sts_metadata, spec: Some(StatefulSetSpec { template: pod_template, + volume_claim_templates: pvcs, replicas: shs.replicas(rolegroupref), selector: LabelSelector { match_labels: Some( From e0fa9822060431be0fcfcf68a4b2e506f7f8cfbf Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 29 Apr 2025 12:07:40 +0200 Subject: [PATCH 07/19] history: create group listeners and update crd --- deploy/helm/spark-k8s-operator/crds/crds.yaml | 17 ++- .../spark-k8s-operator/templates/roles.yaml | 11 ++ .../pages/usage-guide/listenerclass.adoc | 5 +- rust/operator-binary/src/crd/constants.rs | 1 + rust/operator-binary/src/crd/history.rs | 25 ++-- .../src/history/history_controller.rs | 111 +++++++++++++----- .../06-deploy-history-server.yaml.j2 | 4 +- 7 files changed, 124 insertions(+), 50 deletions(-) diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index 2cc0d369..e26d109e 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -1118,17 +1118,8 @@ spec: description: A Spark cluster history server component. This resource is managed by the Stackable operator for Apache Spark. Find more information on how to use it in the [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/history-server). properties: clusterConfig: - default: - listenerClass: cluster-internal + default: {} description: Global Spark history server configuration that applies to all roles and role groups. - properties: - listenerClass: - default: cluster-internal - enum: - - cluster-internal - - external-unstable - - external-stable - type: string type: object image: anyOf: @@ -1385,6 +1376,9 @@ spec: cleaner: nullable: true type: boolean + listenerClass: + nullable: true + type: string logging: default: containers: {} @@ -1631,6 +1625,9 @@ spec: cleaner: nullable: true type: boolean + listenerClass: + nullable: true + type: string logging: default: containers: {} diff --git a/deploy/helm/spark-k8s-operator/templates/roles.yaml b/deploy/helm/spark-k8s-operator/templates/roles.yaml index 8a7fd882..cd64b380 100644 --- a/deploy/helm/spark-k8s-operator/templates/roles.yaml +++ b/deploy/helm/spark-k8s-operator/templates/roles.yaml @@ -133,3 +133,14 @@ rules: - bind resourceNames: - {{ include "operator.name" . 
}}-clusterrole + - apiGroups: + - listeners.stackable.tech + resources: + - listeners + verbs: + - get + - list + - watch + - patch + - create + - delete diff --git a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc index 3936e3e2..34d2cc83 100644 --- a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc +++ b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc @@ -13,8 +13,9 @@ kind: SparkHistoryServer metadata: name: spark-history spec: - clusterConfig: - listenerClass: external-unstable # <1> + nodes: + config: + listenerClass: external-unstable # <1> ---- <1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`). diff --git a/rust/operator-binary/src/crd/constants.rs b/rust/operator-binary/src/crd/constants.rs index acbd5909..3da61f98 100644 --- a/rust/operator-binary/src/crd/constants.rs +++ b/rust/operator-binary/src/crd/constants.rs @@ -88,6 +88,7 @@ pub const SPARK_ENV_SH_FILE_NAME: &str = "spark-env.sh"; pub const SPARK_CLUSTER_ROLE: &str = "spark-k8s-clusterrole"; pub const SPARK_UID: i64 = 1000; pub const METRICS_PORT: u16 = 18081; +pub const HISTORY_UI_PORT: u16 = 18080; pub const LISTENER_VOLUME_NAME: &str = "listener"; pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener"; diff --git a/rust/operator-binary/src/crd/history.rs b/rust/operator-binary/src/crd/history.rs index 4144acbc..3f98f6d7 100644 --- a/rust/operator-binary/src/crd/history.rs +++ b/rust/operator-binary/src/crd/history.rs @@ -32,10 +32,7 @@ use stackable_operator::{ use strum::{Display, EnumIter}; use crate::{ - crd::{ - affinity::history_affinity, constants::*, listener::SupportedListenerClasses, - logdir::ResolvedLogDir, - }, + crd::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir}, history::config::jvm::construct_history_jvm_args, }; @@ -106,13 +103,17 @@ pub mod versioned { #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] - pub struct SparkHistoryServerClusterConfig { - #[serde(default)] - pub listener_class: SupportedListenerClasses, - } + pub struct SparkHistoryServerClusterConfig {} } impl v1alpha1::SparkHistoryServer { + /// The name of the group-listener provided for a specific role-group. + /// History servers will use this group listener so that only one load balancer + /// is needed (per role group). + pub fn group_listener_name(&self, rolegroup: &RoleGroupRef) -> String { + format!("{}-group", rolegroup.object_name()) + } + /// Returns a reference to the role. Raises an error if the role is not defined. pub fn role(&self) -> &Role { &self.spec.nodes @@ -343,6 +344,9 @@ pub struct HistoryConfig { /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. 
#[fragment_attrs(serde(default))] pub requested_secret_lifetime: Option, + + #[serde(default)] + pub listener_class: String, } impl HistoryConfig { @@ -366,6 +370,7 @@ impl HistoryConfig { logging: product_logging::spec::default_logging(), affinity: history_affinity(cluster_name), requested_secret_lifetime: Some(Self::DEFAULT_HISTORY_SECRET_LIFETIME), + listener_class: Some(default_listener_class()), } } } @@ -402,6 +407,10 @@ impl Configuration for HistoryConfigFragment { } } +fn default_listener_class() -> String { + "cluster-internal".to_string() +} + #[cfg(test)] mod test { use indoc::indoc; diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index 8df21f1b..5f0d46f4 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -21,7 +21,10 @@ use stackable_operator::{ }, }, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, - commons::product_image_selection::ResolvedProductImage, + commons::{ + listener::{Listener, ListenerPort, ListenerSpec}, + product_image_selection::ResolvedProductImage, + }, k8s_openapi::{ DeepMerge, api::{ @@ -56,7 +59,7 @@ use crate::{ Ctx, crd::{ constants::{ - ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, + ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, HISTORY_UI_PORT, JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, SECRET_ACCESS_KEY, SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, @@ -76,6 +79,11 @@ use crate::{ #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("failed to build object meta data"))] + ObjectMeta { + source: stackable_operator::builder::meta::Error, + }, + #[snafu(display("failed to build listener volume"))] BuildListenerVolume { source: ListenerOperatorVolumeSourceBuilderError, @@ -216,9 +224,12 @@ pub enum Error { #[snafu(display("failed to merge environment config and/or overrides"))] MergeEnv { source: crate::crd::history::Error }, -} -type Result = std::result::Result; + #[snafu(display("failed to apply group listener"))] + ApplyGroupListener { + source: stackable_operator::cluster_resources::Error, + }, +} impl ReconcilerError for Error { fn category(&self) -> &'static str { @@ -229,7 +240,7 @@ impl ReconcilerError for Error { pub async fn reconcile( shs: Arc>, ctx: Arc, -) -> Result { +) -> Result { tracing::info!("Starting reconcile history server"); let shs = shs @@ -322,6 +333,17 @@ pub async fn reconcile( .add(client, sts) .await .context(ApplyDeploymentSnafu)?; + + let rg_group_listener = build_group_listener( + shs, + &resolved_product_image, + &rgr, + merged_config.listener_class.to_string(), + )?; + cluster_resources + .add(client, rg_group_listener) + .await + .context(ApplyGroupListenerSnafu)?; } let role_config = &shs.spec.nodes.role_config; @@ -343,6 +365,50 @@ pub async fn reconcile( Ok(Action::await_change()) } +#[allow(clippy::result_large_err)] +fn build_group_listener( + shs: &v1alpha1::SparkHistoryServer, + resolved_product_image: &ResolvedProductImage, + rolegroup: &RoleGroupRef, + listener_class: String, +) -> Result { + Ok(Listener { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(shs) + .name(shs.group_listener_name(rolegroup)) + .ownerreference_from_resource(shs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? 
+ .with_recommended_labels(labels( + shs, + &resolved_product_image.app_version_label, + &rolegroup.role_group, + )) + .context(ObjectMetaSnafu)? + .build(), + spec: ListenerSpec { + class_name: Some(listener_class), + ports: Some(listener_ports()), + ..ListenerSpec::default() + }, + status: None, + }) +} + +fn listener_ports() -> Vec { + vec![ + ListenerPort { + name: "metrics".to_string(), + port: METRICS_PORT.into(), + protocol: Some("TCP".to_string()), + }, + ListenerPort { + name: "http".to_string(), + port: HISTORY_UI_PORT.into(), + protocol: Some("TCP".to_string()), + }, + ] +} + pub fn error_policy( _obj: Arc>, error: &Error, @@ -528,7 +594,7 @@ fn build_stateful_set( "-c".to_string(), ]) .args(command_args(log_dir)) - .add_container_port("http", 18080) + .add_container_port("http", HISTORY_UI_PORT.into()) .add_container_port("metrics", METRICS_PORT.into()) .add_env_vars(merged_env) .add_volume_mounts(log_dir.volume_mounts()) @@ -544,28 +610,19 @@ fn build_stateful_set( .build(); // Add listener volume - let listener_class = &shs.spec.cluster_config.listener_class; - let pvcs = if listener_class.discoverable() { - // externally reachable listener endpoints will use persistent volumes - // so that load balancers can hard-code the target addresses - let pvc = ListenerOperatorVolumeSourceBuilder::new( - &ListenerReference::ListenerClass(listener_class.to_string()), + // Listener endpoints for the Webserver role will use persistent volumes + // so that load balancers can hard-code the target addresses. This will + // be the case even when no class is set (and the value defaults to + // cluster-internal) as the address should still be consistent. + let pvcs = Some(vec![ + ListenerOperatorVolumeSourceBuilder::new( + &ListenerReference::ListenerClass(merged_config.listener_class.to_string()), &recommended_labels, ) .context(BuildListenerVolumeSnafu)? 
.build_pvc(LISTENER_VOLUME_NAME.to_string()) - .context(BuildListenerVolumeSnafu)?; - Some(vec![pvc]) - } else { - // non-reachable endpoints use ephemeral volumes - pb.add_listener_volume_by_listener_class( - LISTENER_VOLUME_NAME, - &listener_class.to_string(), - &recommended_labels, - ) - .context(AddVolumeSnafu)?; - None - }; + .context(BuildListenerVolumeSnafu)?, + ]); pb.add_container(container); @@ -642,7 +699,7 @@ fn build_stateful_set( fn build_history_role_serviceaccount( shs: &v1alpha1::SparkHistoryServer, app_version_label: &str, -) -> Result<(ServiceAccount, RoleBinding)> { +) -> Result<(ServiceAccount, RoleBinding), Error> { let sa = ServiceAccount { metadata: ObjectMetaBuilder::new() .name_and_namespace(shs) @@ -784,11 +841,11 @@ fn build_rolegroup_service( shs: &v1alpha1::SparkHistoryServer, app_version_label: &str, group: &RoleGroupRef, -) -> Result { +) -> Result { let ports = Some(vec![ ServicePort { name: Some(String::from("http")), - port: 18080, + port: HISTORY_UI_PORT.into(), ..ServicePort::default() }, ServicePort { diff --git a/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 b/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 index 769bb97f..a440a164 100644 --- a/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 +++ b/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 @@ -36,9 +36,6 @@ spec: vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} - clusterConfig: - listenerClass: external-unstable - logFileDirectory: s3: prefix: eventlogs/ @@ -48,6 +45,7 @@ spec: #sparkConf: nodes: config: + listenerClass: external-unstable logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} containers: From cee7f2a3929d30fcd08994cd79944b2b0b8e911d Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 29 Apr 2025 14:44:41 +0200 Subject: [PATCH 08/19] history: remove services created by the operator and update test --- .../src/history/history_controller.rs | 74 ++----------------- .../spark-history-server/20-test-logs.yaml | 2 +- 2 files changed, 6 insertions(+), 70 deletions(-) diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index 5f0d46f4..6f69db45 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -29,9 +29,7 @@ use stackable_operator::{ DeepMerge, api::{ apps::v1::{StatefulSet, StatefulSetSpec}, - core::v1::{ - ConfigMap, PodSecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec, - }, + core::v1::{ConfigMap, PodSecurityContext, ServiceAccount}, rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject}, }, apimachinery::pkg::apis::meta::v1::LabelSelector, @@ -41,7 +39,7 @@ use stackable_operator::{ core::{DeserializeGuard, error_boundary}, runtime::{controller::Action, reflector::ObjectRef}, }, - kvp::{Label, Labels, ObjectLabels}, + kvp::{Labels, ObjectLabels}, logging::controller::ReconcilerError, product_logging::{ framework::{LoggingError, calculate_log_volume_size_limit, vector_container}, @@ -301,13 +299,6 @@ pub async fn reconcile( .merged_config(&rgr) .context(FailedToResolveConfigSnafu)?; - let service = - build_rolegroup_service(shs, &resolved_product_image.app_version_label, &rgr)?; - cluster_resources - .add(client, service) - .await - .context(ApplyServiceSnafu)?; - let config_map = build_config_map( shs, 
rolegroup_config, @@ -614,9 +605,9 @@ fn build_stateful_set( // so that load balancers can hard-code the target addresses. This will // be the case even when no class is set (and the value defaults to // cluster-internal) as the address should still be consistent. - let pvcs = Some(vec![ + let volume_claim_templates = Some(vec![ ListenerOperatorVolumeSourceBuilder::new( - &ListenerReference::ListenerClass(merged_config.listener_class.to_string()), + &ListenerReference::ListenerName(shs.group_listener_name(rolegroupref)), &recommended_labels, ) .context(BuildListenerVolumeSnafu)? @@ -672,7 +663,7 @@ fn build_stateful_set( metadata: sts_metadata, spec: Some(StatefulSetSpec { template: pod_template, - volume_claim_templates: pvcs, + volume_claim_templates, replicas: shs.replicas(rolegroupref), selector: LabelSelector { match_labels: Some( @@ -832,58 +823,3 @@ fn cleaner_config( Ok(result) } - -/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup -/// -/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. -#[allow(clippy::result_large_err)] -fn build_rolegroup_service( - shs: &v1alpha1::SparkHistoryServer, - app_version_label: &str, - group: &RoleGroupRef, -) -> Result { - let ports = Some(vec![ - ServicePort { - name: Some(String::from("http")), - port: HISTORY_UI_PORT.into(), - ..ServicePort::default() - }, - ServicePort { - name: Some(String::from("metrics")), - port: METRICS_PORT.into(), - ..ServicePort::default() - }, - ]); - - let metadata = ObjectMetaBuilder::new() - .name_and_namespace(shs) - .name(group.object_name()) - .ownerreference_from_resource(shs, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(labels(shs, app_version_label, &group.role_group)) - .context(MetadataBuildSnafu)? - .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) - .build(); - - let selector = Some( - Labels::role_group_selector(shs, APP_NAME, &group.role, &group.role_group) - .context(LabelBuildSnafu)? 
- .into(), - ); - - let service_spec = ServiceSpec { - // Internal communication does not need to be exposed - type_: Some("ClusterIP".to_string()), - cluster_ip: Some("None".to_string()), - ports, - selector, - publish_not_ready_addresses: Some(true), - ..ServiceSpec::default() - }; - - Ok(Service { - metadata, - spec: Some(service_spec), - status: None, - }) -} diff --git a/tests/templates/kuttl/spark-history-server/20-test-logs.yaml b/tests/templates/kuttl/spark-history-server/20-test-logs.yaml index 68c36402..5d7e6484 100644 --- a/tests/templates/kuttl/spark-history-server/20-test-logs.yaml +++ b/tests/templates/kuttl/spark-history-server/20-test-logs.yaml @@ -16,5 +16,5 @@ spec: "bash", "-x", "-c", - "test 2 == $(curl http://spark-history-node-default:18080/api/v1/applications | jq length)", + "test 2 == $(curl http://spark-history-node-default-group:18080/api/v1/applications | jq length)", ] From 8b3ef2ebe09df589f58afafad587e403a1d24499 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 29 Apr 2025 16:59:49 +0200 Subject: [PATCH 09/19] connect: create server listeners and update crd --- deploy/helm/spark-k8s-operator/crds/crds.yaml | 16 +-- .../operator-binary/src/connect/controller.rs | 20 ++++ rust/operator-binary/src/connect/crd.rs | 13 +-- rust/operator-binary/src/connect/server.rs | 102 ++++++++++++------ rust/operator-binary/src/crd/listener.rs | 69 +++++++----- .../10-deploy-spark-connect.yaml.j2 | 1 + .../20-run-connect-client.yaml.j2 | 2 +- 7 files changed, 146 insertions(+), 77 deletions(-) diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index e26d109e..5fd1ed83 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -1866,18 +1866,8 @@ spec: type: string type: array clusterConfig: - default: - listenerClass: cluster-internal + default: {} description: Global Spark Connect server configuration that applies to all roles. - properties: - listenerClass: - default: cluster-internal - description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services. - enum: - - cluster-internal - - external-unstable - - external-stable - type: string type: object clusterOperation: default: @@ -2169,6 +2159,10 @@ spec: config: default: {} properties: + listenerClass: + description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services. 
+ nullable: true + type: string logging: default: containers: {} diff --git a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs index 1234f096..42041a5c 100644 --- a/rust/operator-binary/src/connect/controller.rs +++ b/rust/operator-binary/src/connect/controller.rs @@ -29,6 +29,14 @@ use crate::{ #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("failed to build spark connect listener"))] + BuildListener { source: server::Error }, + + #[snafu(display("failed to apply spark connect listener"))] + ApplyListener { + source: stackable_operator::cluster_resources::Error, + }, + #[snafu(display("failed to serialize connect properties"))] SerializeProperties { source: common::Error }, @@ -263,6 +271,8 @@ pub async fn reconcile( name: scs.name_unchecked(), })?; + // ======================================== + // Server stateful set let args = server::command_args(&scs.spec.args); let deployment = server::build_stateful_set( scs, @@ -274,6 +284,16 @@ pub async fn reconcile( ) .context(BuildServerDeploymentSnafu)?; + // ======================================== + // Server listener + let listener = server::build_listener(scs, &server_config, &resolved_product_image) + .context(BuildListenerSnafu)?; + + cluster_resources + .add(client, listener) + .await + .context(ApplyListenerSnafu)?; + let mut ss_cond_builder = StatefulSetConditionBuilder::default(); ss_cond_builder.add( diff --git a/rust/operator-binary/src/connect/crd.rs b/rust/operator-binary/src/connect/crd.rs index 127bdb40..9023fe71 100644 --- a/rust/operator-binary/src/connect/crd.rs +++ b/rust/operator-binary/src/connect/crd.rs @@ -33,7 +33,7 @@ use stackable_operator::{ use strum::{Display, EnumIter}; use super::common::SparkConnectRole; -use crate::crd::{constants::APP_NAME, listener::SupportedListenerClasses}; +use crate::crd::constants::APP_NAME; pub const CONNECT_CONTROLLER_NAME: &str = "connect"; pub const CONNECT_FULL_CONTROLLER_NAME: &str = concatcp!( @@ -106,11 +106,7 @@ pub mod versioned { #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] - pub struct SparkConnectServerClusterConfig { - /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services. - #[serde(default)] - pub listener_class: SupportedListenerClasses, - } + pub struct SparkConnectServerClusterConfig {} #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] #[fragment_attrs( @@ -137,6 +133,10 @@ pub mod versioned { /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. #[fragment_attrs(serde(default))] pub requested_secret_lifetime: Option, + + /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services. 
+ #[serde(default)] + pub listener_class: String, } #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] @@ -225,6 +225,7 @@ impl v1alpha1::ServerConfig { }, logging: product_logging::spec::default_logging(), requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME), + listener_class: Some("cluster-internal".into()), } } diff --git a/rust/operator-binary/src/connect/server.rs b/rust/operator-binary/src/connect/server.rs index f445839e..fdce6b11 100644 --- a/rust/operator-binary/src/connect/server.rs +++ b/rust/operator-binary/src/connect/server.rs @@ -16,7 +16,10 @@ use stackable_operator::{ }, }, }, - commons::product_image_selection::ResolvedProductImage, + commons::{ + listener::{Listener, ListenerPort}, + product_image_selection::ResolvedProductImage, + }, k8s_openapi::{ DeepMerge, api::{ @@ -42,12 +45,15 @@ use crate::{ SparkConnectContainer, v1alpha1, }, }, - crd::constants::{ - APP_NAME, JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, - LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, METRICS_PROPERTIES_FILE, POD_TEMPLATE_FILE, - SPARK_DEFAULTS_FILE_NAME, SPARK_UID, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, - VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, - VOLUME_MOUNT_PATH_LOG_CONFIG, + crd::{ + constants::{ + APP_NAME, JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, + LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, METRICS_PROPERTIES_FILE, + POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME, SPARK_UID, VOLUME_MOUNT_NAME_CONFIG, + VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, + VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, + }, + listener, }, product_logging, }; @@ -58,6 +64,9 @@ const HTTP: &str = "http"; #[derive(Snafu, Debug)] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("failed to build spark connect listener"))] + BuildListener { source: crate::crd::listener::Error }, + #[snafu(display("failed to build listener volume"))] BuildListenerVolume { source: ListenerOperatorVolumeSourceBuilderError, @@ -177,11 +186,7 @@ pub(crate) fn server_config_map( .add_data(JVM_SECURITY_PROPERTIES_FILE, jvm_sec_props) .add_data(METRICS_PROPERTIES_FILE, metrics_props); - let role_group_ref = RoleGroupRef { - cluster: ObjectRef::from_obj(scs), - role: SparkConnectRole::Server.to_string(), - role_group: DUMMY_SPARK_CONNECT_GROUP_NAME.to_string(), - }; + let role_group_ref = dummy_role_group_ref(scs); product_logging::extend_config_map( &role_group_ref, &config.logging, @@ -324,28 +329,19 @@ pub(crate) fn build_stateful_set( } // Add listener volume - let listener_class = &scs.spec.cluster_config.listener_class; - let pvcs = if listener_class.discoverable() { - // externally reachable listener endpoints will use persistent volumes - // so that load balancers can hard-code the target addresses - let pvc = ListenerOperatorVolumeSourceBuilder::new( - &ListenerReference::ListenerClass(listener_class.to_string()), + // Listener endpoints for the Webserver role will use persistent volumes + // so that load balancers can hard-code the target addresses. This will + // be the case even when no class is set (and the value defaults to + // cluster-internal) as the address should still be consistent. + let volume_claim_templates = Some(vec![ + ListenerOperatorVolumeSourceBuilder::new( + &ListenerReference::ListenerName(dummy_role_group_ref(scs).object_name()), &recommended_labels, ) .context(BuildListenerVolumeSnafu)? 
.build_pvc(LISTENER_VOLUME_NAME.to_string()) - .context(BuildListenerVolumeSnafu)?; - Some(vec![pvc]) - } else { - // non-reachable endpoints use ephemeral volumes - pb.add_listener_volume_by_listener_class( - LISTENER_VOLUME_NAME, - &listener_class.to_string(), - &recommended_labels, - ) - .context(AddVolumeSnafu)?; - None - }; + .context(BuildListenerVolumeSnafu)?, + ]); // Merge user defined pod template if available let mut pod_template = pb.build_template(); @@ -369,7 +365,7 @@ pub(crate) fn build_stateful_set( spec: Some(StatefulSetSpec { template: pod_template, replicas: Some(1), - volume_claim_templates: pvcs, + volume_claim_templates, selector: LabelSelector { match_labels: Some( Labels::role_group_selector( @@ -605,3 +601,47 @@ fn probe() -> Probe { ..Probe::default() } } + +fn dummy_role_group_ref( + scs: &v1alpha1::SparkConnectServer, +) -> RoleGroupRef { + RoleGroupRef { + cluster: ObjectRef::from_obj(scs), + role: SparkConnectRole::Server.to_string(), + role_group: DUMMY_SPARK_CONNECT_GROUP_NAME.to_string(), + } +} + +pub(crate) fn build_listener( + scs: &v1alpha1::SparkConnectServer, + config: &v1alpha1::ServerConfig, + resolved_product_image: &ResolvedProductImage, +) -> Result { + let listener_name = dummy_role_group_ref(scs).object_name(); + let listener_class = config.listener_class.clone(); + let role = SparkConnectRole::Server.to_string(); + let recommended_object_labels = + common::labels(scs, &resolved_product_image.app_version_label, &role); + + let listener_ports = [ + ListenerPort { + name: GRPC.to_string(), + port: CONNECT_GRPC_PORT, + protocol: Some("TCP".to_string()), + }, + ListenerPort { + name: HTTP.to_string(), + port: CONNECT_UI_PORT, + protocol: Some("TCP".to_string()), + }, + ]; + + listener::build_listener( + scs, + &listener_name, + &listener_class, + recommended_object_labels, + &listener_ports, + ) + .context(BuildListenerSnafu) +} diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 1f3331ec..4a3b7151 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -1,35 +1,48 @@ -use serde::{Deserialize, Serialize}; +use snafu::{ResultExt, Snafu}; use stackable_operator::{ - config::merge::Atomic, - schemars::{self, JsonSchema}, + builder::meta::ObjectMetaBuilder, + commons::listener::{Listener, ListenerPort, ListenerSpec}, + kube::Resource, + kvp::ObjectLabels, }; -use strum::Display; +use strum::{EnumDiscriminants, IntoStaticStr}; -impl Atomic for SupportedListenerClasses {} +#[derive(Snafu, Debug, EnumDiscriminants)] +#[strum_discriminants(derive(IntoStaticStr))] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::builder::meta::Error, + }, -#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "PascalCase")] -pub enum SupportedListenerClasses { - #[default] - #[serde(rename = "cluster-internal")] - #[strum(serialize = "cluster-internal")] - ClusterInternal, - - #[serde(rename = "external-unstable")] - #[strum(serialize = "external-unstable")] - ExternalUnstable, - - #[serde(rename = "external-stable")] - #[strum(serialize = "external-stable")] - ExternalStable, + #[snafu(display("failed to build object meta data"))] + ObjectMeta { + source: stackable_operator::builder::meta::Error, + }, } -impl SupportedListenerClasses { - pub fn discoverable(&self) -> 
bool { - match self { - SupportedListenerClasses::ClusterInternal => false, - SupportedListenerClasses::ExternalUnstable => true, - SupportedListenerClasses::ExternalStable => true, - } - } +pub fn build_listener>( + resource: &T, + listener_name: &str, + listener_class: &str, + listener_labels: ObjectLabels, + listener_ports: &[ListenerPort], +) -> Result { + Ok(Listener { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(resource) + .name(listener_name) + .ownerreference_from_resource(resource, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(listener_labels) + .context(ObjectMetaSnafu)? + .build(), + spec: ListenerSpec { + class_name: Some(listener_class.into()), + ports: Some(listener_ports.to_vec()), + ..ListenerSpec::default() + }, + status: None, + }) } diff --git a/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 b/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 index 3e6464b0..20b3f0e2 100644 --- a/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 +++ b/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 @@ -49,6 +49,7 @@ spec: add: - -Dmy.custom.jvm.arg=customValue config: + listenerClass: external-unstable logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} containers: diff --git a/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 b/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 index 99f97902..d78c2572 100644 --- a/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 +++ b/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 @@ -22,7 +22,7 @@ spec: [ "/usr/bin/python", "/stackable/spark-connect-examples/python/simple-connect-app.py", - "sc://spark-connect-server", + "sc://spark-connect-server-default", ] resources: limits: From fcd5d3a545e290012814fec9bbdeef6691aff556 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 29 Apr 2025 17:17:52 +0200 Subject: [PATCH 10/19] history: refactor to use listener::build_listener --- rust/operator-binary/src/crd/history.rs | 7 --- .../src/history/history_controller.rs | 62 ++++++++----------- .../spark-history-server/20-test-logs.yaml | 2 +- 3 files changed, 27 insertions(+), 44 deletions(-) diff --git a/rust/operator-binary/src/crd/history.rs b/rust/operator-binary/src/crd/history.rs index 3f98f6d7..272033d0 100644 --- a/rust/operator-binary/src/crd/history.rs +++ b/rust/operator-binary/src/crd/history.rs @@ -107,13 +107,6 @@ pub mod versioned { } impl v1alpha1::SparkHistoryServer { - /// The name of the group-listener provided for a specific role-group. - /// History servers will use this group listener so that only one load balancer - /// is needed (per role group). - pub fn group_listener_name(&self, rolegroup: &RoleGroupRef) -> String { - format!("{}-group", rolegroup.object_name()) - } - /// Returns a reference to the role. Raises an error if the role is not defined. 
pub fn role(&self) -> &Role { &self.spec.nodes diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index 6f69db45..83e1fb02 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -22,7 +22,7 @@ use stackable_operator::{ }, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, commons::{ - listener::{Listener, ListenerPort, ListenerSpec}, + listener::{Listener, ListenerPort}, product_image_selection::ResolvedProductImage, }, k8s_openapi::{ @@ -66,6 +66,7 @@ use crate::{ VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, }, history::{self, HistoryConfig, SparkHistoryServerContainer, v1alpha1}, + listener, logdir::ResolvedLogDir, tlscerts, to_spark_env_sh_string, }, @@ -82,6 +83,9 @@ pub enum Error { source: stackable_operator::builder::meta::Error, }, + #[snafu(display("failed to build spark history group listener"))] + BuildListener { source: crate::crd::listener::Error }, + #[snafu(display("failed to build listener volume"))] BuildListenerVolume { source: ListenerOperatorVolumeSourceBuilderError, @@ -363,41 +367,27 @@ fn build_group_listener( rolegroup: &RoleGroupRef, listener_class: String, ) -> Result { - Ok(Listener { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(shs) - .name(shs.group_listener_name(rolegroup)) - .ownerreference_from_resource(shs, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(labels( - shs, - &resolved_product_image.app_version_label, - &rolegroup.role_group, - )) - .context(ObjectMetaSnafu)? - .build(), - spec: ListenerSpec { - class_name: Some(listener_class), - ports: Some(listener_ports()), - ..ListenerSpec::default() - }, - status: None, - }) -} + let listener_name = rolegroup.object_name(); + let recommended_object_labels = labels( + shs, + &resolved_product_image.app_version_label, + &rolegroup.role_group, + ); -fn listener_ports() -> Vec { - vec![ - ListenerPort { - name: "metrics".to_string(), - port: METRICS_PORT.into(), - protocol: Some("TCP".to_string()), - }, - ListenerPort { - name: "http".to_string(), - port: HISTORY_UI_PORT.into(), - protocol: Some("TCP".to_string()), - }, - ] + let listener_ports = [ListenerPort { + name: "http".to_string(), + port: HISTORY_UI_PORT.into(), + protocol: Some("TCP".to_string()), + }]; + + listener::build_listener( + shs, + &listener_name, + &listener_class, + recommended_object_labels, + &listener_ports, + ) + .context(BuildListenerSnafu) } pub fn error_policy( @@ -607,7 +597,7 @@ fn build_stateful_set( // cluster-internal) as the address should still be consistent. let volume_claim_templates = Some(vec![ ListenerOperatorVolumeSourceBuilder::new( - &ListenerReference::ListenerName(shs.group_listener_name(rolegroupref)), + &ListenerReference::ListenerName(rolegroupref.object_name()), &recommended_labels, ) .context(BuildListenerVolumeSnafu)? 
diff --git a/tests/templates/kuttl/spark-history-server/20-test-logs.yaml b/tests/templates/kuttl/spark-history-server/20-test-logs.yaml index 5d7e6484..68c36402 100644 --- a/tests/templates/kuttl/spark-history-server/20-test-logs.yaml +++ b/tests/templates/kuttl/spark-history-server/20-test-logs.yaml @@ -16,5 +16,5 @@ spec: "bash", "-x", "-c", - "test 2 == $(curl http://spark-history-node-default-group:18080/api/v1/applications | jq length)", + "test 2 == $(curl http://spark-history-node-default:18080/api/v1/applications | jq length)", ] From 6a2f32a3c64a6cdd4ec4a795a41bb44bf31bc4f8 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 29 Apr 2025 17:24:39 +0200 Subject: [PATCH 11/19] connect: remove traces of deployment --- rust/operator-binary/src/connect/controller.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs index 42041a5c..eff55179 100644 --- a/rust/operator-binary/src/connect/controller.rs +++ b/rust/operator-binary/src/connect/controller.rs @@ -58,8 +58,8 @@ pub enum Error { #[snafu(display("failed to build spark connect server config map for {name}"))] BuildServerConfigMap { source: server::Error, name: String }, - #[snafu(display("failed to build spark connect deployment"))] - BuildServerDeployment { source: server::Error }, + #[snafu(display("failed to build spark connect stateful set"))] + BuildServerStatefulSet { source: server::Error }, #[snafu(display("failed to update status of spark connect server {name}"))] ApplyStatus { @@ -70,8 +70,8 @@ pub enum Error { #[snafu(display("spark connect object has no namespace"))] ObjectHasNoNamespace, - #[snafu(display("failed to update the connect server deployment"))] - ApplyDeployment { + #[snafu(display("failed to update the connect server stateful set"))] + ApplyStatefulSet { source: stackable_operator::cluster_resources::Error, }, @@ -274,7 +274,7 @@ pub async fn reconcile( // ======================================== // Server stateful set let args = server::command_args(&scs.spec.args); - let deployment = server::build_stateful_set( + let stateful_set = server::build_stateful_set( scs, &server_config, &resolved_product_image, @@ -282,7 +282,7 @@ pub async fn reconcile( &server_config_map, args, ) - .context(BuildServerDeploymentSnafu)?; + .context(BuildServerStatefulSetSnafu)?; // ======================================== // Server listener @@ -298,9 +298,9 @@ pub async fn reconcile( ss_cond_builder.add( cluster_resources - .add(client, deployment) + .add(client, stateful_set) .await - .context(ApplyDeploymentSnafu)?, + .context(ApplyStatefulSetSnafu)?, ); cluster_resources From d9d8c6e4b42088b84941221a35a147fc486eff7d Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 29 Apr 2025 17:44:46 +0200 Subject: [PATCH 12/19] history: refactor rbac as per todo --- .../src/history/history_controller.rs | 88 +++++-------------- 1 file changed, 23 insertions(+), 65 deletions(-) diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index 83e1fb02..f4ddfbc8 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -24,13 +24,13 @@ use stackable_operator::{ commons::{ listener::{Listener, ListenerPort}, product_image_selection::ResolvedProductImage, + 
rbac::build_rbac_resources, }, k8s_openapi::{ DeepMerge, api::{ apps::v1::{StatefulSet, StatefulSetSpec}, core::v1::{ConfigMap, PodSecurityContext, ServiceAccount}, - rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject}, }, apimachinery::pkg::apis::meta::v1::LabelSelector, }, @@ -60,10 +60,10 @@ use crate::{ ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, HISTORY_UI_PORT, JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, SECRET_ACCESS_KEY, - SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, - SPARK_IMAGE_BASE_NAME, SPARK_UID, STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, - VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, - VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, + SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID, + STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, + VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, + VOLUME_MOUNT_PATH_LOG_CONFIG, }, history::{self, HistoryConfig, SparkHistoryServerContainer, v1alpha1}, listener, @@ -78,9 +78,9 @@ use crate::{ #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { - #[snafu(display("failed to build object meta data"))] - ObjectMeta { - source: stackable_operator::builder::meta::Error, + #[snafu(display("failed to build RBAC resources"))] + BuildRbacResources { + source: stackable_operator::commons::rbac::Error, }, #[snafu(display("failed to build spark history group listener"))] @@ -113,8 +113,8 @@ pub enum Error { source: stackable_operator::builder::meta::Error, }, - #[snafu(display("failed to update the history server deployment"))] - ApplyDeployment { + #[snafu(display("failed to update the history server stateful set"))] + ApplyStatefulSet { source: stackable_operator::cluster_resources::Error, }, @@ -123,11 +123,6 @@ pub enum Error { source: stackable_operator::cluster_resources::Error, }, - #[snafu(display("failed to update history server service"))] - ApplyService { - source: stackable_operator::cluster_resources::Error, - }, - #[snafu(display("failed to apply role ServiceAccount"))] ApplyServiceAccount { source: stackable_operator::cluster_resources::Error, @@ -275,14 +270,20 @@ pub async fn reconcile( .context(LogDirSnafu)?; // Use a dedicated service account for history server pods. - let (serviceaccount, rolebinding) = - build_history_role_serviceaccount(shs, &resolved_product_image.app_version_label)?; - let serviceaccount = cluster_resources - .add(client, serviceaccount) + let (service_account, role_binding) = build_rbac_resources( + shs, + APP_NAME, + cluster_resources + .get_required_labels() + .context(GetRequiredLabelsSnafu)?, + ) + .context(BuildRbacResourcesSnafu)?; + let service_account = cluster_resources + .add(client, service_account) .await .context(ApplyServiceAccountSnafu)?; cluster_resources - .add(client, rolebinding) + .add(client, role_binding) .await .context(ApplyRoleBindingSnafu)?; @@ -322,12 +323,12 @@ pub async fn reconcile( &rgr, &log_dir, &merged_config, - &serviceaccount, + &service_account, )?; cluster_resources .add(client, sts) .await - .context(ApplyDeploymentSnafu)?; + .context(ApplyStatefulSetSnafu)?; let rg_group_listener = build_group_listener( shs, @@ -674,49 +675,6 @@ fn build_stateful_set( }) } -// TODO: This function should be replaced with operator-rs build_rbac_resources. 
-// See: https://github.com/stackabletech/spark-k8s-operator/issues/499 -#[allow(clippy::result_large_err)] -fn build_history_role_serviceaccount( - shs: &v1alpha1::SparkHistoryServer, - app_version_label: &str, -) -> Result<(ServiceAccount, RoleBinding), Error> { - let sa = ServiceAccount { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(shs) - .ownerreference_from_resource(shs, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(labels(shs, app_version_label, HISTORY_CONTROLLER_NAME)) - .context(MetadataBuildSnafu)? - .build(), - ..ServiceAccount::default() - }; - let binding = RoleBinding { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(shs) - .ownerreference_from_resource(shs, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(labels(shs, app_version_label, HISTORY_CONTROLLER_NAME)) - .context(MetadataBuildSnafu)? - .build(), - role_ref: RoleRef { - api_group: ::GROUP // need to fully qualify because of "Resource" name clash - .to_string(), - kind: ::KIND.to_string(), - name: SPARK_CLUSTER_ROLE.to_string(), - }, - subjects: Some(vec![Subject { - api_group: Some( - ::GROUP.to_string(), - ), - kind: ::KIND.to_string(), - name: sa.name_any(), - namespace: sa.namespace(), - }]), - }; - Ok((sa, binding)) -} - #[allow(clippy::result_large_err)] fn spark_defaults( shs: &v1alpha1::SparkHistoryServer, From 3ed79fc7fc93829f91d2875df5fe045f3a383095 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 29 Apr 2025 17:53:32 +0200 Subject: [PATCH 13/19] docs: update --- docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc index 34d2cc83..f2ab3adf 100644 --- a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc +++ b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc @@ -4,7 +4,7 @@ == History services The operator deploys a xref:listener-operator:listener.adoc[Listener] for each spark history pod. -The default is to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.clusterConfig.listenerClass`: +The default is to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.config.listenerClass`: [source,yaml] ---- @@ -19,7 +19,7 @@ spec: ---- <1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`). -For the example above, the listener operator creates a service named `spark-history-node-default-0-listener` where `spark-history` is the name of the SparkHistoryServer, `node` is the service role (the only service role available for history servers) and `default` is the role group. +For the example above, the listener operator creates a service named `spark-history-node-default` where `spark-history` is the name of the SparkHistoryServer, `node` is the service role (the only service role available for history servers) and `default` is the role group. 
== Connect services From 2709f9525d1e9adf735932f4bcbf37b7e307799b Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 30 Apr 2025 12:25:52 +0200 Subject: [PATCH 14/19] chore: remove set command to make jenkins test work --- tests/templates/kuttl/spark-connect/10-assert.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/templates/kuttl/spark-connect/10-assert.yaml b/tests/templates/kuttl/spark-connect/10-assert.yaml index 8a5162de..826a7888 100644 --- a/tests/templates/kuttl/spark-connect/10-assert.yaml +++ b/tests/templates/kuttl/spark-connect/10-assert.yaml @@ -18,7 +18,6 @@ commands: # Sleep to prevent the following spark connect app from failing # while the spark-connect server is busy setting up the executors. - script: | - set -xou pipefail sleep 10 EXECUTOR_COUNT=$(kubectl get pods -n $NAMESPACE --selector 'spark-app-name=spark-connect-server' --field-selector='status.phase=Running' -o NAME|wc -l) test 1 -eq "$EXECUTOR_COUNT" From 73b62457f9652d8e01eac3de121ce4be5b6e159b Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 2 May 2025 12:19:44 +0200 Subject: [PATCH 15/19] Update CHANGELOG.md Co-authored-by: Nick <10092581+NickLarsenNZ@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 75b0bd91..1f0168a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ All notable changes to this project will be documented in this file. - Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept. - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation. - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`. -- Expose history and connect services via listener classes ([#562]) +- Expose history and connect services via listener classes ([#562]). ### Changed From 851e25ad557f5bc25be7a6239f69d1e33bd8fdba Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 2 May 2025 12:22:36 +0200 Subject: [PATCH 16/19] Update rust/operator-binary/src/connect/server.rs Co-authored-by: Nick <10092581+NickLarsenNZ@users.noreply.github.com> --- rust/operator-binary/src/connect/server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/connect/server.rs b/rust/operator-binary/src/connect/server.rs index fdce6b11..c4efbc87 100644 --- a/rust/operator-binary/src/connect/server.rs +++ b/rust/operator-binary/src/connect/server.rs @@ -412,8 +412,8 @@ pub(crate) fn build_internal_service( .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) 
.build(), spec: Some(ServiceSpec { - type_: Some("ClusterIP".to_string()), - cluster_ip: Some("None".to_string()), + type_: Some("ClusterIP".to_owned()), + cluster_ip: Some("None".to_owned()), ports: Some(vec![ ServicePort { name: Some(String::from(GRPC)), From 2355ded657d3ba504156dfa86a4de8e468bd9508 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 2 May 2025 12:22:51 +0200 Subject: [PATCH 17/19] Update rust/operator-binary/src/crd/history.rs Co-authored-by: Nick <10092581+NickLarsenNZ@users.noreply.github.com> --- rust/operator-binary/src/crd/history.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/operator-binary/src/crd/history.rs b/rust/operator-binary/src/crd/history.rs index 272033d0..ec9b3db3 100644 --- a/rust/operator-binary/src/crd/history.rs +++ b/rust/operator-binary/src/crd/history.rs @@ -401,7 +401,7 @@ impl Configuration for HistoryConfigFragment { } fn default_listener_class() -> String { - "cluster-internal".to_string() + "cluster-internal".to_owned() } #[cfg(test)] From 9889fb5c6f66e2eff44307fa7f8d59e51bbacd7d Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 2 May 2025 12:42:49 +0200 Subject: [PATCH 18/19] doc: mention cluster_config usage status --- deploy/helm/spark-k8s-operator/crds/crds.yaml | 4 ++-- rust/operator-binary/src/connect/crd.rs | 1 + rust/operator-binary/src/crd/history.rs | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index 5fd1ed83..432d7b97 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -1119,7 +1119,7 @@ spec: properties: clusterConfig: default: {} - description: Global Spark history server configuration that applies to all roles and role groups. + description: Global Spark history server configuration that applies to all roles and role groups. Currently not in use. type: object image: anyOf: @@ -1867,7 +1867,7 @@ spec: type: array clusterConfig: default: {} - description: Global Spark Connect server configuration that applies to all roles. + description: Global Spark Connect server configuration that applies to all roles. Currently not in use. type: object clusterOperation: default: diff --git a/rust/operator-binary/src/connect/crd.rs b/rust/operator-binary/src/connect/crd.rs index 9023fe71..673ad8c5 100644 --- a/rust/operator-binary/src/connect/crd.rs +++ b/rust/operator-binary/src/connect/crd.rs @@ -79,6 +79,7 @@ pub mod versioned { pub image: ProductImage, /// Global Spark Connect server configuration that applies to all roles. + /// Currently not in use. #[serde(default)] pub cluster_config: v1alpha1::SparkConnectServerClusterConfig, diff --git a/rust/operator-binary/src/crd/history.rs b/rust/operator-binary/src/crd/history.rs index ec9b3db3..80a7dfc8 100644 --- a/rust/operator-binary/src/crd/history.rs +++ b/rust/operator-binary/src/crd/history.rs @@ -82,6 +82,7 @@ pub mod versioned { pub image: ProductImage, /// Global Spark history server configuration that applies to all roles and role groups. + /// Currently not in use. 
#[serde(default)] pub cluster_config: v1alpha1::SparkHistoryServerClusterConfig, From 1a6efb8835f9a1f2292c1884083102b8049dd1dd Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 2 May 2025 14:26:04 +0200 Subject: [PATCH 19/19] review feedback --- deploy/helm/spark-k8s-operator/crds/crds.yaml | 10 ++++++++-- rust/operator-binary/src/connect/crd.rs | 4 +++- rust/operator-binary/src/crd/history.rs | 6 ++++-- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index 432d7b97..52a61f09 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -1119,7 +1119,10 @@ spec: properties: clusterConfig: default: {} - description: Global Spark history server configuration that applies to all roles and role groups. Currently not in use. + description: |- + Global Spark history server configuration that applies to all roles. + + This was previously used to hold the listener configuration, which has since moved to the role configuration. type: object image: anyOf: @@ -1867,7 +1870,10 @@ spec: type: array clusterConfig: default: {} - description: Global Spark Connect server configuration that applies to all roles. Currently not in use. + description: |- + Global Spark Connect server configuration that applies to all roles. + + This was previously used to hold the listener configuration, which has since moved to the server configuration. type: object clusterOperation: default: diff --git a/rust/operator-binary/src/connect/crd.rs b/rust/operator-binary/src/connect/crd.rs index 673ad8c5..e478f1c1 100644 --- a/rust/operator-binary/src/connect/crd.rs +++ b/rust/operator-binary/src/connect/crd.rs @@ -79,7 +79,9 @@ pub mod versioned { pub image: ProductImage, /// Global Spark Connect server configuration that applies to all roles. - /// Currently not in use. + /// + /// This was previously used to hold the listener configuration, which has since moved + /// to the server configuration. #[serde(default)] pub cluster_config: v1alpha1::SparkConnectServerClusterConfig, diff --git a/rust/operator-binary/src/crd/history.rs b/rust/operator-binary/src/crd/history.rs index 80a7dfc8..54e66756 100644 --- a/rust/operator-binary/src/crd/history.rs +++ b/rust/operator-binary/src/crd/history.rs @@ -81,8 +81,10 @@ pub mod versioned { pub struct SparkHistoryServerSpec { pub image: ProductImage, - /// Global Spark history server configuration that applies to all roles and role groups. - /// Currently not in use. + /// Global Spark history server configuration that applies to all roles. + /// + /// This was previously used to hold the listener configuration, which has since moved + /// to the role configuration. #[serde(default)] pub cluster_config: v1alpha1::SparkHistoryServerClusterConfig,