
feat: add support for SparkConnect #539

Merged · 42 commits · Apr 10, 2025
Commits
32b79a1
wip
razvan Mar 21, 2025
c97e5d8
it builds
razvan Mar 21, 2025
6e6a828
kuttl test
razvan Mar 21, 2025
040a6f5
successfully create driver and executors
razvan Mar 21, 2025
5f5850a
use bundled spark-connect jar instead of --package
razvan Mar 24, 2025
6b57dec
kuttl test is successful
razvan Mar 25, 2025
47f32a9
Merge branch 'main' into feat/connect
razvan Mar 25, 2025
15fac04
simplify CRD: no roles and no role groups for spark connect servers
razvan Mar 26, 2025
67c8366
test passes again
razvan Mar 28, 2025
6bc0631
move all config to properties file
razvan Mar 28, 2025
62f33c6
use deployment instead of stateful set
razvan Mar 28, 2025
a810031
implement jvm overrides for the connect server
razvan Mar 28, 2025
eed00d1
implement log aggregation
razvan Mar 28, 2025
87655f9
implement pod overrides
razvan Mar 28, 2025
4f42a32
implement resource requests
razvan Mar 28, 2025
e559188
implement cluster operation
razvan Mar 31, 2025
4c9c06e
implement connect server status tracking
razvan Mar 31, 2025
a9152da
Refactor server related code in its own module.
razvan Mar 31, 2025
ca4ba59
cleanup
razvan Mar 31, 2025
98a8bc3
configure executors with pod templates
razvan Apr 1, 2025
01bfefc
split configuration between server and executor
razvan Apr 2, 2025
7d0483a
merge executor pod overrides into pod template
razvan Apr 2, 2025
cd44a77
implement executor affinity and resource properties
razvan Apr 3, 2025
bb4c852
bump op-rs to 0.88.0
razvan Apr 3, 2025
28dc0e8
implement user provided command line args for the connect server
razvan Apr 3, 2025
2499e95
Merge branch 'main' into feat/connect
razvan Apr 3, 2025
e6c5571
spark connect usage guide
razvan Apr 3, 2025
533318b
main merge
razvan Apr 4, 2025
c9ca081
update readme and fix typo
razvan Apr 4, 2025
581e2af
cleanup, liveliness probe, do not use the iceberg test for now
razvan Apr 4, 2025
e6cdc97
expose prometheus metrics
razvan Apr 7, 2025
2dec67d
Apply suggestions from code review
razvan Apr 10, 2025
4fd2a3d
remove duplicate constant
razvan Apr 10, 2025
f707cd2
rename argument
razvan Apr 10, 2025
a956730
GRPC and HTTP constants
razvan Apr 10, 2025
93bd80e
fix main merge problems
razvan Apr 10, 2025
433a2ea
remove unused error variant
razvan Apr 10, 2025
5d176aa
remove iceberg test script
razvan Apr 10, 2025
85c149c
Apply suggestions from code review
razvan Apr 10, 2025
7432f37
regenerate charts
razvan Apr 10, 2025
218c406
change visibility
maltesander Apr 10, 2025
5e46bc0
fix comment
razvan Apr 10, 2025
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Added

- Experimental support for Spark Connect ([#539]).

### Changed

- BREAKING: Replace stackable-operator `initialize_logging` with stackable-telemetry `Tracing` ([#547], [#554]).
@@ -19,6 +23,7 @@ All notable changes to this project will be documented in this file.

- Use `json` file extension for log files ([#553]).

[#539]: https://github.com/stackabletech/spark-k8s-operator/pull/539
[#547]: https://github.com/stackabletech/spark-k8s-operator/pull/547
[#551]: https://github.com/stackabletech/spark-k8s-operator/pull/551
[#553]: https://github.com/stackabletech/spark-k8s-operator/pull/553
2 changes: 1 addition & 1 deletion crate-hashes.json

Some generated files are not rendered by default.

583 changes: 582 additions & 1 deletion deploy/helm/spark-k8s-operator/crds/crds.yaml

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions deploy/helm/spark-k8s-operator/templates/roles.yaml
@@ -53,6 +53,7 @@ rules:
- apps
resources:
- statefulsets
- deployments
verbs:
- create
- delete
@@ -102,6 +103,7 @@ rules:
resources:
- sparkapplications
- sparkhistoryservers
- sparkconnectservers
verbs:
- get
- list
@@ -111,6 +113,7 @@
- spark.stackable.tech
resources:
- sparkapplications/status
- sparkconnectservers/status
verbs:
- patch
- apiGroups:
44 changes: 44 additions & 0 deletions docs/modules/spark-k8s/examples/example-spark-connect.yaml
@@ -0,0 +1,44 @@
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect # <1>
spec:
  image:
    productVersion: "3.5.5" # <2>
    pullPolicy: IfNotPresent
  args:
    - "--package org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1" # <3>
  server:
    podOverrides:
      spec:
        containers:
          - name: spark
            env:
              - name: DEMO_GREETING # <4>
                value: "Hello"
    jvmArgumentOverrides:
      add:
        - -Dmy.custom.jvm.arg=customValue # <5>
    config:
      logging:
        enableVectorAgent: False
        containers:
          spark:
            custom:
              configMap: spark-connect-log-config # <6>
    configOverrides:
      spark-defaults.conf:
        spark.driver.cores: "3" # <7>
  executor:
    configOverrides:
      spark-defaults.conf:
        spark.executor.memoryOverhead: "1m" # <8>
        spark.executor.instances: "3"
    config:
      logging:
        enableVectorAgent: False
        containers:
          spark:
            custom:
              configMap: spark-connect-log-config
59 changes: 59 additions & 0 deletions docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
= Spark Connect
:description: Set up a Spark Connect server with Kubernetes as the distributed execution engine and an external service for clients
:page-aliases: spark_connect.adoc

WARNING: Support for Apache Spark Connect is considered experimental and is subject to change in future releases. Spark Connect is a young technology, and important questions remain open, mostly related to security and multi-tenancy.

Apache Spark Connect is a remote procedure call (RPC) server that allows clients to run Spark applications on a remote cluster. Clients can connect to the Spark Connect server using a variety of programming languages, editors and IDEs without needing to install Spark locally.

The Stackable Spark operator can set up Spark Connect servers backed by Kubernetes as a distributed execution engine.
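
Once a Spark Connect server is deployed (see the example below), clients connect to its gRPC endpoint. The following is a minimal PySpark sketch, not part of this PR: it assumes `pyspark[connect]` matching the server version is installed on the client and that the server's Service is reachable locally, for example via `kubectl port-forward`. The Service name `spark-connect-server` and port 15002 are assumptions, check the objects created in your cluster.

[source,python]
----
# Minimal Spark Connect client sketch.
# Assumed setup (not verified against this PR):
#   pip install "pyspark[connect]==3.5.5"
#   kubectl port-forward svc/spark-connect-server 15002:15002
from pyspark.sql import SparkSession

# Connect to the remote Spark Connect server instead of starting a local Spark.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

df = spark.range(10).selectExpr("id", "id * 2 AS doubled")
df.show()

spark.stop()
----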

== Deployment

The example below demonstrates how to set up a Spark Connect server and apply some customizations.

[source,yaml]
----
include::example$example-spark-connect.yaml[]
----

<1> The name of the Spark Connect server.
<2> Version of the Spark Connect server.
<3> Additional package to install when starting the Spark Connect server and executors.
<4> Environment variable to be created via `podOverrides`. Alternatively, the environment variable can be set in the `spec.server.envOverrides` section.
<5> Additional argument to be passed to the Spark Connect JVM settings. Do not use this to tweak heap settings. Use `spec.server.jvmOptions` instead.
<6> A custom log4j configuration file to be used by the Spark Connect server. The config map must have an entry called `log4j.properties`.
<7> Customize the driver properties in the `server` role. The number of cores here is not related to Kubernetes cores!
<8> Customize `spark.executor.\*` and `spark.kubernetes.executor.*` in the `executor` role.

== Metrics

The server pod exposes Prometheus metrics at the following endpoints:

* `/metrics/prometheus` for driver instances.
* `/metrics/executors/prometheus` for executor instances.
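
A quick way to verify these endpoints is sketched below. It assumes the driver's UI port has been forwarded locally with `kubectl port-forward`; the pod name and port 4040 are assumptions, check the objects in your cluster.

[source,python]
----
# Minimal sketch: fetch the Prometheus endpoints through a local port-forward,
# e.g. `kubectl port-forward <spark-connect-server-pod> 4040:4040` (assumed port).
import urllib.request

for path in ("/metrics/prometheus", "/metrics/executors/prometheus"):
    with urllib.request.urlopen(f"http://localhost:4040{path}") as response:
        print(path, response.status)  # expect HTTP 200 with Prometheus text output
----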

To customize the metrics configuration, use `spec.server.configOverrides` like this:

[source,yaml]
----
spec:
  server:
    configOverrides:
      metrics.properties:
        applications.sink.prometheusServlet.path: "/metrics/applications/prometheus"
----

The example above adds a new endpoint for application metrics.

== Notable Omissions

The following features are not yet supported by the Stackable Spark operator:

* Integration with the Spark History Server.
* Authorization and authentication. Currently, anyone with access to the Spark Connect service can run jobs.
* Volumes and volume mounts can be added only with pod overrides.
* Job dependencies must be provisioned as custom images or via `--packages` or `--jars` arguments.

== Known Issues

* Dynamically provisioning the Iceberg runtime leads to an `iceberg.SparkWrite$WriterFactory` `ClassNotFoundException` when clients attempt to use it.
1 change: 1 addition & 0 deletions docs/modules/spark-k8s/partials/nav.adoc
@@ -9,6 +9,7 @@
** xref:spark-k8s:usage-guide/security.adoc[]
** xref:spark-k8s:usage-guide/logging.adoc[]
** xref:spark-k8s:usage-guide/history-server.adoc[]
** xref:spark-k8s:usage-guide/spark-connect.adoc[]
** xref:spark-k8s:usage-guide/examples.adoc[]
** xref:spark-k8s:usage-guide/overrides.adoc[]
** xref:spark-k8s:usage-guide/operations/index.adoc[]
152 changes: 152 additions & 0 deletions rust/operator-binary/src/connect/common.rs
@@ -0,0 +1,152 @@
use std::collections::{BTreeMap, HashMap};

use product_config::writer::to_java_properties_string;
use snafu::{ResultExt, Snafu};
use stackable_operator::{
kvp::ObjectLabels,
role_utils::{JavaCommonConfig, JvmArgumentOverrides},
};
use strum::Display;

use super::crd::CONNECT_EXECUTOR_ROLE_NAME;
use crate::{
connect::crd::{
CONNECT_CONTROLLER_NAME, CONNECT_SERVER_ROLE_NAME, DUMMY_SPARK_CONNECT_GROUP_NAME,
},
crd::constants::{APP_NAME, OPERATOR_NAME},
};

#[derive(Snafu, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum Error {
#[snafu(display("failed to merge jvm argument overrides"))]
MergeJvmArgumentOverrides {
source: stackable_operator::role_utils::Error,
},

#[snafu(display("failed to serialize spark properties"))]
SparkProperties {
source: product_config::writer::PropertiesWriterError,
},

#[snafu(display("failed to serialize jvm security properties",))]
JvmSecurityProperties {
source: product_config::writer::PropertiesWriterError,
},
}

pub(crate) fn labels<'a, T>(
scs: &'a T,
app_version_label: &'a str,
role: &'a str,
) -> ObjectLabels<'a, T> {
ObjectLabels {
owner: scs,
app_name: APP_NAME,
app_version: app_version_label,
operator_name: OPERATOR_NAME,
controller_name: CONNECT_CONTROLLER_NAME,
role,
role_group: DUMMY_SPARK_CONNECT_GROUP_NAME,
}
}

// The dead code annotation silences complaints about the missing Executor instantiations.
// These will come in the future.
#[allow(dead_code)]
#[derive(Clone, Debug, Display)]
#[strum(serialize_all = "lowercase")]
pub(crate) enum SparkConnectRole {
Server,
Executor,
}

pub(crate) fn object_name(stacklet_name: &str, role: SparkConnectRole) -> String {
match role {
SparkConnectRole::Server => format!("{}-{}", stacklet_name, CONNECT_SERVER_ROLE_NAME),
SparkConnectRole::Executor => format!("{}-{}", stacklet_name, CONNECT_EXECUTOR_ROLE_NAME),
}
}

// Returns the jvm arguments a user has provided merged with the operator props.
pub(crate) fn jvm_args(
jvm_args: &[String],
user_java_config: Option<&JavaCommonConfig>,
) -> Result<String, Error> {
if let Some(user_jvm_props) = user_java_config {
let operator_generated = JvmArgumentOverrides::new_with_only_additions(jvm_args.to_vec());
let mut user_jvm_props_copy = user_jvm_props.jvm_argument_overrides.clone();
user_jvm_props_copy
.try_merge(&operator_generated)
.context(MergeJvmArgumentOverridesSnafu)?;
Ok(user_jvm_props_copy
.effective_jvm_config_after_merging()
.join(" "))
} else {
Ok(jvm_args.join(" "))
}
}

// Merges server and executor properties and renders the contents
// of the Spark properties file.
pub(crate) fn spark_properties(
props: &[BTreeMap<String, Option<String>>; 2],
) -> Result<String, Error> {
let mut result = BTreeMap::new();
for p in props {
result.extend(p);
}
to_java_properties_string(result.into_iter()).context(SparkPropertiesSnafu)
}

pub(crate) fn security_properties(
config_overrides: Option<&HashMap<String, String>>,
) -> Result<String, Error> {
let mut result: BTreeMap<String, Option<String>> = [
(
"networkaddress.cache.ttl".to_string(),
Some("30".to_string()),
),
(
"networkaddress.cache.negative.ttl".to_string(),
Some("0".to_string()),
),
]
.into();

if let Some(user_config) = config_overrides {
result.extend(
user_config
.iter()
.map(|(k, v)| (k.clone(), Some(v.clone()))),
);
}

to_java_properties_string(result.iter()).context(JvmSecurityPropertiesSnafu)
}

pub(crate) fn metrics_properties(
config_overrides: Option<&HashMap<String, String>>,
) -> Result<String, Error> {
let mut result: BTreeMap<String, Option<String>> = [
(
"*.sink.prometheusServlet.class".to_string(),
Some("org.apache.spark.metrics.sink.PrometheusServlet".to_string()),
),
(
"*.sink.prometheusServlet.path".to_string(),
Some("/metrics/prometheus".to_string()),
),
]
.into();

if let Some(user_config) = config_overrides {
result.extend(
user_config
.iter()
.map(|(k, v)| (k.clone(), Some(v.clone()))),
);
}

to_java_properties_string(result.iter()).context(JvmSecurityPropertiesSnafu)
}