
Commit ed452e2

Use RoleGroup for executors to make replicas on the same level as executor configuration.
1 parent 1831437 commit ed452e2

File tree

4 files changed: +112 -228 lines


deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 36 additions & 5 deletions
@@ -4115,11 +4115,6 @@ spec:
                     nullable: true
                     type: boolean
                   type: object
-                replicas:
-                  format: uint16
-                  minimum: 0.0
-                  nullable: true
-                  type: integer
                 resources:
                   default:
                     memory:
@@ -6989,6 +6984,42 @@ spec:
                 type: array
               type: object
             type: object
+          replicas:
+            format: uint16
+            minimum: 0.0
+            nullable: true
+            type: integer
+          selector:
+            description: A label selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty label selector matches all objects. A null label selector matches no objects.
+            nullable: true
+            properties:
+              matchExpressions:
+                description: matchExpressions is a list of label selector requirements. The requirements are ANDed.
+                items:
+                  description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values.
+                  properties:
+                    key:
+                      description: key is the label key that the selector applies to.
+                      type: string
+                    operator:
+                      description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist.
+                      type: string
+                    values:
+                      description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch.
+                      items:
+                        type: string
+                      type: array
+                  required:
+                  - key
+                  - operator
+                  type: object
+                type: array
+              matchLabels:
+                additionalProperties:
+                  type: string
+                description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed.
+                type: object
+            type: object
         type: object
       image:
         nullable: true
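In manifest terms, this schema change moves the executor replica count (and an optional label selector) up one level: it now sits directly under executor, next to config, rather than inside the executor config. The excerpt below is a minimal illustrative sketch, not taken from the repository's examples; the replica count and resource values are placeholders.

spec:
  executor:
    replicas: 3          # previously spec.executor.config.replicas
    # selector: ...      # an optional label selector can now also be set here
    config:
      resources:
        memory:
          limit: 512Mi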

rust/crd/src/lib.rs

Lines changed: 33 additions & 41 deletions
@@ -115,9 +115,9 @@ pub struct SparkApplicationSpec {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub job: Option<CommonConfiguration<SubmitConfigFragment>>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub driver: Option<CommonConfiguration<DriverConfigFragment>>,
+    pub driver: Option<CommonConfiguration<RoleConfigFragment>>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub executor: Option<CommonConfiguration<ExecutorConfigFragment>>,
+    pub executor: Option<RoleGroup<RoleConfigFragment>>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub stopped: Option<bool>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
@@ -486,12 +486,8 @@ impl SparkApplication {
             &mut submit_conf,
         )?;

-        if let Some(CommonConfiguration {
-            config:
-                ExecutorConfigFragment {
-                    replicas: Some(replicas),
-                    ..
-                },
+        if let Some(RoleGroup {
+            replicas: Some(replicas),
             ..
         }) = &self.spec.executor
         {
@@ -566,31 +562,35 @@ impl SparkApplication {
         }
     }

-    pub fn driver_config(&self) -> Result<DriverConfig, Error> {
+    pub fn driver_config(&self) -> Result<RoleConfig, Error> {
         if let Some(CommonConfiguration { mut config, .. }) = self.spec.driver.clone() {
-            config.merge(&DriverConfig::default_config());
+            config.merge(&RoleConfig::default_config());
             fragment::validate(config).context(FragmentValidationFailureSnafu)
         } else {
-            fragment::validate(DriverConfig::default_config())
-                .context(FragmentValidationFailureSnafu)
+            fragment::validate(RoleConfig::default_config()).context(FragmentValidationFailureSnafu)
         }
     }

-    pub fn executor_config(&self) -> Result<ExecutorConfig, Error> {
-        if let Some(CommonConfiguration { mut config, .. }) = self.spec.executor.clone() {
-            config.merge(&ExecutorConfig::default_config());
+    pub fn executor_config(&self) -> Result<RoleConfig, Error> {
+        if let Some(RoleGroup {
+            config: CommonConfiguration { mut config, .. },
+            ..
+        }) = self.spec.executor.clone()
+        {
+            config.merge(&RoleConfig::default_config());
             fragment::validate(config).context(FragmentValidationFailureSnafu)
         } else {
-            fragment::validate(ExecutorConfig::default_config())
-                .context(FragmentValidationFailureSnafu)
+            fragment::validate(RoleConfig::default_config()).context(FragmentValidationFailureSnafu)
         }
     }

     pub fn pod_overrides(&self, role: SparkApplicationRole) -> Option<PodTemplateSpec> {
         match role {
             SparkApplicationRole::Submit => self.spec.job.clone().map(|j| j.pod_overrides),
             SparkApplicationRole::Driver => self.spec.driver.clone().map(|d| d.pod_overrides),
-            SparkApplicationRole::Executor => self.spec.executor.clone().map(|e| e.pod_overrides),
+            SparkApplicationRole::Executor => {
+                self.spec.executor.clone().map(|r| r.config.pod_overrides)
+            }
         }
     }

@@ -612,17 +612,21 @@ impl SparkApplication {
            self.spec.driver.as_ref().unwrap().clone()
        } else {
            CommonConfiguration {
-                config: DriverConfig::default_config(),
+                config: RoleConfig::default_config(),
                ..CommonConfiguration::default()
            }
        };

        let executor_conf = if self.spec.executor.is_some() {
            self.spec.executor.as_ref().unwrap().clone()
        } else {
-            CommonConfiguration {
-                config: ExecutorConfig::default_config(),
-                ..CommonConfiguration::default()
+            RoleGroup {
+                replicas: Some(1),
+                config: CommonConfiguration {
+                    config: RoleConfig::default_config(),
+                    ..CommonConfiguration::default()
+                },
+                selector: None,
            }
        };

@@ -679,19 +683,8 @@ impl SparkApplication {
                    PropertyNameKind::File(JVM_SECURITY_PROPERTIES_FILE.to_string()),
                ],
                Role {
-                    config: executor_conf.clone(),
-                    role_groups: [(
-                        "default".to_string(),
-                        RoleGroup {
-                            config: executor_conf,
-                            // This is a dummy value needed to be able to build the RoleGroup
-                            // object. ExecutorConfig.replicas is the true value used to set the
-                            // number of executor pods.
-                            replicas: Some(1),
-                            selector: None,
-                        },
-                    )]
-                    .into(),
+                    config: executor_conf.config.clone(),
+                    role_groups: [("default".to_string(), executor_conf)].into(),
                }
                .erase(),
            ),
@@ -780,7 +773,7 @@ fn subtract_spark_memory_overhead(for_java: bool, limit: &Quantity) -> Result<St
 /// Spark will use these and *ignore* the resource limits in pod templates entirely.
 fn resources_to_driver_props(
     for_java: bool,
-    driver_config: &DriverConfig,
+    driver_config: &RoleConfig,
     props: &mut BTreeMap<String, String>,
 ) -> Result<(), Error> {
     if let Resources {
@@ -832,7 +825,7 @@ fn resources_to_driver_props(
 /// Spark will use these and *ignore* the resource limits in pod templates entirely.
 fn resources_to_executor_props(
     for_java: bool,
-    executor_config: &ExecutorConfig,
+    executor_config: &RoleConfig,
     props: &mut BTreeMap<String, String>,
 ) -> Result<(), Error> {
     if let Resources {
@@ -885,7 +878,7 @@ fn resources_to_executor_props(

 #[cfg(test)]
 mod tests {
-    use crate::{cores_from_quantity, resources_to_executor_props, DriverConfig, ExecutorConfig};
+    use crate::{cores_from_quantity, resources_to_executor_props, RoleConfig};
     use crate::{resources_to_driver_props, SparkApplication};
     use crate::{Quantity, SparkStorageConfig};
     use stackable_operator::commons::affinity::StackableAffinity;
@@ -1014,7 +1007,7 @@ mod tests {

     #[test]
     fn test_resource_to_driver_props() {
-        let driver_config = DriverConfig {
+        let driver_config = RoleConfig {
            resources: Resources {
                memory: MemoryLimits {
                    limit: Some(Quantity("128Mi".to_string())),
@@ -1066,8 +1059,7 @@ mod tests {

     #[test]
     fn test_resource_to_executor_props() {
-        let executor_config = ExecutorConfig {
-            replicas: Some(3),
+        let executor_config = RoleConfig {
            resources: Resources {
                memory: MemoryLimits {
                    limit: Some(Quantity("512Mi".to_string())),
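To make the new shape concrete, here is a small, self-contained sketch. It is not the operator's real code: RoleGroup, CommonConfiguration, RoleConfig, and SparkApplicationSpec below are simplified stand-ins for the types used in the diff (the real RoleGroup comes from stackable-operator and carries more fields), and main only demonstrates that replicas is now read from the role group rather than from the executor config, mirroring the if let Some(RoleGroup { replicas: Some(replicas), .. }) pattern above.

// Simplified stand-ins for the types used in the diff above; the real
// `RoleGroup`/`CommonConfiguration` live in stackable-operator and carry
// more fields (config/env/cli overrides, pod templates, ...).
#[derive(Default)]
struct CommonConfiguration<T> {
    config: T,
}

struct RoleGroup<T> {
    config: CommonConfiguration<T>,
    replicas: Option<u16>,
    selector: Option<String>, // stands in for a LabelSelector
}

#[derive(Default)]
struct RoleConfig; // stands in for the merged driver/executor configuration

struct SparkApplicationSpec {
    executor: Option<RoleGroup<RoleConfig>>,
}

fn main() {
    let spec = SparkApplicationSpec {
        executor: Some(RoleGroup {
            config: CommonConfiguration::default(),
            replicas: Some(3),
            selector: None,
        }),
    };

    // After this commit, the replica count sits on the role group, next to
    // the executor configuration, so it can be destructured directly instead
    // of being dug out of the executor config fragment:
    if let Some(RoleGroup {
        replicas: Some(replicas),
        ..
    }) = &spec.executor
    {
        println!("executor replicas: {replicas}");
    }
}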
