
Commit d5ede2c

Felix Hennig (fhennig) and Andrew Kenworthy (adwk67) committed

Added s3 section (#42)

## Description

- Added s3 section to the CRD.
- Credentials and endpoint are written to env vars/config options. Fixes #37.

A test case has been added here: https://github.com/stackabletech/integration-tests/pull/205 and should be reviewed at the same time.

Co-authored-by: Felix Hennig <[email protected]>
Co-authored-by: Andrew Kenworthy <[email protected]>
1 parent 984e5b2 commit d5ede2c

File tree

- CHANGELOG.md
- deploy/crd/sparkapplication.crd.yaml
- deploy/helm/spark-k8s-operator/crds/crds.yaml
- deploy/manifests/crds.yaml
- rust/crd/src/constants.rs
- rust/crd/src/lib.rs

6 files changed: +87 −7 lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion

@@ -6,7 +6,9 @@ All notable changes to this project will be documented in this file.
 
 ### Added
 
-- ServiceAccount, ClusterRole and RoleBinding for Spark driver ([#39])
 - Initial commit
+- ServiceAccount, ClusterRole and RoleBinding for Spark driver ([#39])
+- S3 credentials can be provided via a Secret ([#42])
 
 [#39]: https://github.com/stackabletech/spark-k8s-operator/pull/39
+[#42]: https://github.com/stackabletech/spark-k8s-operator/pull/42

deploy/crd/sparkapplication.crd.yaml

Lines changed: 11 additions & 0 deletions

@@ -263,6 +263,17 @@ spec:
           mode:
             nullable: true
             type: string
+          s3:
+            nullable: true
+            properties:
+              credentialsSecret:
+                type: string
+              endpoint:
+                nullable: true
+                type: string
+            required:
+              - credentialsSecret
+            type: object
           sparkConf:
             additionalProperties:
               type: string
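In the schema above, credentialsSecret is required and endpoint is optional. A minimal sketch of how the new section might be used in a SparkApplication manifest; the apiVersion, resource names, and endpoint URL are illustrative assumptions, not taken from this commit:

apiVersion: spark.stackable.tech/v1alpha1  # assumed API group/version
kind: SparkApplication
metadata:
  name: example-spark-app                  # hypothetical name
spec:
  s3:
    credentialsSecret: minio-credentials   # required: Secret holding the S3 credentials
    endpoint: http://minio:9000            # optional: custom S3 endpoint, e.g. MinIO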

deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 11 additions & 0 deletions

@@ -265,6 +265,17 @@ spec:
           mode:
             nullable: true
             type: string
+          s3:
+            nullable: true
+            properties:
+              credentialsSecret:
+                type: string
+              endpoint:
+                nullable: true
+                type: string
+            required:
+              - credentialsSecret
+            type: object
           sparkConf:
             additionalProperties:
               type: string

deploy/manifests/crds.yaml

Lines changed: 11 additions & 0 deletions

@@ -267,6 +267,17 @@ spec:
           mode:
             nullable: true
             type: string
+          s3:
+            nullable: true
+            properties:
+              credentialsSecret:
+                type: string
+              endpoint:
+                nullable: true
+                type: string
+            required:
+              - credentialsSecret
+            type: object
           sparkConf:
             additionalProperties:
               type: string

rust/crd/src/constants.rs

Lines changed: 5 additions & 0 deletions

@@ -16,3 +16,8 @@ pub const CONTAINER_NAME_DRIVER: &str = "spark-driver";
 
 pub const CONTAINER_IMAGE_NAME_EXECUTOR: &str = "dummy-overwritten-by-command-line";
 pub const CONTAINER_NAME_EXECUTOR: &str = "spark-executor";
+
+pub const ENV_AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID";
+pub const ENV_AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY";
+pub const ACCESS_KEY_ID: &str = "accessKeyId";
+pub const SECRET_ACCESS_KEY: &str = "secretAccessKey";
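These constants fix both the environment variable names injected into the pods and the key names the operator expects inside the credentials Secret. A sketch of a matching Secret, assuming a hypothetical name minio-credentials and placeholder key values:

apiVersion: v1
kind: Secret
metadata:
  name: minio-credentials          # referenced by spec.s3.credentialsSecret
stringData:
  accessKeyId: minioAccessKey      # surfaced as AWS_ACCESS_KEY_ID
  secretAccessKey: minioSecretKey  # surfaced as AWS_SECRET_ACCESS_KEY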

rust/crd/src/lib.rs

Lines changed: 46 additions & 6 deletions

@@ -3,7 +3,9 @@
 pub mod constants;
 
 use constants::*;
-use stackable_operator::k8s_openapi::api::core::v1::{EnvVar, Volume, VolumeMount};
+use stackable_operator::k8s_openapi::api::core::v1::{
+    EnvVar, EnvVarSource, SecretKeySelector, Volume, VolumeMount,
+};
 
 use std::collections::{BTreeMap, HashMap};
 
@@ -77,6 +79,8 @@ pub struct SparkApplicationSpec {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub deps: Option<JobDependencies>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub s3: Option<S3>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
     pub args: Option<Vec<String>>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub volumes: Option<Vec<Volume>>,
@@ -97,6 +101,14 @@ pub struct JobDependencies {
     pub exclude_packages: Option<Vec<String>>,
 }
 
+#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct S3 {
+    pub credentials_secret: String,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub endpoint: Option<String>,
+}
+
 impl SparkApplication {
     pub fn enable_monitoring(&self) -> Option<bool> {
         let spec: &SparkApplicationSpec = &self.spec;
@@ -136,7 +148,20 @@ impl SparkApplication {
 
     pub fn env(&self) -> Vec<EnvVar> {
         let tmp = self.spec.env.as_ref();
-        tmp.iter().flat_map(|e| e.iter()).cloned().collect()
+        let mut e: Vec<EnvVar> = tmp.iter().flat_map(|e| e.iter()).cloned().collect();
+        if let Some(s3) = self.spec.s3.as_ref() {
+            e.push(env_var_from_secret(
+                ENV_AWS_ACCESS_KEY_ID,
+                &s3.credentials_secret,
+                ACCESS_KEY_ID,
+            ));
+            e.push(env_var_from_secret(
+                ENV_AWS_SECRET_ACCESS_KEY,
+                &s3.credentials_secret,
+                SECRET_ACCESS_KEY,
+            ));
+        }
+        e
     }
 
     pub fn volumes(&self) -> Vec<Volume> {
@@ -195,12 +220,12 @@ impl SparkApplication {
             format!("--conf spark.kubernetes.driver.container.image={}", self.spec.spark_image.as_ref().context(NoSparkImageSnafu)?),
             format!("--conf spark.kubernetes.executor.container.image={}", self.spec.spark_image.as_ref().context(NoSparkImageSnafu)?),
             format!("--conf spark.kubernetes.authenticate.driver.serviceAccountName={}", serviceaccount_name),
-            //"--conf spark.kubernetes.file.upload.path=s3a://stackable-spark-k8s-jars/jobs".to_string(),
-            //"--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem".to_string(),
-            //"--conf spark.driver.extraClassPath=/stackable/.ivy2/cache".to_string(),
-            //"--conf spark.hadoop.fs.s3a.fast.upload=true".to_string(),
         ];
 
+        if let Some(endpoint) = self.spec.s3.as_ref().and_then(|s3| s3.endpoint.as_ref()) {
+            submit_cmd.push(format!("--conf spark.hadoop.fs.s3a.endpoint={}", endpoint));
+        }
+
         // conf arguments that are not driver or executor specific
         if let Some(spark_conf) = self.spec.spark_conf.clone() {
             for (key, value) in spark_conf {
@@ -325,6 +350,21 @@ pub struct CommandStatus {
     pub finished_at: Option<Time>,
 }
 
+fn env_var_from_secret(var_name: &str, secret: &str, secret_key: &str) -> EnvVar {
+    EnvVar {
+        name: String::from(var_name),
+        value_from: Some(EnvVarSource {
+            secret_key_ref: Some(SecretKeySelector {
+                name: Some(String::from(secret)),
+                key: String::from(secret_key),
+                ..Default::default()
+            }),
+            ..Default::default()
+        }),
+        ..Default::default()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::SparkApplication;
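env_var_from_secret builds a plain Kubernetes secretKeyRef, so for a credentials Secret named minio-credentials (a placeholder name) the output of env() would serialize into the driver and executor pod specs along these lines:

env:
  - name: AWS_ACCESS_KEY_ID
    valueFrom:
      secretKeyRef:
        name: minio-credentials  # spec.s3.credentialsSecret
        key: accessKeyId
  - name: AWS_SECRET_ACCESS_KEY
    valueFrom:
      secretKeyRef:
        name: minio-credentials
        key: secretAccessKey

If an endpoint is set, the submit command builder additionally appends --conf spark.hadoop.fs.s3a.endpoint=<endpoint> to the spark-submit call, as shown in the hunk above.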
