diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c0168c5..1f0168a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file. - Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept. - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation. - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`. +- Expose history and connect services via listener classes ([#562]). ### Changed @@ -35,6 +36,7 @@ All notable changes to this project will be documented in this file. [#554]: https://github.com/stackabletech/spark-k8s-operator/pull/554 [#559]: https://github.com/stackabletech/spark-k8s-operator/pull/559 [#560]: https://github.com/stackabletech/spark-k8s-operator/pull/560 +[#562]: https://github.com/stackabletech/spark-k8s-operator/pull/562 ## [25.3.0] - 2025-03-21 diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index ab67da5b..52a61f09 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -1118,27 +1118,11 @@ spec: description: A Spark cluster history server component. This resource is managed by the Stackable operator for Apache Spark. Find more information on how to use it in the [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/history-server). properties: clusterConfig: - default: - listenerClass: cluster-internal - description: Global Spark history server configuration that applies to all roles and role groups. - properties: - listenerClass: - default: cluster-internal - description: |- - This field controls which type of Service the Operator creates for this HistoryServer: - - * cluster-internal: Use a ClusterIP service - - * external-unstable: Use a NodePort service - - * external-stable: Use a LoadBalancer service + default: {} + description: |- + Global Spark history server configuration that applies to all roles. - This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. - enum: - - cluster-internal - - external-unstable - - external-stable - type: string + This was previously used to hold the listener configuration, which has since moved to the role configuration. type: object image: anyOf: @@ -1395,6 +1379,9 @@ spec: cleaner: nullable: true type: boolean + listenerClass: + nullable: true + type: string logging: default: containers: {} @@ -1641,6 +1628,9 @@ spec: cleaner: nullable: true type: boolean + listenerClass: + nullable: true + type: string logging: default: containers: {} @@ -1879,27 +1869,11 @@ spec: type: string type: array clusterConfig: - default: - listenerClass: external-unstable - description: Global Spark Connect server configuration that applies to all roles. - properties: - listenerClass: - default: external-unstable - description: |- - This field controls which type of Service the Operator creates for this ConnectServer: - - * cluster-internal: Use a ClusterIP service - - * external-unstable: Use a NodePort service - - * external-stable: Use a LoadBalancer service + default: {} + description: |- + Global Spark Connect server configuration that applies to all roles. 
- This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. - enum: - - cluster-internal - - external-unstable - - external-stable - type: string + This was previously used to hold the listener configuration, which has since moved to the server configuration. type: object clusterOperation: default: @@ -2191,6 +2165,10 @@ spec: config: default: {} properties: + listenerClass: description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services. + nullable: true + type: string logging: default: containers: {} diff --git a/deploy/helm/spark-k8s-operator/templates/roles.yaml b/deploy/helm/spark-k8s-operator/templates/roles.yaml index 8a7fd882..cd64b380 100644 --- a/deploy/helm/spark-k8s-operator/templates/roles.yaml +++ b/deploy/helm/spark-k8s-operator/templates/roles.yaml @@ -133,3 +133,14 @@ rules: - bind resourceNames: - {{ include "operator.name" . }}-clusterrole + - apiGroups: + - listeners.stackable.tech + resources: + - listeners + verbs: + - get + - list + - watch + - patch + - create + - delete diff --git a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc index 7df222d0..f2ab3adf 100644 --- a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc +++ b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc @@ -1,18 +1,26 @@ -= Service exposition with ListenerClasses += Service exposition with listener classes +:description: Configure exposure of the Spark Connect and history services with listener classes such as cluster-internal, external-unstable, or external-stable. -The Spark operator deploys SparkApplications, and does not offer a UI or other API, so no services are exposed. -However, the operator can also deploy HistoryServers, which do offer a UI and API. -The operator deploys a service called `<name>-historyserver` (where `<name>` is the name of the spark application) through which the HistoryServer can be reached. +== History services -This service can have three different types: `cluster-internal`, `external-unstable` and `external-stable`. -Read more about the types in the xref:concepts:service-exposition.adoc[service exposition] documentation at platform level. - -This is how the ListenerClass is configured: +The operator deploys a xref:listener-operator:listener.adoc[Listener] for each Spark history pod. +By default it is only accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.config.listenerClass`: [source,yaml] ---- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkHistoryServer +metadata: + name: spark-history spec: - clusterConfig: - listenerClass: cluster-internal # <1> + nodes: + config: + listenerClass: external-unstable # <1> ---- -<1> The default `cluster-internal` setting. +<1> Specify one of `external-stable`, `external-unstable` or `cluster-internal` (the default). + +For the example above, the listener operator creates a service named `spark-history-node-default`, where `spark-history` is the name of the SparkHistoryServer, `node` is the service role (the only service role available for history servers), and `default` is the role group.
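+
+== Connect services
+
+Connect pods can be exposed using listener classes in exactly the same fashion as history servers.
+The listener class is set in the server role configuration instead, via `.spec.server.config.listenerClass`.
+The following manifest is a minimal sketch (the name `spark-connect` is illustrative):
+
+[source,yaml]
+----
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkConnectServer
+metadata:
+  name: spark-connect # illustrative name
+spec:
+  server:
+    config:
+      listenerClass: external-unstable # <1>
+----
+<1> Specify one of `external-stable`, `external-unstable` or `cluster-internal` (the default).
+
+For a SparkConnectServer named `spark-connect`, the listener operator creates a service named `spark-connect-server-default` (`server` is the role, `default` the role group), which clients then use to connect, e.g. `sc://spark-connect-server-default`.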
diff --git a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs index e1031a17..eff55179 100644 --- a/rust/operator-binary/src/connect/controller.rs +++ b/rust/operator-binary/src/connect/controller.rs @@ -11,8 +11,8 @@ use stackable_operator::{ }, logging::controller::ReconcilerError, status::condition::{ - compute_conditions, deployment::DeploymentConditionBuilder, - operations::ClusterOperationsConditionBuilder, + compute_conditions, operations::ClusterOperationsConditionBuilder, + statefulset::StatefulSetConditionBuilder, }, time::Duration, }; @@ -29,6 +29,14 @@ use crate::{ #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("failed to build spark connect listener"))] + BuildListener { source: server::Error }, + + #[snafu(display("failed to apply spark connect listener"))] + ApplyListener { + source: stackable_operator::cluster_resources::Error, + }, + #[snafu(display("failed to serialize connect properties"))] SerializeProperties { source: common::Error }, @@ -50,8 +58,8 @@ pub enum Error { #[snafu(display("failed to build spark connect server config map for {name}"))] BuildServerConfigMap { source: server::Error, name: String }, - #[snafu(display("failed to build spark connect deployment"))] - BuildServerDeployment { source: server::Error }, + #[snafu(display("failed to build spark connect stateful set"))] + BuildServerStatefulSet { source: server::Error }, #[snafu(display("failed to update status of spark connect server {name}"))] ApplyStatus { @@ -62,8 +70,8 @@ pub enum Error { #[snafu(display("spark connect object has no namespace"))] ObjectHasNoNamespace, - #[snafu(display("failed to update the connect server deployment"))] - ApplyDeployment { + #[snafu(display("failed to update the connect server stateful set"))] + ApplyStatefulSet { source: stackable_operator::cluster_resources::Error, }, @@ -192,21 +200,9 @@ pub async fn reconcile( .await .context(ApplyRoleBindingSnafu)?; - // Expose connect server to the outside world - let service = server::build_service(scs, &resolved_product_image.app_version_label, None) - .context(BuildServiceSnafu)?; - cluster_resources - .add(client, service.clone()) - .await - .context(ApplyServiceSnafu)?; - // Headless service used by executors connect back to the driver - let service = server::build_service( - scs, - &resolved_product_image.app_version_label, - Some("None".to_string()), - ) - .context(BuildServiceSnafu)?; + let service = server::build_internal_service(scs, &resolved_product_image.app_version_label) + .context(BuildServiceSnafu)?; cluster_resources .add(client, service.clone()) .await .context(ApplyServiceSnafu)?; @@ -275,8 +271,10 @@ pub async fn reconcile( name: scs.name_unchecked(), })?; + // ======================================== + // Server stateful set let args = server::command_args(&scs.spec.args); - let deployment = server::build_deployment( + let stateful_set = server::build_stateful_set( scs, &server_config, &resolved_product_image, @@ -284,15 +282,25 @@ pub async fn reconcile( &server_config_map, args, ) - .context(BuildServerDeploymentSnafu)?; + .context(BuildServerStatefulSetSnafu)?; + + // ======================================== + // Server listener + let listener = server::build_listener(scs, &server_config, &resolved_product_image) + .context(BuildListenerSnafu)?; + + cluster_resources + .add(client, listener) + 
.await + .context(ApplyListenerSnafu)?; - let mut ss_cond_builder = DeploymentConditionBuilder::default(); + let mut ss_cond_builder = StatefulSetConditionBuilder::default(); ss_cond_builder.add( cluster_resources - .add(client, deployment) + .add(client, stateful_set) .await - .context(ApplyDeploymentSnafu)?, + .context(ApplyStatefulSetSnafu)?, ); cluster_resources diff --git a/rust/operator-binary/src/connect/crd.rs b/rust/operator-binary/src/connect/crd.rs index a491e952..e478f1c1 100644 --- a/rust/operator-binary/src/connect/crd.rs +++ b/rust/operator-binary/src/connect/crd.rs @@ -79,6 +79,9 @@ pub mod versioned { pub image: ProductImage, /// Global Spark Connect server configuration that applies to all roles. + /// + /// This was previously used to hold the listener configuration, which has since moved + /// to the server configuration. #[serde(default)] pub cluster_config: v1alpha1::SparkConnectServerClusterConfig, @@ -106,21 +109,7 @@ pub mod versioned { #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] - pub struct SparkConnectServerClusterConfig { - /// This field controls which type of Service the Operator creates for this ConnectServer: - /// - /// * cluster-internal: Use a ClusterIP service - /// - /// * external-unstable: Use a NodePort service - /// - /// * external-stable: Use a LoadBalancer service - /// - /// This is a temporary solution with the goal to keep yaml manifests forward compatible. - /// In the future, this setting will control which ListenerClass - /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. - #[serde(default)] - pub listener_class: CurrentlySupportedListenerClasses, - } + pub struct SparkConnectServerClusterConfig {} #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] #[fragment_attrs( @@ -147,6 +136,10 @@ pub mod versioned { /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. #[fragment_attrs(serde(default))] pub requested_secret_lifetime: Option<Duration>, + + /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services. 
+ #[serde(default)] + pub listener_class: String, } #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] @@ -178,29 +171,6 @@ pub mod versioned { } } -// TODO: Temporary solution until listener-operator is finished -#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "PascalCase")] -pub(crate) enum CurrentlySupportedListenerClasses { - #[serde(rename = "cluster-internal")] - ClusterInternal, - #[default] - #[serde(rename = "external-unstable")] - ExternalUnstable, - #[serde(rename = "external-stable")] - ExternalStable, -} - -impl CurrentlySupportedListenerClasses { - pub fn k8s_service_type(&self) -> String { - match self { - CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(), - CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(), - CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(), - } - } -} - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] #[fragment_attrs( @@ -258,6 +228,7 @@ impl v1alpha1::ServerConfig { }, logging: product_logging::spec::default_logging(), requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME), + listener_class: Some("cluster-internal".into()), } } diff --git a/rust/operator-binary/src/connect/server.rs b/rust/operator-binary/src/connect/server.rs index 18944430..c4efbc87 100644 --- a/rust/operator-binary/src/connect/server.rs +++ b/rust/operator-binary/src/connect/server.rs @@ -7,15 +7,23 @@ use stackable_operator::{ configmap::ConfigMapBuilder, meta::ObjectMetaBuilder, pod::{ - PodBuilder, container::ContainerBuilder, resources::ResourceRequirementsBuilder, - volume::VolumeBuilder, + PodBuilder, + container::ContainerBuilder, + resources::ResourceRequirementsBuilder, + volume::{ + ListenerOperatorVolumeSourceBuilder, ListenerOperatorVolumeSourceBuilderError, + ListenerReference, VolumeBuilder, + }, }, }, - commons::product_image_selection::ResolvedProductImage, + commons::{ + listener::{Listener, ListenerPort}, + product_image_selection::ResolvedProductImage, + }, k8s_openapi::{ DeepMerge, api::{ - apps::v1::{Deployment, DeploymentSpec}, + apps::v1::{StatefulSet, StatefulSetSpec}, core::v1::{ ConfigMap, EnvVar, HTTPGetAction, PodSecurityContext, Probe, Service, ServiceAccount, ServicePort, ServiceSpec, @@ -37,11 +45,15 @@ use crate::{ SparkConnectContainer, v1alpha1, }, }, - crd::constants::{ - APP_NAME, JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, - METRICS_PROPERTIES_FILE, POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME, SPARK_UID, - VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, - VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, + crd::{ + constants::{ + APP_NAME, JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, + LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, METRICS_PROPERTIES_FILE, + POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME, SPARK_UID, VOLUME_MOUNT_NAME_CONFIG, + VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, + VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, + }, + listener, }, product_logging, }; @@ -52,6 +64,14 @@ const HTTP: &str = "http"; #[derive(Snafu, Debug)] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("failed to build spark connect listener"))] + BuildListener { source: crate::crd::listener::Error }, + + #[snafu(display("failed to build 
listener volume"))] + BuildListenerVolume { + source: ListenerOperatorVolumeSourceBuilderError, + }, + #[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))] VectorAggregatorConfigMapMissing, @@ -166,11 +186,7 @@ pub(crate) fn server_config_map( .add_data(JVM_SECURITY_PROPERTIES_FILE, jvm_sec_props) .add_data(METRICS_PROPERTIES_FILE, metrics_props); - let role_group_ref = RoleGroupRef { - cluster: ObjectRef::from_obj(scs), - role: SparkConnectRole::Server.to_string(), - role_group: DUMMY_SPARK_CONNECT_GROUP_NAME.to_string(), - }; + let role_group_ref = dummy_role_group_ref(scs); product_logging::extend_config_map( &role_group_ref, &config.logging, @@ -187,21 +203,23 @@ .context(InvalidConfigMapSnafu { name: cm_name }) } -#[allow(clippy::result_large_err)] -pub(crate) fn build_deployment( +pub(crate) fn build_stateful_set( scs: &v1alpha1::SparkConnectServer, config: &v1alpha1::ServerConfig, resolved_product_image: &ResolvedProductImage, service_account: &ServiceAccount, config_map: &ConfigMap, args: Vec<String>, -) -> Result<Deployment, Error> { +) -> Result<StatefulSet, Error> { + let server_role = SparkConnectRole::Server.to_string(); + let recommended_object_labels = + common::labels(scs, &resolved_product_image.app_version_label, &server_role); + + let recommended_labels = + Labels::recommended(recommended_object_labels.clone()).context(LabelBuildSnafu)?; + let metadata = ObjectMetaBuilder::new() - .with_recommended_labels(common::labels( - scs, - &resolved_product_image.app_version_label, - &SparkConnectRole::Server.to_string(), - )) + .with_recommended_labels(recommended_object_labels) .context(MetadataBuildSnafu)? .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) .build(); @@ -260,6 +278,8 @@ .context(AddVolumeMountSnafu)? .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG) .context(AddVolumeMountSnafu)? + .add_volume_mount(LISTENER_VOLUME_NAME, LISTENER_VOLUME_DIR) + .context(AddVolumeMountSnafu)? .readiness_probe(probe()) .liveness_probe(probe()); @@ -308,13 +328,28 @@ } } + // Add listener volume + // Listener endpoints for this role will use persistent volumes + // so that load balancers can hard-code the target addresses. This will + // be the case even when no class is set (and the value defaults to + // cluster-internal) as the address should still be consistent. + let volume_claim_templates = Some(vec![ + ListenerOperatorVolumeSourceBuilder::new( + &ListenerReference::ListenerName(dummy_role_group_ref(scs).object_name()), + &recommended_labels, + ) + .context(BuildListenerVolumeSnafu)? + .build_pvc(LISTENER_VOLUME_NAME.to_string()) + .context(BuildListenerVolumeSnafu)?, + ]); + // Merge user defined pod template if available let mut pod_template = pb.build_template(); if let Some(pod_overrides_spec) = scs.spec.server.as_ref().map(|s| s.pod_overrides.clone()) { pod_template.merge_from(pod_overrides_spec); } - Ok(Deployment { + Ok(StatefulSet { metadata: ObjectMetaBuilder::new() .name_and_namespace(scs) .name(object_name(&scs.name_any(), SparkConnectRole::Server)) .ownerreference_from_resource(scs, None, Some(true)) .context(ObjectMissingMetadataForOwnerRefSnafu)? .with_recommended_labels(common::labels( scs, &resolved_product_image.app_version_label, &SparkConnectRole::Server.to_string(), )) .context(MetadataBuildSnafu)? 
.build(), - spec: Some(DeploymentSpec { + spec: Some(StatefulSetSpec { template: pod_template, replicas: Some(1), + volume_claim_templates, selector: LabelSelector { match_labels: Some( Labels::role_group_selector( @@ -343,41 +379,19 @@ ), ..LabelSelector::default() }, - ..DeploymentSpec::default() + ..StatefulSetSpec::default() }), - ..Deployment::default() + ..StatefulSet::default() }) } -#[allow(clippy::result_large_err)] -pub(crate) fn build_service( +// This is the headless driver service used for the internal +// communication with the executors as recommended by the Spark docs. +pub(crate) fn build_internal_service( scs: &v1alpha1::SparkConnectServer, app_version_label: &str, - service_cluster_ip: Option<String>, -) -> Result<Service, Error> { - let (service_name, service_type, publish_not_ready_addresses) = match service_cluster_ip.clone() - { - Some(_) => ( - // These are the properties of the headless driver service used for the internal - // communication with the executors as recommended by the Spark docs. - // - // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness - // probes. Without it, the driver runs into a deadlock beacuse the Pod cannot become - // "ready" until the Service is "ready" and vice versa. - object_name(&scs.name_any(), SparkConnectRole::Server), - "ClusterIP".to_string(), - Some(true), - ), - None => ( - format!( - "{}-{}", - object_name(&scs.name_any(), SparkConnectRole::Server), - SparkConnectRole::Server - ), - scs.spec.cluster_config.listener_class.k8s_service_type(), - Some(false), - ), - }; + let service_name = object_name(&scs.name_any(), SparkConnectRole::Server); let selector = Labels::role_selector(scs, APP_NAME, &SparkConnectRole::Server.to_string()) .context(LabelBuildSnafu)? .into(); @@ -398,8 +412,8 @@ .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) .build(), spec: Some(ServiceSpec { - type_: Some(service_type), - cluster_ip: service_cluster_ip, + type_: Some("ClusterIP".to_owned()), + cluster_ip: Some("None".to_owned()), ports: Some(vec![ ServicePort { name: Some(String::from(GRPC)), @@ -413,7 +427,10 @@ }, ]), selector: Some(selector), - publish_not_ready_addresses, + // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness + // probes. Without it, the driver runs into a deadlock because the Pod cannot become + // "ready" until the Service is "ready" and vice versa. 
+ publish_not_ready_addresses: Some(true), ..ServiceSpec::default() }), status: None, @@ -584,3 +601,47 @@ fn probe() -> Probe { ..Probe::default() } } + +fn dummy_role_group_ref( + scs: &v1alpha1::SparkConnectServer, +) -> RoleGroupRef<v1alpha1::SparkConnectServer> { + RoleGroupRef { + cluster: ObjectRef::from_obj(scs), + role: SparkConnectRole::Server.to_string(), + role_group: DUMMY_SPARK_CONNECT_GROUP_NAME.to_string(), + } +} + +pub(crate) fn build_listener( + scs: &v1alpha1::SparkConnectServer, + config: &v1alpha1::ServerConfig, + resolved_product_image: &ResolvedProductImage, +) -> Result<Listener, Error> { + let listener_name = dummy_role_group_ref(scs).object_name(); + let listener_class = config.listener_class.clone(); + let role = SparkConnectRole::Server.to_string(); + let recommended_object_labels = + common::labels(scs, &resolved_product_image.app_version_label, &role); + + let listener_ports = [ + ListenerPort { + name: GRPC.to_string(), + port: CONNECT_GRPC_PORT, + protocol: Some("TCP".to_string()), + }, + ListenerPort { + name: HTTP.to_string(), + port: CONNECT_UI_PORT, + protocol: Some("TCP".to_string()), + }, + ]; + + listener::build_listener( + scs, + &listener_name, + &listener_class, + recommended_object_labels, + &listener_ports, + ) + .context(BuildListenerSnafu) +} diff --git a/rust/operator-binary/src/crd/constants.rs b/rust/operator-binary/src/crd/constants.rs index f25eb254..3da61f98 100644 --- a/rust/operator-binary/src/crd/constants.rs +++ b/rust/operator-binary/src/crd/constants.rs @@ -88,3 +88,7 @@ pub const SPARK_ENV_SH_FILE_NAME: &str = "spark-env.sh"; pub const SPARK_CLUSTER_ROLE: &str = "spark-k8s-clusterrole"; pub const SPARK_UID: i64 = 1000; pub const METRICS_PORT: u16 = 18081; +pub const HISTORY_UI_PORT: u16 = 18080; + +pub const LISTENER_VOLUME_NAME: &str = "listener"; +pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener"; diff --git a/rust/operator-binary/src/crd/history.rs b/rust/operator-binary/src/crd/history.rs index df01178a..54e66756 100644 --- a/rust/operator-binary/src/crd/history.rs +++ b/rust/operator-binary/src/crd/history.rs @@ -62,6 +62,7 @@ pub enum Error { #[versioned(version(name = "v1alpha1"))] pub mod versioned { + /// A Spark cluster history server component. This resource is managed by the Stackable operator /// for Apache Spark. Find more information on how to use it in the /// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/history-server). @@ -80,7 +81,10 @@ pub mod versioned { pub struct SparkHistoryServerSpec { pub image: ProductImage, - /// Global Spark history server configuration that applies to all roles and role groups. + /// Global Spark history server configuration that applies to all roles. + /// + /// This was previously used to hold the listener configuration, which has since moved + /// to the role configuration. #[serde(default)] pub cluster_config: v1alpha1::SparkHistoryServerClusterConfig, @@ -102,44 +106,7 @@ pub mod versioned { #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] - pub struct SparkHistoryServerClusterConfig { - /// This field controls which type of Service the Operator creates for this HistoryServer: - /// - /// * cluster-internal: Use a ClusterIP service - /// - /// * external-unstable: Use a NodePort service - /// - /// * external-stable: Use a LoadBalancer service - /// - /// This is a temporary solution with the goal to keep yaml manifests forward compatible. 
- /// In the future, this setting will control which ListenerClass - /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. - #[serde(default)] - pub listener_class: CurrentlySupportedListenerClasses, - } -} - -// TODO: Temporary solution until listener-operator is finished -#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "PascalCase")] -pub enum CurrentlySupportedListenerClasses { - #[default] - #[serde(rename = "cluster-internal")] - ClusterInternal, - #[serde(rename = "external-unstable")] - ExternalUnstable, - #[serde(rename = "external-stable")] - ExternalStable, -} - -impl CurrentlySupportedListenerClasses { - pub fn k8s_service_type(&self) -> String { - match self { - CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(), - CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(), - CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(), - } - } + pub struct SparkHistoryServerClusterConfig {} } impl v1alpha1::SparkHistoryServer { @@ -373,6 +340,9 @@ pub struct HistoryConfig { /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. #[fragment_attrs(serde(default))] pub requested_secret_lifetime: Option<Duration>, + + #[serde(default)] + pub listener_class: String, } impl HistoryConfig { @@ -396,6 +366,7 @@ impl HistoryConfig { logging: product_logging::spec::default_logging(), affinity: history_affinity(cluster_name), requested_secret_lifetime: Some(Self::DEFAULT_HISTORY_SECRET_LIFETIME), + listener_class: Some(default_listener_class()), } } } @@ -432,6 +403,10 @@ impl Configuration for HistoryConfigFragment { } } +fn default_listener_class() -> String { + "cluster-internal".to_owned() +} + #[cfg(test)] mod test { use indoc::indoc; diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs new file mode 100644 index 00000000..4a3b7151 --- /dev/null +++ b/rust/operator-binary/src/crd/listener.rs @@ -0,0 +1,48 @@ +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + builder::meta::ObjectMetaBuilder, + commons::listener::{Listener, ListenerPort, ListenerSpec}, + kube::Resource, + kvp::ObjectLabels, +}; +use strum::{EnumDiscriminants, IntoStaticStr}; + +#[derive(Snafu, Debug, EnumDiscriminants)] +#[strum_discriminants(derive(IntoStaticStr))] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::builder::meta::Error, + }, + + #[snafu(display("failed to build object meta data"))] + ObjectMeta { + source: stackable_operator::builder::meta::Error, + }, +} + +pub fn build_listener<T: Resource<DynamicType = ()>>( + resource: &T, + listener_name: &str, + listener_class: &str, + listener_labels: ObjectLabels<T>, + listener_ports: &[ListenerPort], +) -> Result<Listener, Error> { + Ok(Listener { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(resource) + .name(listener_name) + .ownerreference_from_resource(resource, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(listener_labels) + .context(ObjectMetaSnafu)? 
+ .build(), + spec: ListenerSpec { + class_name: Some(listener_class.into()), + ports: Some(listener_ports.to_vec()), + ..ListenerSpec::default() + }, + status: None, + }) +} diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 340ff5fa..d0d82691 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -55,6 +55,7 @@ use crate::{ pub mod affinity; pub mod constants; pub mod history; +pub mod listener; pub mod logdir; pub mod roles; pub mod tlscerts; diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index 2bc679cb..f4ddfbc8 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -11,20 +11,26 @@ use stackable_operator::{ configmap::ConfigMapBuilder, meta::ObjectMetaBuilder, pod::{ - PodBuilder, container::ContainerBuilder, resources::ResourceRequirementsBuilder, - volume::VolumeBuilder, + PodBuilder, + container::ContainerBuilder, + resources::ResourceRequirementsBuilder, + volume::{ + ListenerOperatorVolumeSourceBuilder, ListenerOperatorVolumeSourceBuilderError, + ListenerReference, VolumeBuilder, + }, }, }, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, - commons::product_image_selection::ResolvedProductImage, + commons::{ + listener::{Listener, ListenerPort}, + product_image_selection::ResolvedProductImage, + rbac::build_rbac_resources, + }, k8s_openapi::{ DeepMerge, api::{ apps::v1::{StatefulSet, StatefulSetSpec}, - core::v1::{ - ConfigMap, PodSecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec, - }, - rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject}, + core::v1::{ConfigMap, PodSecurityContext, ServiceAccount}, }, apimachinery::pkg::apis::meta::v1::LabelSelector, }, @@ -33,7 +39,7 @@ use stackable_operator::{ core::{DeserializeGuard, error_boundary}, runtime::{controller::Action, reflector::ObjectRef}, }, - kvp::{Label, Labels, ObjectLabels}, + kvp::{Labels, ObjectLabels}, logging::controller::ReconcilerError, product_logging::{ framework::{LoggingError, calculate_log_volume_size_limit, vector_container}, @@ -51,14 +57,16 @@ use crate::{ Ctx, crd::{ constants::{ - ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, - JVM_SECURITY_PROPERTIES_FILE, MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, - SECRET_ACCESS_KEY, SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, - SPARK_ENV_SH_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID, STACKABLE_TRUST_STORE, - VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, - VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, + ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, HISTORY_UI_PORT, + JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, + MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, SECRET_ACCESS_KEY, + SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID, + STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, + VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, + VOLUME_MOUNT_PATH_LOG_CONFIG, }, history::{self, HistoryConfig, SparkHistoryServerContainer, v1alpha1}, + listener, logdir::ResolvedLogDir, tlscerts, to_spark_env_sh_string, }, @@ -70,6 +78,19 @@ use crate::{ #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { + 
#[snafu(display("failed to build RBAC resources"))] + BuildRbacResources { + source: stackable_operator::commons::rbac::Error, + }, + + #[snafu(display("failed to build spark history group listener"))] + BuildListener { source: crate::crd::listener::Error }, + + #[snafu(display("failed to build listener volume"))] + BuildListenerVolume { + source: ListenerOperatorVolumeSourceBuilderError, + }, + #[snafu(display("missing secret lifetime"))] MissingSecretLifetime, @@ -92,8 +113,8 @@ pub enum Error { source: stackable_operator::builder::meta::Error, }, - #[snafu(display("failed to update the history server deployment"))] - ApplyDeployment { + #[snafu(display("failed to update the history server stateful set"))] + ApplyStatefulSet { source: stackable_operator::cluster_resources::Error, }, @@ -102,11 +123,6 @@ pub enum Error { source: stackable_operator::cluster_resources::Error, }, - #[snafu(display("failed to update history server service"))] - ApplyService { - source: stackable_operator::cluster_resources::Error, - }, - #[snafu(display("failed to apply role ServiceAccount"))] ApplyServiceAccount { source: stackable_operator::cluster_resources::Error, @@ -205,9 +221,12 @@ pub enum Error { #[snafu(display("failed to merge environment config and/or overrides"))] MergeEnv { source: crate::crd::history::Error }, -} -type Result = std::result::Result; + #[snafu(display("failed to apply group listener"))] + ApplyGroupListener { + source: stackable_operator::cluster_resources::Error, + }, +} impl ReconcilerError for Error { fn category(&self) -> &'static str { @@ -218,7 +237,7 @@ impl ReconcilerError for Error { pub async fn reconcile( shs: Arc>, ctx: Arc, -) -> Result { +) -> Result { tracing::info!("Starting reconcile history server"); let shs = shs @@ -251,14 +270,20 @@ pub async fn reconcile( .context(LogDirSnafu)?; // Use a dedicated service account for history server pods. - let (serviceaccount, rolebinding) = - build_history_role_serviceaccount(shs, &resolved_product_image.app_version_label)?; - let serviceaccount = cluster_resources - .add(client, serviceaccount) + let (service_account, role_binding) = build_rbac_resources( + shs, + APP_NAME, + cluster_resources + .get_required_labels() + .context(GetRequiredLabelsSnafu)?, + ) + .context(BuildRbacResourcesSnafu)?; + let service_account = cluster_resources + .add(client, service_account) .await .context(ApplyServiceAccountSnafu)?; cluster_resources - .add(client, rolebinding) + .add(client, role_binding) .await .context(ApplyRoleBindingSnafu)?; @@ -268,17 +293,6 @@ pub async fn reconcile( .context(ProductConfigValidationSnafu)? 
.iter() { - let service = build_service( - shs, - &resolved_product_image.app_version_label, - role_name, - None, - )?; - cluster_resources - .add(client, service) - .await - .context(ApplyServiceSnafu)?; - for (rolegroup_name, rolegroup_config) in role_config.iter() { let rgr = RoleGroupRef { cluster: ObjectRef::from_obj(shs), @@ -290,17 +304,6 @@ pub async fn reconcile( .merged_config(&rgr) .context(FailedToResolveConfigSnafu)?; - let service = build_service( - shs, - &resolved_product_image.app_version_label, - role_name, - Some(&rgr), - )?; - cluster_resources - .add(client, service) - .await - .context(ApplyServiceSnafu)?; - let config_map = build_config_map( shs, rolegroup_config, @@ -320,12 +323,23 @@ pub async fn reconcile( &rgr, &log_dir, &merged_config, - &serviceaccount, + &service_account, )?; cluster_resources .add(client, sts) .await - .context(ApplyDeploymentSnafu)?; + .context(ApplyStatefulSetSnafu)?; + + let rg_group_listener = build_group_listener( + shs, + &resolved_product_image, + &rgr, + merged_config.listener_class.to_string(), + )?; + cluster_resources + .add(client, rg_group_listener) + .await + .context(ApplyGroupListenerSnafu)?; } let role_config = &shs.spec.nodes.role_config; @@ -347,6 +361,36 @@ pub async fn reconcile( Ok(Action::await_change()) } +#[allow(clippy::result_large_err)] +fn build_group_listener( + shs: &v1alpha1::SparkHistoryServer, + resolved_product_image: &ResolvedProductImage, + rolegroup: &RoleGroupRef<v1alpha1::SparkHistoryServer>, + listener_class: String, +) -> Result<Listener, Error> { + let listener_name = rolegroup.object_name(); + let recommended_object_labels = labels( + shs, + &resolved_product_image.app_version_label, + &rolegroup.role_group, + ); + + let listener_ports = [ListenerPort { + name: "http".to_string(), + port: HISTORY_UI_PORT.into(), + protocol: Some("TCP".to_string()), + }]; + + listener::build_listener( + shs, + &listener_name, + &listener_class, + recommended_object_labels, + &listener_ports, + ) + .context(BuildListenerSnafu) +} + pub fn error_policy( _obj: Arc<DeserializeGuard<v1alpha1::SparkHistoryServer>>, error: &Error, @@ -452,12 +496,16 @@ fn build_stateful_set( rolegroupref.object_name() }; - let metadata = ObjectMetaBuilder::new() - .with_recommended_labels(labels( - shs, - &resolved_product_image.app_version_label, - &rolegroupref.role_group, - )) + let recommended_object_labels = labels( + shs, + &resolved_product_image.app_version_label, + rolegroupref.role_group.as_ref(), + ); + let recommended_labels = + Labels::recommended(recommended_object_labels.clone()).context(LabelBuildSnafu)?; + + let pb_metadata = ObjectMetaBuilder::new() + .with_recommended_labels(recommended_object_labels.clone()) .context(MetadataBuildSnafu)? .build(); @@ -467,7 +515,7 @@ fn build_stateful_set( .requested_secret_lifetime .context(MissingSecretLifetimeSnafu)?; pb.service_account_name(serviceaccount.name_unchecked()) - .metadata(metadata) + .metadata(pb_metadata) .image_pull_secrets_from_product_image(resolved_product_image) .add_volume( VolumeBuilder::new(VOLUME_MOUNT_NAME_CONFIG) @@ -528,7 +576,7 @@ fn build_stateful_set( "-c".to_string(), ]) .args(command_args(log_dir)) - .add_container_port("http", 18080) + .add_container_port("http", HISTORY_UI_PORT.into()) .add_container_port("metrics", METRICS_PORT.into()) .add_env_vars(merged_env) .add_volume_mounts(log_dir.volume_mounts()) @@ -539,7 +587,25 @@ fn build_stateful_set( .context(AddVolumeMountSnafu)? .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG) .context(AddVolumeMountSnafu)? 
+ .add_volume_mount(LISTENER_VOLUME_NAME, LISTENER_VOLUME_DIR) + .context(AddVolumeMountSnafu)? .build(); + + // Add listener volume + // Listener endpoints for this role will use persistent volumes + // so that load balancers can hard-code the target addresses. This will + // be the case even when no class is set (and the value defaults to + // cluster-internal) as the address should still be consistent. + let volume_claim_templates = Some(vec![ + ListenerOperatorVolumeSourceBuilder::new( + &ListenerReference::ListenerName(rolegroupref.object_name()), + &recommended_labels, + ) + .context(BuildListenerVolumeSnafu)? + .build_pvc(LISTENER_VOLUME_NAME.to_string()) + .context(BuildListenerVolumeSnafu)?, + ]); + pb.add_container(container); if merged_config.logging.enable_vector_agent { @@ -575,21 +641,20 @@ fn build_stateful_set( pod_template.merge_from(shs.role().config.pod_overrides.clone()); pod_template.merge_from(role_group.config.pod_overrides); + let sts_metadata = ObjectMetaBuilder::new() + .name_and_namespace(shs) + .name(rolegroupref.object_name()) + .ownerreference_from_resource(shs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(recommended_object_labels) + .context(MetadataBuildSnafu)? + .build(); + Ok(StatefulSet { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(shs) - .name(rolegroupref.object_name()) - .ownerreference_from_resource(shs, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(labels( - shs, - &resolved_product_image.app_version_label, - rolegroupref.role_group.as_ref(), - )) - .context(MetadataBuildSnafu)? - .build(), + metadata: sts_metadata, spec: Some(StatefulSetSpec { template: pod_template, + volume_claim_templates, replicas: shs.replicas(rolegroupref), selector: LabelSelector { match_labels: Some( @@ -610,116 +675,6 @@ fn build_stateful_set( }) } -#[allow(clippy::result_large_err)] -fn build_service( - shs: &v1alpha1::SparkHistoryServer, - app_version_label: &str, - role: &str, - group: Option<&RoleGroupRef<v1alpha1::SparkHistoryServer>>, -) -> Result<Service> { - let group_name = match group { - Some(rgr) => rgr.role_group.clone(), - None => "global".to_owned(), - }; - - let (service_name, service_type, service_cluster_ip) = match group { - Some(rgr) => ( - rgr.object_name(), - "ClusterIP".to_string(), - Some("None".to_string()), - ), - None => ( - // TODO (@NickLarsenNZ): Explain this unwrap. Either convert to expect, or gracefully handle the error. - format!("{}-{}", shs.metadata.name.as_ref().unwrap(), role), - shs.spec.cluster_config.listener_class.k8s_service_type(), - None, - ), - }; - - let selector = match group { - Some(rgr) => Labels::role_group_selector(shs, APP_NAME, &rgr.role, &rgr.role_group) - .context(LabelBuildSnafu)? - .into(), - None => Labels::role_selector(shs, APP_NAME, role) - .context(LabelBuildSnafu)? - .into(), - }; - - Ok(Service { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(shs) - .name(service_name) - .ownerreference_from_resource(shs, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(labels(shs, app_version_label, &group_name)) - .context(MetadataBuildSnafu)? - .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) 
- .build(), - spec: Some(ServiceSpec { - type_: Some(service_type), - cluster_ip: service_cluster_ip, - ports: Some(vec![ - ServicePort { - name: Some(String::from("http")), - port: 18080, - ..ServicePort::default() - }, - ServicePort { - name: Some(String::from("metrics")), - port: METRICS_PORT.into(), - ..ServicePort::default() - }, - ]), - selector: Some(selector), - ..ServiceSpec::default() - }), - status: None, - }) -} - -// TODO: This function should be replaced with operator-rs build_rbac_resources. -// See: https://github.com/stackabletech/spark-k8s-operator/issues/499 -#[allow(clippy::result_large_err)] -fn build_history_role_serviceaccount( - shs: &v1alpha1::SparkHistoryServer, - app_version_label: &str, -) -> Result<(ServiceAccount, RoleBinding)> { - let sa = ServiceAccount { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(shs) - .ownerreference_from_resource(shs, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(labels(shs, app_version_label, HISTORY_CONTROLLER_NAME)) - .context(MetadataBuildSnafu)? - .build(), - ..ServiceAccount::default() - }; - let binding = RoleBinding { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(shs) - .ownerreference_from_resource(shs, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(labels(shs, app_version_label, HISTORY_CONTROLLER_NAME)) - .context(MetadataBuildSnafu)? - .build(), - role_ref: RoleRef { - api_group: <ClusterRole as Resource>::GROUP // need to fully qualify because of "Resource" name clash - .to_string(), - kind: <ClusterRole as Resource>::KIND.to_string(), - name: SPARK_CLUSTER_ROLE.to_string(), - }, - subjects: Some(vec![Subject { - api_group: Some( - <ServiceAccount as Resource>::GROUP.to_string(), - ), - kind: <ServiceAccount as Resource>::KIND.to_string(), - name: sa.name_any(), - namespace: sa.namespace(), - }]), - }; - Ok((sa, binding)) -} - #[allow(clippy::result_large_err)] fn spark_defaults( shs: &v1alpha1::SparkHistoryServer, diff --git a/tests/templates/kuttl/spark-connect/10-assert.yaml b/tests/templates/kuttl/spark-connect/10-assert.yaml index 5b75cf2c..826a7888 100644 --- a/tests/templates/kuttl/spark-connect/10-assert.yaml +++ b/tests/templates/kuttl/spark-connect/10-assert.yaml @@ -4,7 +4,7 @@ kind: TestAssert timeout: 300 --- apiVersion: apps/v1 -kind: Deployment +kind: StatefulSet metadata: name: spark-connect-server status: diff --git a/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 b/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 index 3e6464b0..20b3f0e2 100644 --- a/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 +++ b/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 @@ -49,6 +49,7 @@ spec: add: - -Dmy.custom.jvm.arg=customValue config: + listenerClass: external-unstable logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} containers: diff --git a/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 b/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 index 99f97902..d78c2572 100644 --- a/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 +++ b/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 @@ -22,7 +22,7 @@ spec: [ "/usr/bin/python", "/stackable/spark-connect-examples/python/simple-connect-app.py", - "sc://spark-connect-server", + "sc://spark-connect-server-default", ] resources: limits: diff --git a/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 
b/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 index 967cdd7e..a440a164 100644 --- a/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 +++ b/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 @@ -35,6 +35,7 @@ spec: {% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} + logFileDirectory: s3: prefix: eventlogs/ @@ -44,6 +45,7 @@ spec: #sparkConf: nodes: config: + listenerClass: external-unstable logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} containers: