fix: Pass-through envOverrides #451

Merged

merged 13 commits on Aug 16, 2024

Changes from 5 commits

5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -12,7 +12,12 @@ All notable changes to this project will be documented in this file.
- `volumes`
- `volumeMounts`

### Fixed

- Fix envOverrides for spark cluster and history server ([#451]).

[#450]: https://github.com/stackabletech/spark-k8s-operator/pull/450
[#451]: https://github.com/stackabletech/spark-k8s-operator/pull/451

## [24.7.0] - 2024-07-24

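In CRD terms, the fix means `envOverrides` set on the history server's role and role groups now actually reach the Pods, with role-group values taking precedence over role values. A minimal sketch of the `nodes` section, using the variable names from this PR's test fixtures:

```yaml
# Sketch only: a role-level override applies to every role group; a role-group
# override wins when both set the same key.
nodes:
  envOverrides:
    TEST_SPARK_HIST_VAR: ROLE
  roleGroups:
    default:
      envOverrides:
        TEST_SPARK_HIST_VAR: ROLEGROUP   # this value reaches the Pod
```
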
61 changes: 61 additions & 0 deletions rust/crd/src/history.rs
@@ -363,3 +363,64 @@ impl Configuration for HistoryConfigFragment {
        Ok(BTreeMap::new())
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use indoc::indoc;

    #[test]
    pub fn test_env() {
        let input = indoc! {r#"
            ---
            apiVersion: spark.stackable.tech/v1alpha1
            kind: SparkHistoryServer
            metadata:
              name: spark-history
            spec:
              image:
                productVersion: 3.5.1
              logFileDirectory:
                s3:
                  prefix: eventlogs/
                  bucket:
                    reference: spark-history-s3-bucket
              nodes:
                envOverrides:
                  TEST_SPARK_HIST_VAR: ROLE
                roleGroups:
                  default:
                    replicas: 1
                    config:
                      cleaner: true
                    envOverrides:
                      TEST_SPARK_HIST_VAR: ROLEGROUP
        "#};

        let deserializer = serde_yaml::Deserializer::from_str(input);
        let history: SparkHistoryServer =
            serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();

        assert_eq!(
            Some(&"ROLE".to_string()),
            history
                .spec
                .nodes
                .config
                .env_overrides
                .get("TEST_SPARK_HIST_VAR")
        );
        assert_eq!(
            Some(&"ROLEGROUP".to_string()),
            history
                .spec
                .nodes
                .role_groups
                .get("default")
                .unwrap()
                .config
                .env_overrides
                .get("TEST_SPARK_HIST_VAR")
        );
    }
}
30 changes: 30 additions & 0 deletions rust/crd/src/lib.rs
@@ -727,6 +727,36 @@ impl SparkApplication {
        }
    }

    pub fn merged_env(&self, role: SparkApplicationRole, env: &[EnvVar]) -> Vec<EnvVar> {
        // use a BTree internally to enable replacement of existing keys
        let mut env_vars: BTreeMap<String, EnvVar> = BTreeMap::new();

        for e in env {
            env_vars.insert(e.clone().name, e.to_owned());
        }

        if let Some(env_map) = match role {
            SparkApplicationRole::Submit => self.spec.job.clone().map(|j| j.env_overrides),
            SparkApplicationRole::Driver => self.spec.driver.clone().map(|d| d.env_overrides),
            SparkApplicationRole::Executor => {
                self.spec.executor.clone().map(|r| r.config.env_overrides)
            }
        } {
            for (k, v) in env_map {
                env_vars.insert(
                    k.clone(),
                    EnvVar {
                        name: k,
                        value: Some(v),
                        value_from: None,
                    },
                );
            }
        }

        env_vars.into_values().collect()
    }

    pub fn validated_role_config(
        &self,
        resolved_product_image: &ResolvedProductImage,
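
Seen from the manifest side, `merged_env` lets a SparkApplication define Pod-level defaults in `spec.env` and replace individual values per role via `envOverrides`. A minimal sketch, reusing values from the kuttl test further down this diff:

```yaml
# Sketch only: spec.env provides defaults for all roles; a matching key in a
# role's envOverrides replaces the value, everything else passes through.
spec:
  env:
    - name: TEST_SPARK_VAR_0
      value: ORIGINAL       # replaced by the override below
    - name: TEST_SPARK_VAR_1
      value: DONOTREPLACE   # no override, reaches the Pod unchanged
  job:
    envOverrides:
      TEST_SPARK_VAR_0: REPLACED
```
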
3 changes: 2 additions & 1 deletion rust/crd/src/s3logdir.rs
@@ -117,7 +117,8 @@ impl S3LogDir {
    /// * spark.hadoop.fs.s3a.aws.credentials.provider
    /// * spark.hadoop.fs.s3a.access.key
    /// * spark.hadoop.fs.s3a.secret.key
-    /// instead, the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set
+    ///
+    /// Instead, the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set
    /// on the container start command.
    pub fn history_server_spark_config(&self) -> BTreeMap<String, String> {
        let mut result = BTreeMap::new();
102 changes: 82 additions & 20 deletions rust/operator-binary/src/history/history_controller.rs
@@ -440,6 +440,16 @@ fn build_stateful_set(
        ..PodSecurityContext::default()
    });

    let role_group = shs
        .rolegroup(rolegroupref)
        .with_context(|_| CannotRetrieveRoleGroupSnafu)?;

    let merged_env_vars = env_vars(
        s3_log_dir,
        shs.role().config.clone().env_overrides,
        role_group.config.env_overrides,
    );

    let container_name = "spark-history";
    let container = ContainerBuilder::new(container_name)
        .context(InvalidContainerNameSnafu)?
@@ -449,7 +459,7 @@
        .args(command_args(s3_log_dir))
        .add_container_port("http", 18080)
        .add_container_port("metrics", METRICS_PORT.into())
-        .add_env_vars(env_vars(s3_log_dir))
+        .add_env_vars(merged_env_vars)
        .add_volume_mounts(s3_log_dir.volume_mounts())
        .add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG)
        .add_volume_mount(VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_LOG_CONFIG)
@@ -670,21 +680,33 @@ fn command_args(s3logdir: &S3LogDir) -> Vec<String> {
    vec![String::from("-c"), command.join(" && ")]
}

-fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
-    let mut vars: Vec<EnvVar> = vec![];
+fn env_vars(
+    s3logdir: &S3LogDir,
+    role_env_overrides: HashMap<String, String>,
+    role_group_env_overrides: HashMap<String, String>,
+) -> Vec<EnvVar> {
+    // Maps env var name to env var object. This allows env_overrides to work
+    // as expected (i.e. users can override the env var value).
+    let mut vars: BTreeMap<String, EnvVar> = BTreeMap::new();

    // This env var prevents the history server from detaching itself from the
    // start script because this leads to the Pod terminating immediately.
-    vars.push(EnvVar {
-        name: "SPARK_NO_DAEMONIZE".to_string(),
-        value: Some("true".into()),
-        value_from: None,
-    });
-    vars.push(EnvVar {
-        name: "SPARK_DAEMON_CLASSPATH".to_string(),
-        value: Some("/stackable/spark/extra-jars/*".into()),
-        value_from: None,
-    });
+    vars.insert(
+        "SPARK_NO_DAEMONIZE".to_string(),
+        EnvVar {
+            name: "SPARK_NO_DAEMONIZE".to_string(),
+            value: Some("true".into()),
+            value_from: None,
+        },
+    );
+    vars.insert(
+        "SPARK_DAEMON_CLASSPATH".to_string(),
+        EnvVar {
+            name: "SPARK_DAEMON_CLASSPATH".to_string(),
+            value: Some("/stackable/spark/extra-jars/*".into()),
+            value_from: None,
+        },
+    );

    let mut history_opts = vec![
        format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
@@ -693,6 +715,8 @@ fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
        ),
        format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/config.yaml")
    ];
+
+    // if TLS is enabled build truststore
    if tlscerts::tls_secret_name(&s3logdir.bucket.connection).is_some() {
        history_opts.extend(vec![
            format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
@@ -701,13 +725,51 @@ fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
        ]);
    }

-    vars.push(EnvVar {
-        name: "SPARK_HISTORY_OPTS".to_string(),
-        value: Some(history_opts.join(" ")),
-        value_from: None,
-    });
-    // if TLS is enabled build truststore
-    vars
+    vars.insert(
+        "SPARK_HISTORY_OPTS".to_string(),
+        EnvVar {
+            name: "SPARK_HISTORY_OPTS".to_string(),
+            value: Some(history_opts.join(" ")),
+            value_from: None,
+        },
+    );
+
+    // apply the role overrides
+    let mut role_envs = role_env_overrides
+        .into_iter()
+        .map(|env_var| {
+            (
+                env_var.0.clone(),
+                EnvVar {
+                    name: env_var.0.clone(),
+                    value: Some(env_var.1),
+                    value_from: None,
+                },
+            )
+        })
+        .collect();
+
+    vars.append(&mut role_envs);
+
+    // apply the role-group overrides
+    let mut role_group_envs = role_group_env_overrides
+        .into_iter()
+        .map(|env_var| {
+            (
+                env_var.0.clone(),
+                EnvVar {
+                    name: env_var.0.clone(),
+                    value: Some(env_var.1),
+                    value_from: None,
+                },
+            )
+        })
+        .collect();
+
+    vars.append(&mut role_group_envs);
+
+    // convert to Vec
+    vars.into_values().collect()
}

fn labels<'a, T>(
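
A note on the precedence above: defaults are inserted first, then the role overrides, then the role-group overrides. `BTreeMap::append` moves all entries out of the other map and overwrites values for keys that already exist, which is what lets a role-group override win. A standalone sketch of that chain (hypothetical variable names, not operator code):

```rust
use std::collections::BTreeMap;

fn main() {
    // Defaults set by the operator.
    let mut vars = BTreeMap::from([("SPARK_NO_DAEMONIZE".to_string(), "true".to_string())]);

    // Role-level envOverrides.
    let mut role = BTreeMap::from([("TEST_VAR".to_string(), "ROLE".to_string())]);
    vars.append(&mut role);

    // Role-group-level envOverrides: append overwrites the existing TEST_VAR.
    let mut role_group = BTreeMap::from([("TEST_VAR".to_string(), "ROLEGROUP".to_string())]);
    vars.append(&mut role_group);

    assert_eq!(vars.get("TEST_VAR"), Some(&"ROLEGROUP".to_string()));
    assert_eq!(vars.get("SPARK_NO_DAEMONIZE"), Some(&"true".to_string()));
}
```
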
6 changes: 4 additions & 2 deletions rust/operator-binary/src/spark_k8s_controller.rs
@@ -479,9 +479,10 @@
) -> Result<PodTemplateSpec> {
    let container_name = SparkContainer::Spark.to_string();
    let mut cb = ContainerBuilder::new(&container_name).context(IllegalContainerNameSnafu)?;
+    let merged_env = spark_application.merged_env(role.clone(), env);

    cb.add_volume_mounts(config.volume_mounts(spark_application, s3conn, s3logdir))
-        .add_env_vars(env.to_vec())
+        .add_env_vars(merged_env)
        .resources(config.resources.clone().into())
        .image_from_product_image(spark_image);

@@ -716,13 +717,14 @@ fn spark_job(
        .context(IllegalContainerNameSnafu)?;

    let args = [job_commands.join(" ")];
+    let merged_env = spark_application.merged_env(SparkApplicationRole::Submit, env);

    cb.image_from_product_image(spark_image)
        .command(vec!["/bin/bash".to_string(), "-c".to_string()])
        .args(vec![args.join(" && ")])
        .resources(job_config.resources.clone().into())
        .add_volume_mounts(spark_application.spark_job_volume_mounts(s3conn, s3logdir))
-        .add_env_vars(env.to_vec())
+        .add_env_vars(merged_env)
        .add_env_var(
            "SPARK_SUBMIT_OPTS",
            format!(
@@ -40,11 +40,17 @@
  # For possible properties see: https://spark.apache.org/docs/latest/monitoring.html#spark-history-server-configuration-options
  #sparkConf:
  nodes:
    envOverrides:
      TEST_SPARK_HIST_VAR_ROLE: ROLE
      TEST_SPARK_HIST_VAR_FROM_RG: ROLE
    roleGroups:
      default:
        replicas: 1
        config:
          cleaner: true
        envOverrides:
          TEST_SPARK_HIST_VAR_FROM_RG: ROLEGROUP
          TEST_SPARK_HIST_VAR_RG: ROLEGROUP
        podOverrides:
          spec:
            containers:
10 changes: 10 additions & 0 deletions tests/templates/kuttl/overrides/07-assert.yaml
@@ -0,0 +1,10 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 30
commands:
  - script: |
      POD=$(kubectl -n $NAMESPACE get pod -l app.kubernetes.io/instance=spark-history -o name | head -n 1 | sed -e 's#pod/##')
      kubectl -n $NAMESPACE get pod $POD -o yaml | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_HIST_VAR_ROLE").value' | grep 'ROLE'
      kubectl -n $NAMESPACE get pod $POD -o yaml | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_HIST_VAR_RG").value' | grep 'ROLEGROUP'
      kubectl -n $NAMESPACE get pod $POD -o yaml | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_HIST_VAR_FROM_RG").value' | grep 'ROLEGROUP'
@@ -24,7 +24,14 @@
      prefix: eventlogs/
      bucket:
        reference: spark-history-s3-bucket
  env:
    - name: TEST_SPARK_VAR_0
      value: ORIGINAL
    - name: TEST_SPARK_VAR_1
      value: DONOTREPLACE
  job:
    envOverrides: &envOverrides
      TEST_SPARK_VAR_0: REPLACED
    podOverrides:
      spec:
        containers:
@@ -37,6 +44,7 @@
                cpu: 1500m
                memory: 1024Mi
  driver:
    envOverrides: *envOverrides
    podOverrides:
      spec:
        containers:
@@ -50,6 +58,7 @@
                memory: 1024Mi
  executor:
    replicas: 1
    envOverrides: *envOverrides
    podOverrides:
      spec:
        containers:
15 changes: 15 additions & 0 deletions tests/templates/kuttl/overrides/11-assert.yaml
@@ -0,0 +1,15 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 30
commands:
  - script: kubectl -n $NAMESPACE get job spark-pi-s3-1 -o yaml | yq '.spec.template.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_0").value' | grep 'REPLACED'
  - script: kubectl -n $NAMESPACE get job spark-pi-s3-1 -o yaml | yq '.spec.template.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_1").value' | grep 'DONOTREPLACE'
  - script: kubectl -n $NAMESPACE get cm spark-pi-s3-1-driver-pod-template -o json | jq -r '.data."template.yaml"' | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_0").value' | grep 'REPLACED'
  - script: kubectl -n $NAMESPACE get cm spark-pi-s3-1-driver-pod-template -o json | jq -r '.data."template.yaml"' | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_1").value' | grep 'DONOTREPLACE'
  - script: kubectl -n $NAMESPACE get cm spark-pi-s3-1-executor-pod-template -o json | jq -r '.data."template.yaml"' | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_0").value' | grep 'REPLACED'
  - script: kubectl -n $NAMESPACE get cm spark-pi-s3-1-executor-pod-template -o json | jq -r '.data."template.yaml"' | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_1").value' | grep 'DONOTREPLACE'
  - script: |
      POD=$(kubectl -n $NAMESPACE get pod -l app.kubernetes.io/instance=spark-pi-s3-1 -o name | head -n 1 | sed -e 's#pod/##')
      kubectl -n $NAMESPACE get pod $POD -o yaml | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_0").value' | grep 'REPLACED'
      kubectl -n $NAMESPACE get pod $POD -o yaml | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_1").value' | grep 'DONOTREPLACE'
2 changes: 1 addition & 1 deletion tests/test-definition.yaml
@@ -31,7 +31,7 @@ tests:
      - spark
      - s3-use-tls
      - openshift
-  - name: pod_overrides
+  - name: overrides
    dimensions:
      - spark
      - openshift