From f11b5821ca95c5325532e29f7ef9ea38015b1dd8 Mon Sep 17 00:00:00 2001 From: Felix Hennig Date: Thu, 31 Mar 2022 13:44:12 +0200 Subject: [PATCH 1/4] Added s3 section --- deploy/crd/sparkapplication.crd.yaml | 11 ++++++ deploy/helm/spark-k8s-operator/crds/crds.yaml | 11 ++++++ deploy/manifests/crds.yaml | 11 ++++++ rust/crd/src/lib.rs | 39 ++++++++++++++++++- .../src/spark_k8s_controller.rs | 5 +-- 5 files changed, 71 insertions(+), 6 deletions(-) diff --git a/deploy/crd/sparkapplication.crd.yaml b/deploy/crd/sparkapplication.crd.yaml index cbda8463..b79fe882 100644 --- a/deploy/crd/sparkapplication.crd.yaml +++ b/deploy/crd/sparkapplication.crd.yaml @@ -263,6 +263,17 @@ spec: mode: nullable: true type: string + s3: + nullable: true + properties: + credentialsSecret: + type: string + endpoint: + nullable: true + type: string + required: + - credentialsSecret + type: object sparkConf: additionalProperties: type: string diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index 5eb29f98..ef22569f 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -265,6 +265,17 @@ spec: mode: nullable: true type: string + s3: + nullable: true + properties: + credentialsSecret: + type: string + endpoint: + nullable: true + type: string + required: + - credentialsSecret + type: object sparkConf: additionalProperties: type: string diff --git a/deploy/manifests/crds.yaml b/deploy/manifests/crds.yaml index db8cf8f1..01331b42 100644 --- a/deploy/manifests/crds.yaml +++ b/deploy/manifests/crds.yaml @@ -267,6 +267,17 @@ spec: mode: nullable: true type: string + s3: + nullable: true + properties: + credentialsSecret: + type: string + endpoint: + nullable: true + type: string + required: + - credentialsSecret + type: object sparkConf: additionalProperties: type: string diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index d9a14374..8146bf00 100644 --- a/rust/crd/src/lib.rs 
+++ b/rust/crd/src/lib.rs @@ -3,7 +3,7 @@ pub mod constants; use constants::*; -use stackable_operator::k8s_openapi::api::core::v1::{EnvVar, Volume, VolumeMount}; +use stackable_operator::k8s_openapi::api::core::v1::{EnvVar, EnvVarSource, SecretKeySelector, Volume, VolumeMount}; use std::collections::HashMap; @@ -76,6 +76,8 @@ pub struct SparkApplicationSpec { #[serde(default, skip_serializing_if = "Option::is_none")] pub deps: Option, #[serde(default, skip_serializing_if = "Option::is_none")] + pub s3: Option<S3>, + #[serde(default, skip_serializing_if = "Option::is_none")] pub args: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub volumes: Option>, @@ -96,6 +98,14 @@ pub struct JobDependencies { pub exclude_packages: Option>, } +#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct S3 { + pub credentials_secret: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub endpoint: Option<String>, +} + impl SparkApplication { pub fn enable_monitoring(&self) -> Option { let spec: &SparkApplicationSpec = &self.spec; @@ -131,7 +141,12 @@ impl SparkApplication { pub fn env(&self) -> Vec<EnvVar> { let tmp = self.spec.env.as_ref(); - tmp.iter().flat_map(|e| e.iter()).cloned().collect() + let mut e: Vec<EnvVar> = tmp.iter().flat_map(|e| e.iter()).cloned().collect(); + if let Some(s3) = self.spec.s3.as_ref() { + e.push(env_var_from_secret("AWS_ACCESS_KEY_ID", &s3.credentials_secret, "accessKeyId")); + e.push(env_var_from_secret("AWS_SECRET_ACCESS_KEY", &s3.credentials_secret, "secretAccessKey")); + } + e } pub fn volumes(&self) -> Vec { @@ -183,6 +198,11 @@ impl SparkApplication { //"--conf spark.hadoop.fs.s3a.fast.upload=true".to_string(), ]; + + if let Some(endpoint) = self.spec.s3.as_ref().and_then(|s3| s3.endpoint.as_ref()) { + submit_cmd.push(format!("--conf spark.hadoop.fs.s3a.endpoint={}", endpoint)); + } + // conf arguments that are not driver or executor specific if let 
Some(spark_conf) = self.spec.spark_conf.clone() { for (key, value) in spark_conf { @@ -307,6 +327,21 @@ pub struct CommandStatus { pub finished_at: Option