Skip to content

docs: Document resource parsing with examples #297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ All notable changes to this project will be documented in this file.
- [BREAKING] use product image selection instead of version ([#275]).
- [BREAKING] refactored application roles to use `CommonConfiguration` structures from the operator framework ([#277]).
- Let secret-operator handle certificate conversion ([#286]).
- Extended resource-usage documentation ([#297]).

### Fixed

Expand All @@ -39,6 +40,7 @@ All notable changes to this project will be documented in this file.
[#286]: https://github.com/stackabletech/spark-k8s-operator/pull/286
[#288]: https://github.com/stackabletech/spark-k8s-operator/pull/288
[#291]: https://github.com/stackabletech/spark-k8s-operator/pull/291
[#297]: https://github.com/stackabletech/spark-k8s-operator/pull/297

## [23.7.0] - 2023-07-14

Expand Down
151 changes: 148 additions & 3 deletions docs/modules/spark-k8s/pages/usage-guide/resources.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

include::home:concepts:stackable_resource_requests.adoc[]

If no resources are configured explicitly, the operator uses the following defaults for `SparkApplication`s:
If no resources are configured explicitly, the operator uses the following defaults for `SparkApplication` resources:

[source,yaml]
----
Expand All @@ -29,7 +29,7 @@ executor:
min: '250m'
max: "1"
memory:
limit: '4Gi'
limit: '1Gi'
----

For `SparkHistoryServer`s the following defaults are used:
Expand All @@ -50,4 +50,149 @@ For more details regarding Kubernetes CPU limits see: https://kubernetes.io/docs

Spark allocates a default amount of non-heap memory based on the type of job (JVM or non-JVM). This is taken into account when defining memory settings based exclusively on the resource limits, so that the "declared" value is the actual total value (i.e. including memory overhead). This may result in minor deviations from the stated resource value due to rounding differences.

NOTE: It is possible to define Spark resources either directly by setting configuration properties listed under `sparkConf`, or by using resource limits. If both are used, then `sparkConf` properties take precedence. It is recommended for the sake of clarity to use *_either_* one *_or_* the other.
NOTE: It is possible to define Spark resources either directly by setting configuration properties listed under `sparkConf`, or by using resource limits. If both are used, then `sparkConf` properties take precedence. It is recommended for the sake of clarity to use *_either_* one *_or_* the other. See below for examples.

== Resource examples

To illustrate resource configuration consider the use-case where resources are defined using CRD fields (which are then parsed internally to be passed to Spark as spark.conf settings).

=== CPU

CPU request and limit will be rounded up to the next integer value, resulting in the following:


|===
|CRD |Spark conf

|1800m
|2

|100m
|1

|1.5
|2

|2
|2
|===

Spark allows CPU limits to be set for the driver and executor using standard Spark settings (`spark.{driver|executor}.cores}`) as well as Kubernetes-specific ones (`spark.kubernetes.{driver,executor}.{request|limit}.cores`). Since `spark.kubernetes.{driver,executor}.request.cores` takes precedence over `spark.{driver|executor}.cores}`, `spark.{driver|executor}.cores}` is not specified by the operator when building the spark-submit configuration.

=== Memory

Memory values are not rounded as is the case with CPU. Values for `spark.{driver|executor}.memory}` - this is the amount of memory to use for the driver process (i.e. where SparkContext is initialized) and executor processes respectively - are passed to Spark in such as a way that the overheads added by Spark are already implicitly declared: this overhead will be applied using a factor of 0.1 (JVM jobs) or 0.4 (non-JVM jobs), being not less than 384MB, the minimum overhead applied by Spark. Once the overhead is applied, the effective value is the one defined by the user. This keeps the values transparent: what is defined in the CRD is what is actually provisioned for the process.

An alternative is to do define the spark.conf settings explicitly and then let Spark apply the overheads to those values.

=== Example

A SparkApplication defines the following resources:

[source,yaml]
----
...
job:
config:
resources:
cpu:
min: 250m # <1>
max: 500m # <2>
memory:
limit: 512Mi # <3>
driver:
config:
resources:
cpu:
min: 200m # <4>
max: 1200m # <5>
memory:
limit: 1024Mi # <6>
executor:
config:
resources:
cpu:
min: 250m # <7>
max: 1000m # <8>
memory:
limit: 1024Mi # <9>
...
----

This will result in the following Pod definitions:

For the job:

[source,yaml]
----
spec:
containers:
- name: spark-submit
resources:
limits:
cpu: 500m # <2>
memory: 512Mi # <3>
requests:
cpu: 250m # <1>
memory: 512Mi # <3>
----

For the driver:

[source,yaml]
----
spec:
containers:
- name: spark
resources:
limits:
cpu: "2" # <5>
memory: 1Gi # <6>
requests:
cpu: "1" # <4>
memory: 1Gi # <6>
----

For each executor:

[source,yaml]
----
spec:
containers:
- name: spark
limits:
cpu: "1" # <7>
memory: 1Gi # <9>
requests:
cpu: "1" # <8>
memory: 1Gi # <9>
----

<1> CPU request (unchanged as this is the Job pod)
<2> CPU limit (unchanged as this is the Job pod)
<3> Memory is assigned to both request and limit values
<4> CPU request, rounded up from `200m` to `1`
<5> CPU limit, rounded up from `1200m` to `2`
<6> Memory after reduction and re-addition of Spark overhead (so the declared value matches what is provisioned)
<7> CPU request, rounded up from `250m` to `1`
<8> CPU limit, unchanged after rounding: `1000m` to `1`
<9> Memory after reduction and re-addition of Spark overhead (so the declared value matches what is provisioned)

The spark.conf values derived from the above can be inspected in the job Pod definition:

[source]
----
...
--conf "spark.driver.cores=1"
--conf "spark.driver.memory=640m"
--conf "spark.executor.cores=1"
--conf "spark.executor.memory=640m"
--conf "spark.kubernetes.driver.limit.cores=1"
--conf "spark.kubernetes.driver.request.cores=2"
--conf "spark.kubernetes.executor.limit.cores=1"
--conf "spark.kubernetes.executor.request.cores=1"
--conf "spark.kubernetes.memoryOverheadFactor=0.0"
...
----

These correspond to the resources listed above for the job/driver/executor Pods, with the exception of `spark.{driver|executor}.memory` where indeed the Spark internal overhead of 384MB has been deducted from 1024MB.
84 changes: 20 additions & 64 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,18 +815,21 @@ fn resources_to_driver_props(
props: &mut BTreeMap<String, String>,
) -> Result<(), Error> {
if let Resources {
cpu: CpuLimits { max: Some(max), .. },
cpu: CpuLimits {
min: Some(min),
max: Some(max),
},
..
} = &driver_config.resources
{
let cores = cores_from_quantity(max.0.clone())?;
let min_cores = cores_from_quantity(min.0.clone())?;
let max_cores = cores_from_quantity(max.0.clone())?;
// will have default value from resources to apply if nothing set specifically
props.insert("spark.driver.cores".to_string(), cores.clone());
props.insert(
"spark.kubernetes.driver.request.cores".to_string(),
cores.clone(),
min_cores,
);
props.insert("spark.kubernetes.driver.limit.cores".to_string(), cores);
props.insert("spark.kubernetes.driver.limit.cores".to_string(), max_cores);
}

if let Resources {
Expand All @@ -838,22 +841,6 @@ fn resources_to_driver_props(
{
let memory = subtract_spark_memory_overhead(for_java, limit)?;
props.insert("spark.driver.memory".to_string(), memory);

let limit_mb = format!(
"{}m",
MemoryQuantity::try_from(limit)
.context(FailedToConvertJavaHeapSnafu {
unit: BinaryMultiple::Mebi.to_java_memory_unit(),
})?
.scale_to(BinaryMultiple::Mebi)
.floor()
.value as u32
);
props.insert(
"spark.kubernetes.driver.request.memory".to_string(),
limit_mb.clone(),
);
props.insert("spark.kubernetes.driver.limit.memory".to_string(), limit_mb);
}

Ok(())
Expand All @@ -867,18 +854,24 @@ fn resources_to_executor_props(
props: &mut BTreeMap<String, String>,
) -> Result<(), Error> {
if let Resources {
cpu: CpuLimits { max: Some(max), .. },
cpu: CpuLimits {
min: Some(min),
max: Some(max),
},
..
} = &executor_config.resources
{
let cores = cores_from_quantity(max.0.clone())?;
let min_cores = cores_from_quantity(min.0.clone())?;
let max_cores = cores_from_quantity(max.0.clone())?;
// will have default value from resources to apply if nothing set specifically
props.insert("spark.executor.cores".to_string(), cores.clone());
props.insert(
"spark.kubernetes.executor.request.cores".to_string(),
cores.clone(),
min_cores,
);
props.insert(
"spark.kubernetes.executor.limit.cores".to_string(),
max_cores,
);
props.insert("spark.kubernetes.executor.limit.cores".to_string(), cores);
}

if let Resources {
Expand All @@ -890,25 +883,6 @@ fn resources_to_executor_props(
{
let memory = subtract_spark_memory_overhead(for_java, limit)?;
props.insert("spark.executor.memory".to_string(), memory);

let limit_mb = format!(
"{}m",
MemoryQuantity::try_from(limit)
.context(FailedToConvertJavaHeapSnafu {
unit: BinaryMultiple::Mebi.to_java_memory_unit(),
})?
.scale_to(BinaryMultiple::Mebi)
.floor()
.value as u32
);
props.insert(
"spark.kubernetes.executor.request.memory".to_string(),
limit_mb.clone(),
);
props.insert(
"spark.kubernetes.executor.limit.memory".to_string(),
limit_mb,
);
}

Ok(())
Expand Down Expand Up @@ -1070,24 +1044,15 @@ mod tests {
resources_to_driver_props(true, &driver_config, &mut props).expect("blubb");

let expected: BTreeMap<String, String> = vec![
("spark.driver.cores".to_string(), "1".to_string()),
("spark.driver.memory".to_string(), "128m".to_string()),
(
"spark.kubernetes.driver.limit.cores".to_string(),
"1".to_string(),
),
(
"spark.kubernetes.driver.limit.memory".to_string(),
"128m".to_string(),
),
(
"spark.kubernetes.driver.request.cores".to_string(),
"1".to_string(),
),
(
"spark.kubernetes.driver.request.memory".to_string(),
"128m".to_string(),
),
]
.into_iter()
.collect();
Expand Down Expand Up @@ -1122,19 +1087,10 @@ mod tests {
resources_to_executor_props(true, &executor_config, &mut props).expect("blubb");

let expected: BTreeMap<String, String> = vec![
("spark.executor.cores".to_string(), "2".to_string()),
("spark.executor.memory".to_string(), "128m".to_string()), // 128 and not 512 because memory overhead is subtracted
(
"spark.kubernetes.executor.limit.memory".to_string(),
"512m".to_string(),
),
(
"spark.kubernetes.executor.request.cores".to_string(),
"2".to_string(),
),
(
"spark.kubernetes.executor.request.memory".to_string(),
"512m".to_string(),
"1".to_string(),
),
(
"spark.kubernetes.executor.limit.cores".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion tests/templates/kuttl/resources/10-assert.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ spec:
cpu: "2"
memory: 1Gi
requests:
cpu: "2"
cpu: "1"
memory: 1Gi
---
apiVersion: v1
Expand Down
2 changes: 0 additions & 2 deletions tests/templates/kuttl/resources/12-deploy-spark-app.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ spec:
spark.kubernetes.executor.podNamePrefix: "resources-sparkconf"
spark.kubernetes.driver.request.cores: "1"
spark.kubernetes.driver.limit.cores: "1"
spark.driver.cores: "1"
spark.driver.memory: "1g"
spark.driver.memoryOverheadFactor: "0.4"
spark.kubernetes.executor.request.cores: "1"
spark.kubernetes.executor.limit.cores: "2"
spark.executor.cores: "2"
spark.executor.memory: "2g"
spark.executor.memoryOverheadFactor: "0.4"
spark.executor.instances: "1"
Expand Down