
feat: expose services with listener classes #562

Merged: 22 commits, May 2, 2025
Changes from all commits

22 commits
29d2cd3
add listener.rs and replace usages of CurrentlySupportedListenerClasses
razvan Apr 23, 2025
c3b98b2
expose history pods via listener classes
razvan Apr 23, 2025
90d4f03
Merge branch 'main' into feat/listener
razvan Apr 23, 2025
9ffb187
expose spark connect pods via listener classes
razvan Apr 23, 2025
4c1c18f
update changelog
razvan Apr 23, 2025
6914675
Merge branch 'main' into feat/listener
razvan Apr 24, 2025
3f79af6
connect: use persistent volumes for listener and move from deployment …
razvan Apr 28, 2025
2c9488f
history: use persistent volumes for listener volumes
razvan Apr 28, 2025
f590251
Merge branch 'main' into feat/listener
razvan Apr 28, 2025
e0fa982
history: create group listeners and update crd
razvan Apr 29, 2025
cee7f2a
history: remove services created by the operator and update test
razvan Apr 29, 2025
8b3ef2e
connect: create server listeners and update crd
razvan Apr 29, 2025
fcd5d3a
history: refactor to use listener::build_listener
razvan Apr 29, 2025
6a2f32a
connect: remove traces of deployment
razvan Apr 29, 2025
d9d8c6e
history: refactor rbac as per todo
razvan Apr 29, 2025
3ed79fc
docs: update
razvan Apr 29, 2025
2709f95
chore: remove set command to make jenkins test work
razvan Apr 30, 2025
73b6245
Update CHANGELOG.md
razvan May 2, 2025
851e25a
Update rust/operator-binary/src/connect/server.rs
razvan May 2, 2025
2355ded
Update rust/operator-binary/src/crd/history.rs
razvan May 2, 2025
9889fb5
doc: mention cluster_config usage status
razvan May 2, 2025
1a6efb8
review feedback
razvan May 2, 2025
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
- Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept.
- Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
- Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
- Expose history and connect services via listener classes ([#562]).

### Changed

@@ -35,6 +36,7 @@ All notable changes to this project will be documented in this file.
[#554]: https://github.com/stackabletech/spark-k8s-operator/pull/554
[#559]: https://github.com/stackabletech/spark-k8s-operator/pull/559
[#560]: https://github.com/stackabletech/spark-k8s-operator/pull/560
[#562]: https://github.com/stackabletech/spark-k8s-operator/pull/562

## [25.3.0] - 2025-03-21

58 changes: 18 additions & 40 deletions deploy/helm/spark-k8s-operator/crds/crds.yaml
@@ -1118,27 +1118,11 @@ spec:
description: A Spark cluster history server component. This resource is managed by the Stackable operator for Apache Spark. Find more information on how to use it in the [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/history-server).
properties:
clusterConfig:
default:
listenerClass: cluster-internal
description: Global Spark history server configuration that applies to all roles and role groups.
properties:
listenerClass:
default: cluster-internal
description: |-
This field controls which type of Service the Operator creates for this HistoryServer:

* cluster-internal: Use a ClusterIP service

* external-unstable: Use a NodePort service

* external-stable: Use a LoadBalancer service
default: {}
description: |-
Global Spark history server configuration that applies to all roles.

This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html> will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
enum:
- cluster-internal
- external-unstable
- external-stable
type: string
This was previously used to hold the listener configuration, which has since moved to the role configuration.
type: object
image:
anyOf:
@@ -1395,6 +1379,9 @@ spec:
cleaner:
nullable: true
type: boolean
listenerClass:
nullable: true
type: string
logging:
default:
containers: {}
@@ -1641,6 +1628,9 @@ spec:
cleaner:
nullable: true
type: boolean
listenerClass:
nullable: true
type: string
logging:
default:
containers: {}
@@ -1879,27 +1869,11 @@
type: string
type: array
clusterConfig:
default:
listenerClass: external-unstable
description: Global Spark Connect server configuration that applies to all roles.
properties:
listenerClass:
default: external-unstable
description: |-
This field controls which type of Service the Operator creates for this ConnectServer:

* cluster-internal: Use a ClusterIP service

* external-unstable: Use a NodePort service

* external-stable: Use a LoadBalancer service
default: {}
description: |-
Global Spark Connect server configuration that applies to all roles.

This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html> will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
enum:
- cluster-internal
- external-unstable
- external-stable
type: string
This was previously used to hold the listener configuration, which has since moved to the server configuration.
type: object
clusterOperation:
default:
@@ -2191,6 +2165,10 @@
config:
default: {}
properties:
listenerClass:
description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services.
nullable: true
type: string
logging:
default:
containers: {}
11 changes: 11 additions & 0 deletions deploy/helm/spark-k8s-operator/templates/roles.yaml
@@ -133,3 +133,14 @@ rules:
- bind
resourceNames:
- {{ include "operator.name" . }}-clusterrole
- apiGroups:
- listeners.stackable.tech
resources:
- listeners
verbs:
- get
- list
- watch
- patch
- create
- delete
30 changes: 19 additions & 11 deletions docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc
@@ -1,18 +1,26 @@
= Service exposition with ListenerClasses
= Service exposition with listener classes
:description: Configure the Spark connect and history services exposure with listener classes: cluster-internal, external-unstable, or external-stable.

The Spark operator deploys SparkApplications, and does not offer a UI or other API, so no services are exposed.
However, the operator can also deploy HistoryServers, which do offer a UI and API.
The operator deploys a service called `<name>-historyserver` (where `<name>` is the name of the spark application) through which the HistoryServer can be reached.
== History services

This service can have three different types: `cluster-internal`, `external-unstable` and `external-stable`.
Read more about the types in the xref:concepts:service-exposition.adoc[service exposition] documentation at platform level.

This is how the ListenerClass is configured:
The operator deploys a xref:listener-operator:listener.adoc[Listener] for each Spark history pod.
By default it is only accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.config.listenerClass`:

[source,yaml]
----
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
metadata:
name: spark-history
spec:
clusterConfig:
listenerClass: cluster-internal # <1>
nodes:
config:
listenerClass: external-unstable # <1>
----
<1> The default `cluster-internal` setting.
<1> Specify one of `external-stable`, `external-unstable` or `cluster-internal` (the default is `cluster-internal`).

For the example above, the listener operator creates a service named `spark-history-node-default` where `spark-history` is the name of the SparkHistoryServer, `node` is the service role (the only service role available for history servers) and `default` is the role group.
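To make the naming rule concrete, the example above implies a Service shaped roughly like the following sketch. All values here are illustrative assumptions (the exact resource shape, labels and ports depend on the listener-operator and Spark operator versions in use; `18080` is the usual history server UI port):

[source,yaml]
----
# Hypothetical sketch of the Service created by the listener operator
# for the SparkHistoryServer example above.
apiVersion: v1
kind: Service
metadata:
  name: spark-history-node-default  # <cluster>-<role>-<rolegroup>
spec:
  type: NodePort  # follows from the external-unstable listener class
  ports:
    - name: http
      port: 18080  # default history server UI port
      protocol: TCP
----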

== Connect services

Connect pods can be exposed using listener classes in exactly the same fashion as history servers.
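As a sketch only, a SparkConnectServer exposing its pods via a listener class might look like the following. The role key (shown here as `servers`) and the default class (`cluster-internal`, per the CRD change in this PR) are assumptions to verify against the CRD schema of your operator version:

[source,yaml]
----
# Hypothetical sketch; check the SparkConnectServer CRD schema
# (e.g. with `kubectl explain`) for the exact role key.
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect
spec:
  servers:
    config:
      listenerClass: external-unstable  # default is cluster-internal
----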
58 changes: 33 additions & 25 deletions rust/operator-binary/src/connect/controller.rs
@@ -11,8 +11,8 @@ use stackable_operator::{
},
logging::controller::ReconcilerError,
status::condition::{
compute_conditions, deployment::DeploymentConditionBuilder,
operations::ClusterOperationsConditionBuilder,
compute_conditions, operations::ClusterOperationsConditionBuilder,
statefulset::StatefulSetConditionBuilder,
},
time::Duration,
};
@@ -29,6 +29,14 @@ use crate::{
#[strum_discriminants(derive(IntoStaticStr))]
#[allow(clippy::enum_variant_names)]
pub enum Error {
#[snafu(display("failed to build spark connect listener"))]
BuildListener { source: server::Error },

#[snafu(display("failed to apply spark connect listener"))]
ApplyListener {
source: stackable_operator::cluster_resources::Error,
},

#[snafu(display("failed to serialize connect properties"))]
SerializeProperties { source: common::Error },

@@ -50,8 +58,8 @@
#[snafu(display("failed to build spark connect server config map for {name}"))]
BuildServerConfigMap { source: server::Error, name: String },

#[snafu(display("failed to build spark connect deployment"))]
BuildServerDeployment { source: server::Error },
#[snafu(display("failed to build spark connect stateful set"))]
BuildServerStatefulSet { source: server::Error },

#[snafu(display("failed to update status of spark connect server {name}"))]
ApplyStatus {
@@ -62,8 +70,8 @@
#[snafu(display("spark connect object has no namespace"))]
ObjectHasNoNamespace,

#[snafu(display("failed to update the connect server deployment"))]
ApplyDeployment {
#[snafu(display("failed to update the connect server stateful set"))]
ApplyStatefulSet {
source: stackable_operator::cluster_resources::Error,
},

@@ -192,21 +200,9 @@ pub async fn reconcile(
.await
.context(ApplyRoleBindingSnafu)?;

// Expose connect server to the outside world
let service = server::build_service(scs, &resolved_product_image.app_version_label, None)
.context(BuildServiceSnafu)?;
cluster_resources
.add(client, service.clone())
.await
.context(ApplyServiceSnafu)?;

// Headless service used by executors connect back to the driver
let service = server::build_service(
scs,
&resolved_product_image.app_version_label,
Some("None".to_string()),
)
.context(BuildServiceSnafu)?;
let service = server::build_internal_service(scs, &resolved_product_image.app_version_label)
.context(BuildServiceSnafu)?;

cluster_resources
.add(client, service.clone())
@@ -275,24 +271,36 @@
name: scs.name_unchecked(),
})?;

// ========================================
// Server stateful set
let args = server::command_args(&scs.spec.args);
let deployment = server::build_deployment(
let stateful_set = server::build_stateful_set(
scs,
&server_config,
&resolved_product_image,
&service_account,
&server_config_map,
args,
)
.context(BuildServerDeploymentSnafu)?;
.context(BuildServerStatefulSetSnafu)?;

// ========================================
// Server listener
let listener = server::build_listener(scs, &server_config, &resolved_product_image)
.context(BuildListenerSnafu)?;

cluster_resources
.add(client, listener)
.await
.context(ApplyListenerSnafu)?;

let mut ss_cond_builder = DeploymentConditionBuilder::default();
let mut ss_cond_builder = StatefulSetConditionBuilder::default();

ss_cond_builder.add(
cluster_resources
.add(client, deployment)
.add(client, stateful_set)
.await
.context(ApplyDeploymentSnafu)?,
.context(ApplyStatefulSetSnafu)?,
);

cluster_resources
47 changes: 9 additions & 38 deletions rust/operator-binary/src/connect/crd.rs
@@ -79,6 +79,9 @@ pub mod versioned {
pub image: ProductImage,

/// Global Spark Connect server configuration that applies to all roles.
///
/// This was previously used to hold the listener configuration, which has since moved
/// to the server configuration.
#[serde(default)]
pub cluster_config: v1alpha1::SparkConnectServerClusterConfig,

@@ -106,21 +109,7 @@

#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SparkConnectServerClusterConfig {
/// This field controls which type of Service the Operator creates for this ConnectServer:
///
/// * cluster-internal: Use a ClusterIP service
///
/// * external-unstable: Use a NodePort service
///
/// * external-stable: Use a LoadBalancer service
///
/// This is a temporary solution with the goal to keep yaml manifests forward compatible.
/// In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html>
/// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
#[serde(default)]
pub listener_class: CurrentlySupportedListenerClasses,
}
pub struct SparkConnectServerClusterConfig {}

#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
#[fragment_attrs(
@@ -147,6 +136,10 @@
/// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate.
#[fragment_attrs(serde(default))]
pub requested_secret_lifetime: Option<Duration>,

/// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services.
#[serde(default)]
pub listener_class: String,
}

#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
@@ -178,29 +171,6 @@
}
}

// TODO: Temporary solution until listener-operator is finished
#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) enum CurrentlySupportedListenerClasses {
#[serde(rename = "cluster-internal")]
ClusterInternal,
#[default]
#[serde(rename = "external-unstable")]
ExternalUnstable,
#[serde(rename = "external-stable")]
ExternalStable,
}

impl CurrentlySupportedListenerClasses {
pub fn k8s_service_type(&self) -> String {
match self {
CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(),
CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(),
CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(),
}
}
}

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
#[fragment_attrs(
@@ -258,6 +228,7 @@ impl v1alpha1::ServerConfig {
},
logging: product_logging::spec::default_logging(),
requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME),
listener_class: Some("cluster-internal".into()),
}
}
