[Merged by Bors] - Resources limits #147

Closed · wants to merge 12 commits
Changes from 3 commits
250 changes: 231 additions & 19 deletions deploy/crd/sparkapplication.crd.yaml

Large diffs are not rendered by default.

250 changes: 231 additions & 19 deletions deploy/helm/spark-k8s-operator/crds/crds.yaml

Large diffs are not rendered by default.

250 changes: 231 additions & 19 deletions deploy/manifests/crds.yaml

Large diffs are not rendered by default.

29 changes: 23 additions & 6 deletions docs/modules/ROOT/examples/example-sparkapp-image.yaml
@@ -17,11 +17,28 @@ spec:
     - tabulate==0.8.9 # <4>
   sparkConf: # <5>
     "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
+  job:
+    config:
+      resources:
+        cpu:
+          min: "1"
+          max: "1"
+        memory:
+          limit: "512m"
   driver:
-    cores: 1
-    coreLimit: "1200m"
-    memory: "512m"
+    config:
+      resources:
+        cpu:
+          min: "1"
+          max: "1200m"
+        memory:
+          limit: "512m"
   executor:
-    cores: 1
-    instances: 3
-    memory: "512m"
+    config:
+      instances: 3
+      resources:
+        cpu:
+          min: "1"
+          max: "1"
+        memory:
+          limit: "512m"
124 changes: 108 additions & 16 deletions rust/crd/src/lib.rs
@@ -15,9 +15,14 @@ use std::collections::{BTreeMap, HashMap};
 
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, Snafu};
+use stackable_operator::commons::resources::{
+    CpuLimits, MemoryLimits, NoRuntimeLimits, PvcConfig, Resources,
+};
 use stackable_operator::kube::ResourceExt;
 use stackable_operator::labels;
 use stackable_operator::{
+    config::merge::Merge,
+    k8s_openapi::apimachinery::pkg::api::resource::Quantity,
     kube::CustomResource,
     role_utils::CommonConfiguration,
     schemars::{self, JsonSchema},

@@ -48,6 +53,41 @@ pub struct SparkApplicationStatus {
     pub phase: String,
 }
 
+#[derive(Clone, Debug, Default, Deserialize, Merge, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SparkStorageConfig {
+    #[serde(default)]
+    pub data: PvcConfig,
+}
+
+#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SparkConfig {
+    pub resources: Option<Resources<SparkStorageConfig, NoRuntimeLimits>>,
+}
+
+impl SparkConfig {
+    fn default_resources() -> Resources<SparkStorageConfig, NoRuntimeLimits> {
+        Resources {
+            cpu: CpuLimits {
+                min: Some(Quantity("200m".to_owned())),
+                max: Some(Quantity("4".to_owned())),
+            },
+            memory: MemoryLimits {
+                limit: Some(Quantity("2Gi".to_owned())),
+                runtime_limits: NoRuntimeLimits {},
+            },
+            storage: SparkStorageConfig {
+                data: PvcConfig {
+                    capacity: Some(Quantity("2Gi".to_owned())),
+                    storage_class: None,
+                    selectors: None,
+                },
+            },
+        }
+    }
+}
+
 #[derive(Clone, CustomResource, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)]
 #[kube(
     group = "spark.stackable.tech",

@@ -81,6 +121,8 @@ pub struct SparkApplicationSpec {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub spark_image_pull_secrets: Option<Vec<LocalObjectReference>>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub job: Option<SparkConfig>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub driver: Option<DriverConfig>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub executor: Option<ExecutorConfig>,

@@ -411,6 +453,22 @@ impl SparkApplication {
             .as_ref()
             .and_then(|executor_config| executor_config.node_selector.clone())
     }
+
+    pub fn resolve_resource_config(
+        &self,
+    ) -> Option<Resources<SparkStorageConfig, NoRuntimeLimits>> {
+        let mut conf = SparkConfig::default_resources();
+
+        if let Some(resources) = &self
+            .spec
+            .job
+            .as_ref()
+            .and_then(|spark_config| spark_config.resources.as_ref())
+        {
+            conf.merge(resources);
+        }
+        Some(conf)
+    }
 }
 
 #[derive(Clone, Debug, Default, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
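
A sketch of the resolver's contract, not in the diff: with no job section the built-in defaults apply unchanged, so the method always returns Some. SparkApplication::new here is the constructor generated by kube's CustomResource derive.

#[test]
fn resolver_falls_back_to_defaults() {
    // No `job` section: resolve_resource_config() returns the profile from
    // SparkConfig::default_resources() untouched.
    let app = SparkApplication::new("example", SparkApplicationSpec::default());
    let resources = app.resolve_resource_config().expect("defaults always apply");
    assert_eq!(resources.cpu.min, Some(Quantity("200m".to_owned())));
    assert_eq!(resources.cpu.max, Some(Quantity("4".to_owned())));
    assert_eq!(resources.memory.limit, Some(Quantity("2Gi".to_owned())));
}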
@@ -425,9 +483,8 @@ pub struct CommonConfig {
 #[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct DriverConfig {
-    pub cores: Option<usize>,
-    pub core_limit: Option<String>,
-    pub memory: Option<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub spark_config: Option<SparkConfig>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub volume_mounts: Option<Vec<VolumeMount>>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
@@ -437,27 +494,42 @@ pub struct DriverConfig {
 impl DriverConfig {
     pub fn spark_config(&self) -> Vec<String> {
         let mut cmd = vec![];
-        if let Some(cores) = &self.cores {
-            cmd.push(format!("--conf spark.driver.cores={cores}"));
+        let mut conf = SparkConfig::default_resources();
+
+        if let Some(resources) = &self
+            .spark_config
+            .clone()
+            .and_then(|spark_config| spark_config.resources)
+        {
+            conf.merge(resources);
         }
+        if let Some(memory) = &conf.memory.limit {
+            let memory = &memory.0;
+            cmd.push(format!("--conf spark.driver.memory={memory}"));
+        }
-        if let Some(core_limit) = &self.core_limit {
+        if let Some(cpu_min) = &conf.cpu.min {
+            let cpu_min = &cpu_min.0;
             cmd.push(format!(
-                "--conf spark.kubernetes.executor.limit.cores={core_limit}"
+                "--conf spark.kubernetes.driver.request.cores={cpu_min}"
             ));
         }
-        if let Some(memory) = &self.memory {
-            cmd.push(format!("--conf spark.driver.memory={memory}"));
+        if let Some(cpu_max) = &conf.cpu.max {
+            let cpu_max = &cpu_max.0;
+            cmd.push(format!(
+                "--conf spark.kubernetes.driver.limit.cores={cpu_max}"
+            ));
         }
+
         cmd
     }
 }
 
 #[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ExecutorConfig {
-    pub cores: Option<usize>,
     pub instances: Option<usize>,
-    pub memory: Option<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub spark_config: Option<SparkConfig>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub volume_mounts: Option<Vec<VolumeMount>>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
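
Because the merged profile starts from default_resources(), even a driver section with no sparkConfig now emits a complete set of flags. A quick sketch of that behavior (test not in the diff):

#[test]
fn empty_driver_config_still_emits_flags() {
    // Defaults win when spark_config is None, so all three flags are present.
    let flags = DriverConfig::default().spark_config();
    assert!(flags.iter().any(|f| f.starts_with("--conf spark.driver.memory=")));
    assert!(flags
        .iter()
        .any(|f| f.starts_with("--conf spark.kubernetes.driver.request.cores=")));
    assert!(flags
        .iter()
        .any(|f| f.starts_with("--conf spark.kubernetes.driver.limit.cores=")));
}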
@@ -467,15 +539,35 @@ pub struct ExecutorConfig {
 impl ExecutorConfig {
     pub fn spark_config(&self) -> Vec<String> {
         let mut cmd = vec![];
-        if let Some(cores) = &self.cores {
-            cmd.push(format!("--conf spark.executor.cores={cores}"));
+        let mut conf = SparkConfig::default_resources();
+
+        if let Some(resources) = &self
+            .spark_config
+            .clone()
+            .and_then(|spark_config| spark_config.resources)
+        {
+            conf.merge(resources);
         }
+        if let Some(memory) = &conf.memory.limit {
+            let memory = &memory.0;
+            cmd.push(format!("--conf spark.executor.memory={memory}"));
+        }
+        if let Some(cpu_min) = &conf.cpu.min {
+            let cpu_min = &cpu_min.0;
+            cmd.push(format!(
+                "--conf spark.kubernetes.executor.request.cores={cpu_min}"
+            ));
+        }
+        if let Some(cpu_max) = &conf.cpu.max {
+            let cpu_max = &cpu_max.0;
+            cmd.push(format!(
+                "--conf spark.kubernetes.executor.limit.cores={cpu_max}"
+            ));
+        }
         if let Some(instances) = &self.instances {
             cmd.push(format!("--conf spark.executor.instances={instances}"));
         }
-        if let Some(memory) = &self.memory {
-            cmd.push(format!("--conf spark.executor.memory={memory}"));
-        }
+
         cmd
     }
 }
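
instances stays a plain executor field rather than moving into the resource profile, so it is emitted independently of the merge. Sketch, not in the diff:

#[test]
fn executor_instances_are_independent_of_resources() {
    let executor = ExecutorConfig {
        instances: Some(3),
        ..ExecutorConfig::default()
    };
    let flags = executor.spark_config();
    // The instances flag comes straight from the field...
    assert!(flags.contains(&"--conf spark.executor.instances=3".to_string()));
    // ...while the cores flags come from the (defaulted) resource profile.
    assert!(flags
        .iter()
        .any(|f| f.starts_with("--conf spark.kubernetes.executor.request.cores=")));
}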
7 changes: 7 additions & 0 deletions rust/operator-binary/src/spark_k8s_controller.rs
@@ -79,6 +79,8 @@ pub enum Error {
     S3TlsNoVerificationNotSupported,
     #[snafu(display("ca-cert verification not supported"))]
     S3TlsCaVerificationNotSupported,
+    #[snafu(display("failed to resolve and merge resource config"))]
+    FailedToResolveResourceConfig,
 }
 
 type Result<T, E = Error> = std::result::Result<T, E>;

@@ -327,8 +329,13 @@ fn spark_job(
     volume_mounts.extend(spark_application.driver_volume_mounts(s3bucket));
 
     let mut cb = ContainerBuilder::new("spark-submit");
+    let resources = spark_application
+        .resolve_resource_config()
+        .context(FailedToResolveResourceConfigSnafu)?;
+
     cb.image(spark_image)
         .command(vec!["/bin/sh".to_string()])
+        .resources(resources.into())
         .args(vec!["-c".to_string(), job_commands.join(" ")])
         .add_volume_mounts(volume_mounts)
         .add_env_vars(env.to_vec())
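
For orientation: ContainerBuilder::resources takes a Kubernetes ResourceRequirements, and the .into() in the hunk above relies on stackable-operator's conversion from the merged Resources value. A standalone sketch of that step; the helper name is an assumption:

use stackable_operator::k8s_openapi::api::core::v1::ResourceRequirements;

// Hypothetical helper mirroring `.resources(resources.into())` above.
fn job_resource_requirements(app: &SparkApplication) -> Option<ResourceRequirements> {
    app.resolve_resource_config().map(Into::into)
}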
29 changes: 23 additions & 6 deletions tests/templates/kuttl/spark-examples/10-deploy-spark-app.yaml.j2
@@ -10,11 +10,28 @@ spec:
   mode: cluster
   mainClass: org.apache.spark.examples.SparkALS
   mainApplicationFile: local:///stackable/spark/examples/jars/spark-examples_2.12-{{ test_scenario['values']['spark'] }}.jar
+  job:
+    config:
+      resources:
+        cpu:
+          min: "1"
+          max: "1200m"
+        memory:
+          limit: "512m"
   driver:
-    cores: 1
-    coreLimit: "1200m"
-    memory: "512m"
+    config:
+      resources:
+        cpu:
+          min: "1"
+          max: "1300m"
+        memory:
+          limit: "512m"
   executor:
-    cores: 1
-    instances: 1
-    memory: "512m"
+    config:
+      instances: 1
+      resources:
+        cpu:
+          min: "500m"
+          max: "1200m"
+        memory:
+          limit: "1Gi"