diff --git a/CHANGELOG.md b/CHANGELOG.md
index edf932f1..6af61b61 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
 
 ## [Unreleased]
 
+### Added
+
+- Added new fields to govern image pull policy ([#75])
+
+[#75]: https://github.com/stackabletech/spark-k8s-operator/pull/75
+
 ### Changed
 
 - Updated examples ([#71])
diff --git a/Cargo.lock b/Cargo.lock
index a561f81f..f2c8b00b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1724,6 +1724,7 @@ dependencies = [
  "serde_yaml",
  "snafu",
  "stackable-operator",
+ "strum",
 ]
 
 [[package]]
diff --git a/deploy/crd/sparkapplication.crd.yaml b/deploy/crd/sparkapplication.crd.yaml
index 4fbb0187..7231f7c9 100644
--- a/deploy/crd/sparkapplication.crd.yaml
+++ b/deploy/crd/sparkapplication.crd.yaml
@@ -356,6 +356,23 @@ spec:
           sparkImage:
             nullable: true
             type: string
+          sparkImagePullPolicy:
+            enum:
+              - Always
+              - IfNotPresent
+              - Never
+            nullable: true
+            type: string
+          sparkImagePullSecrets:
+            items:
+              description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace.
+              properties:
+                name:
+                  description: "Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names"
+                  type: string
+              type: object
+            nullable: true
+            type: array
           stopped:
             nullable: true
             type: boolean
diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml
index 28d4ce91..d77aec0e 100644
--- a/deploy/helm/spark-k8s-operator/crds/crds.yaml
+++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml
@@ -358,6 +358,23 @@ spec:
           sparkImage:
             nullable: true
             type: string
+          sparkImagePullPolicy:
+            enum:
+              - Always
+              - IfNotPresent
+              - Never
+            nullable: true
+            type: string
+          sparkImagePullSecrets:
+            items:
+              description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace.
+              properties:
+                name:
+                  description: "Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names"
+                  type: string
+              type: object
+            nullable: true
+            type: array
           stopped:
             nullable: true
             type: boolean
diff --git a/deploy/manifests/crds.yaml b/deploy/manifests/crds.yaml
index 53fd9d65..7a0a844e 100644
--- a/deploy/manifests/crds.yaml
+++ b/deploy/manifests/crds.yaml
@@ -359,6 +359,23 @@ spec:
           sparkImage:
             nullable: true
             type: string
+          sparkImagePullPolicy:
+            enum:
+              - Always
+              - IfNotPresent
+              - Never
+            nullable: true
+            type: string
+          sparkImagePullSecrets:
+            items:
+              description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace.
+              properties:
+                name:
+                  description: "Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names"
+                  type: string
+              type: object
+            nullable: true
+            type: array
           stopped:
             nullable: true
             type: boolean
diff --git a/docs/modules/ROOT/pages/usage.adoc b/docs/modules/ROOT/pages/usage.adoc
index 1d26df25..6b3b3af4 100644
--- a/docs/modules/ROOT/pages/usage.adoc
+++ b/docs/modules/ROOT/pages/usage.adoc
@@ -199,6 +199,12 @@ Below are listed the CRD fields that can be defined by the user:
 |`spec.sparkImage`
 | Spark image which will be deployed to driver and executor pods, which must contain spark environment needed by the job e.g. `docker.stackable.tech/stackable/spark-k8s:3.2.1-hadoop3.2-stackable0.4.0`
 
+|`spec.sparkImagePullPolicy`
+| Optional enum (one of `Always`, `IfNotPresent` or `Never`) that determines the pull policy of the Spark job image
+
+|`spec.sparkImagePullSecrets`
+| An optional list of references to secrets in the same namespace to use for pulling any of the images used by a `SparkApplication` resource. Each reference has a single property (`name`) containing the name of a valid secret
+
 |`spec.mainApplicationFile`
 |The actual application file that will be called by `spark-submit`
 
diff --git a/examples/ny-tlc-report-external-dependencies.yaml b/examples/ny-tlc-report-external-dependencies.yaml
index d8fe50fd..28110039 100644
--- a/examples/ny-tlc-report-external-dependencies.yaml
+++ b/examples/ny-tlc-report-external-dependencies.yaml
@@ -7,6 +7,8 @@ metadata:
 spec:
   version: "1.0"
   sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.2.1-hadoop3.2-python39-stackable0.1.0
+  # Always | IfNotPresent | Never
+  sparkImagePullPolicy: IfNotPresent
   mode: cluster
   mainApplicationFile: s3a://my-bucket/ny-tlc-report.py
   args:
diff --git a/examples/ny-tlc-report-image.yaml b/examples/ny-tlc-report-image.yaml
index d413dc20..2315f6be 100644
--- a/examples/ny-tlc-report-image.yaml
+++ b/examples/ny-tlc-report-image.yaml
@@ -9,6 +9,7 @@ spec:
   # everything under /jobs will be copied to /stackable/spark/jobs
   image: docker.stackable.tech/stackable/ny-tlc-report:0.1.0
   sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.2.1-hadoop3.2-python39-stackable0.1.0
+  sparkImagePullPolicy: Always
   mode: cluster
   mainApplicationFile: local:///stackable/spark/jobs/ny_tlc_report.py
   args:
diff --git a/rust/crd/Cargo.toml b/rust/crd/Cargo.toml
index adad7892..1a9b0f65 100644
--- a/rust/crd/Cargo.toml
+++ b/rust/crd/Cargo.toml
@@ -15,3 +15,4 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 serde_yaml = "0.8"
 snafu = "0.7"
+strum = { version = "0.24", features = ["derive"] }
diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs
index bcc0183c..fc12a909 100644
--- a/rust/crd/src/lib.rs
+++ b/rust/crd/src/lib.rs
@@ -5,7 +5,7 @@ pub mod constants;
 use constants::*;
 use stackable_operator::commons::s3::{InlinedS3BucketSpec, S3BucketDef};
 use stackable_operator::k8s_openapi::api::core::v1::{
-    EnvVar, EnvVarSource, SecretKeySelector, Volume, VolumeMount,
+    EnvVar, EnvVarSource, LocalObjectReference, SecretKeySelector, Volume, VolumeMount,
 };
 
 use std::collections::{BTreeMap, HashMap};
@@ -20,6 +20,7 @@ use stackable_operator::{
     role_utils::CommonConfiguration,
     schemars::{self, JsonSchema},
 };
+use strum::{Display, EnumString};
 
 #[derive(Snafu, Debug)]
 pub enum Error {
@@ -68,6 +69,10 @@ pub struct SparkApplicationSpec {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub spark_image: Option<String>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub spark_image_pull_policy: Option<ImagePullPolicy>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub spark_image_pull_secrets: Option<Vec<LocalObjectReference>>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
     pub driver: Option<DriverConfig>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub executor: Option<ExecutorConfig>,
@@ -89,6 +94,13 @@ pub struct SparkApplicationSpec {
     pub env: Option<Vec<EnvVar>>,
 }
 
+#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize, Display, EnumString)]
+pub enum ImagePullPolicy {
+    Always,
+    IfNotPresent,
+    Never,
+}
+
 #[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)]
 #[serde(rename_all = "camelCase")]
"camelCase")] pub struct JobDependencies { @@ -123,6 +135,14 @@ impl SparkApplication { self.spec.image.as_deref() } + pub fn spark_image_pull_policy(&self) -> Option { + self.spec.spark_image_pull_policy.clone() + } + + pub fn spark_image_pull_secrets(&self) -> Option> { + self.spec.spark_image_pull_secrets.clone() + } + pub fn version(&self) -> Option<&str> { self.spec.version.as_deref() } @@ -380,7 +400,10 @@ pub struct CommandStatus { #[cfg(test)] mod tests { + use crate::ImagePullPolicy; + use crate::LocalObjectReference; use crate::SparkApplication; + use std::str::FromStr; #[test] fn test_spark_examples_s3() { @@ -542,4 +565,72 @@ spec: assert!(spark_application.spec.main_class.is_none()); assert!(spark_application.spec.image.is_none()); } + + #[test] + fn test_image_actions() { + let spark_application = serde_yaml::from_str::( + r#" +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-pi-local + namespace: default +spec: + version: "1.0" + sparkImage: docker.stackable.tech/stackable/spark-k8s:3.2.1-hadoop3.2-stackable0.4.0 + sparkImagePullPolicy: Always + sparkImagePullSecrets: + - name: myregistrykey + mode: cluster + mainClass: org.apache.spark.examples.SparkPi + mainApplicationFile: local:///stackable/spark/examples/jars/spark-examples_2.12-3.2.1.jar + sparkConf: + spark.kubernetes.node.selector.node: "2" + driver: + cores: 1 + coreLimit: "1200m" + memory: "512m" + executor: + cores: 1 + instances: 1 + memory: "512m" + "#, + ) + .unwrap(); + + assert_eq!( + Some(vec![LocalObjectReference { + name: Some("myregistrykey".to_string()) + }]), + spark_application.spark_image_pull_secrets() + ); + assert_eq!( + Some(ImagePullPolicy::Always), + spark_application.spark_image_pull_policy() + ); + } + + #[test] + fn test_image_pull_policy_ser() { + assert_eq!("Never", ImagePullPolicy::Never.to_string()); + assert_eq!("Always", ImagePullPolicy::Always.to_string()); + assert_eq!("IfNotPresent", ImagePullPolicy::IfNotPresent.to_string()); + } + + #[test] + fn test_image_pull_policy_de() { + assert_eq!( + ImagePullPolicy::Always, + ImagePullPolicy::from_str("Always").unwrap() + ); + assert_eq!( + ImagePullPolicy::Never, + ImagePullPolicy::from_str("Never").unwrap() + ); + assert_eq!( + ImagePullPolicy::IfNotPresent, + ImagePullPolicy::from_str("IfNotPresent").unwrap() + ); + } } diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index 262ad973..1297a908 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -1,6 +1,6 @@ use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::builder::{ - ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder, VolumeBuilder, + ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, VolumeBuilder, }; use stackable_operator::k8s_openapi::api::batch::v1::{Job, JobSpec}; @@ -10,6 +10,7 @@ use stackable_operator::k8s_openapi::api::core::v1::{ }; use stackable_operator::k8s_openapi::api::rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject}; use stackable_operator::k8s_openapi::Resource; +use stackable_operator::kube::api::ObjectMeta; use stackable_operator::kube::runtime::controller::{Action, Context}; use stackable_operator::logging::controller::ReconcilerError; use stackable_operator::product_config::ProductConfigManager; @@ -30,8 +31,6 @@ pub struct Ctx { #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { - 
-    #[snafu(display("object defines no version"))]
-    ObjectHasNoVersion,
     #[snafu(display("object is missing metadata to build owner reference"))]
     ObjectMissingMetadataForOwnerRef {
         source: stackable_operator::error::Error,
@@ -133,7 +132,8 @@ pub async fn reconcile(
     });
 
     let requirements_container = spark_application.requirements().map(|req| {
-        ContainerBuilder::new(CONTAINER_NAME_REQ)
+        let mut container_builder = ContainerBuilder::new(CONTAINER_NAME_REQ);
+        container_builder
             .image(spark_image)
             .command(vec![
                 "/bin/bash".to_string(),
@@ -141,8 +141,11 @@ pub async fn reconcile(
                 "-c".to_string(),
                 format!("pip install --target={VOLUME_MOUNT_PATH_REQ} {req}"),
             ])
-            .add_volume_mount(VOLUME_MOUNT_NAME_REQ, VOLUME_MOUNT_PATH_REQ)
-            .build()
+            .add_volume_mount(VOLUME_MOUNT_NAME_REQ, VOLUME_MOUNT_PATH_REQ);
+        if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() {
+            container_builder.image_pull_policy(image_pull_policy.to_string());
+        }
+        container_builder.build()
     });
 
     let env_vars = spark_application.env(&s3bucket);
@@ -182,6 +185,7 @@ pub async fn reconcile(
 }
 
 fn pod_template(
+    spark_application: &SparkApplication,
     container_name: &str,
     job_container: &Option<Container>,
     requirements_container: &Option<Container>,
@@ -189,16 +193,24 @@ fn pod_template(
     volume_mounts: &[VolumeMount],
     env: &[EnvVar],
 ) -> Result<Pod> {
-    let volumes = volumes.to_vec();
+    let mut volumes = volumes.to_vec();
     let volume_mounts = volume_mounts.to_vec();
+    let mut inits: Option<Vec<Container>> = None;
 
     let mut container = ContainerBuilder::new(container_name);
     container
         .add_volume_mounts(volume_mounts)
        .add_env_vars(env.to_vec());
 
+    if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() {
+        container.image_pull_policy(image_pull_policy.to_string());
+    }
+
     if job_container.is_some() {
         container.add_volume_mount(VOLUME_MOUNT_NAME_JOB, VOLUME_MOUNT_PATH_JOB);
+        volumes.extend(vec![VolumeBuilder::new(VOLUME_MOUNT_NAME_JOB)
+            .empty_dir(EmptyDirVolumeSource::default())
+            .build()]);
     }
 
     if requirements_container.is_some() {
@@ -208,31 +220,34 @@ fn pod_template(
             "PYTHONPATH",
             format!("$SPARK_HOME/python:{VOLUME_MOUNT_PATH_REQ}:$PYTHONPATH"),
         );
+        volumes.extend(vec![VolumeBuilder::new(VOLUME_MOUNT_NAME_REQ)
+            .empty_dir(EmptyDirVolumeSource::default())
+            .build()]);
     }
 
-    let mut template = PodBuilder::new();
-    template
-        .metadata_default()
-        .add_container(container.build())
-        .add_volumes(volumes);
-
     if let Some(container) = requirements_container.clone() {
-        template.add_init_container(container);
-        template.add_volume(
-            VolumeBuilder::new(VOLUME_MOUNT_NAME_REQ)
-                .empty_dir(EmptyDirVolumeSource::default())
-                .build(),
-        );
+        inits.get_or_insert_with(Vec::new).push(container);
     }
 
     if let Some(container) = job_container.clone() {
-        template.add_init_container(container);
-        template.add_volume(
-            VolumeBuilder::new(VOLUME_MOUNT_NAME_JOB)
-                .empty_dir(EmptyDirVolumeSource::default())
-                .build(),
-        );
+        inits.get_or_insert_with(Vec::new).push(container);
     }
-    template.build().context(PodTemplateSnafu)
+
+    let mut pod_spec = PodSpec {
+        containers: vec![container.build()],
+        init_containers: inits.clone(),
+        volumes: Some(volumes.clone()),
+        ..PodSpec::default()
+    };
+
+    if let Some(image_pull_secrets) = spark_application.spark_image_pull_secrets() {
+        pod_spec.image_pull_secrets = Some(image_pull_secrets);
+    }
+
+    Ok(Pod {
+        metadata: ObjectMeta::default(),
+        spec: Some(pod_spec),
+        ..Pod::default()
+    })
 }
 
 fn pod_template_config_map(
@@ -244,6 +259,7 @@ fn pod_template_config_map(
     let volumes = spark_application.volumes();
 
     let driver_template = pod_template(
+        spark_application,
         CONTAINER_NAME_DRIVER,
         job_container,
         requirements_container,
@@ -252,6 +268,7 @@ fn pod_template_config_map(
         env,
     )?;
     let executor_template = pod_template(
+        spark_application,
         CONTAINER_NAME_EXECUTOR,
         job_container,
         requirements_container,
@@ -322,6 +339,10 @@ fn spark_job(
         value_from: None,
     }]);
 
+    if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() {
+        container.image_pull_policy(image_pull_policy.to_string());
+    }
+
     let mut volumes = vec![Volume {
         name: String::from(VOLUME_MOUNT_NAME_POD_TEMPLATES),
         config_map: Some(ConfigMapVolumeSource {
@@ -353,6 +374,7 @@ fn spark_job(
             restart_policy: Some("Never".to_string()),
             service_account_name: serviceaccount.metadata.name.clone(),
             volumes: Some(volumes),
+            image_pull_secrets: spark_application.spark_image_pull_secrets(),
             ..PodSpec::default()
         }),
     };
diff --git a/tests/templates/kuttl/spark-ny-public-s3/00-s3-upload-container.yaml b/tests/templates/kuttl/spark-ny-public-s3/00-s3-upload-container.yaml
index 41870882..dbd4c93d 100644
--- a/tests/templates/kuttl/spark-ny-public-s3/00-s3-upload-container.yaml
+++ b/tests/templates/kuttl/spark-ny-public-s3/00-s3-upload-container.yaml
@@ -5,6 +5,7 @@ metadata:
   name: minio-mc
   labels:
     app: minio-mc
+timeout: 240
 spec:
   clusterIP: None
   selector:
@@ -16,6 +17,7 @@ metadata:
   name: minio-mc
   labels:
     app: minio-mc
+timeout: 240
 spec:
   replicas: 1
   serviceName: "minio-mc"
diff --git a/tests/templates/kuttl/spark-ny-public-s3/00-setup-minio.yaml b/tests/templates/kuttl/spark-ny-public-s3/00-setup-minio.yaml
index 7a0c10e7..17e2d19a 100644
--- a/tests/templates/kuttl/spark-ny-public-s3/00-setup-minio.yaml
+++ b/tests/templates/kuttl/spark-ny-public-s3/00-setup-minio.yaml
@@ -11,3 +11,4 @@ commands:
       --set buckets[0].name=my-bucket,buckets[0].policy=public
       --set resources.requests.memory=1Gi
       --repo https://charts.min.io/ minio
+    timeout: 240
diff --git a/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2
index fde2dcf9..4eb813cf 100644
--- a/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2
+++ b/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2
@@ -14,6 +14,7 @@ metadata:
 spec:
   version: "1.0"
   sparkImage: docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'] }}-hadoop{{ test_scenario['values']['hadoop'][:-2] }}-stackable{{ test_scenario['values']['stackable'] }}
+  sparkImagePullPolicy: IfNotPresent
   mode: cluster
   mainClass: tech.stackable.demo.spark.NYTLCReport
   mainApplicationFile: s3a://my-bucket/ny-tlc-report-1.1.0.jar
diff --git a/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml b/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml
index c07c1e5c..2e32aff6 100644
--- a/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml
+++ b/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml
@@ -3,6 +3,7 @@ apiVersion: v1
 kind: Secret
 metadata:
   name: minio-credentials
+timeout: 240
 stringData:
   accessKeyId: minioAccessKey
   secretAccessKey: minioSecretKey
diff --git a/tests/templates/kuttl/spark-pi-private-s3/00-s3-upload-container.yaml b/tests/templates/kuttl/spark-pi-private-s3/00-s3-upload-container.yaml
index 41870882..dbd4c93d 100644
--- a/tests/templates/kuttl/spark-pi-private-s3/00-s3-upload-container.yaml
+++ b/tests/templates/kuttl/spark-pi-private-s3/00-s3-upload-container.yaml
@@ -5,6 +5,7 @@ metadata:
   name: minio-mc
   labels:
     app: minio-mc
+timeout: 240
 spec:
   clusterIP: None
   selector:
@@ -16,6 +17,7 @@ metadata:
   name: minio-mc
   labels:
     app: minio-mc
+timeout: 240
 spec:
   replicas: 1
   serviceName: "minio-mc"
diff --git a/tests/templates/kuttl/spark-pi-private-s3/00-setup-minio.yaml b/tests/templates/kuttl/spark-pi-private-s3/00-setup-minio.yaml
index 7927295f..ae273e09 100644
--- a/tests/templates/kuttl/spark-pi-private-s3/00-setup-minio.yaml
+++ b/tests/templates/kuttl/spark-pi-private-s3/00-setup-minio.yaml
@@ -12,3 +12,4 @@ commands:
       --set users[0].accessKey=minioAccessKey,users[0].secretKey=minioSecretKey,users[0].policy=readwrite
      --set resources.requests.memory=1Gi
       --repo https://charts.min.io/ minio
+    timeout: 240
diff --git a/tests/templates/kuttl/spark-pi-public-s3/00-s3-upload-container.yaml b/tests/templates/kuttl/spark-pi-public-s3/00-s3-upload-container.yaml
index 41870882..dbd4c93d 100644
--- a/tests/templates/kuttl/spark-pi-public-s3/00-s3-upload-container.yaml
+++ b/tests/templates/kuttl/spark-pi-public-s3/00-s3-upload-container.yaml
@@ -5,6 +5,7 @@ metadata:
   name: minio-mc
   labels:
     app: minio-mc
+timeout: 240
 spec:
   clusterIP: None
   selector:
@@ -16,6 +17,7 @@ metadata:
   name: minio-mc
   labels:
     app: minio-mc
+timeout: 240
 spec:
   replicas: 1
   serviceName: "minio-mc"
diff --git a/tests/templates/kuttl/spark-pi-public-s3/00-setup-minio.yaml b/tests/templates/kuttl/spark-pi-public-s3/00-setup-minio.yaml
index 7a0c10e7..17e2d19a 100644
--- a/tests/templates/kuttl/spark-pi-public-s3/00-setup-minio.yaml
+++ b/tests/templates/kuttl/spark-pi-public-s3/00-setup-minio.yaml
@@ -11,3 +11,4 @@ commands:
       --set buckets[0].name=my-bucket,buckets[0].policy=public
       --set resources.requests.memory=1Gi
       --repo https://charts.min.io/ minio
+    timeout: 240
diff --git a/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2
index 9e083b00..427205fe 100644
--- a/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2
+++ b/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2
@@ -6,6 +6,7 @@ metadata:
 spec:
   version: "1.0"
   sparkImage: docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'] }}-hadoop{{ test_scenario['values']['hadoop'][:-2] }}-stackable{{ test_scenario['values']['stackable'] }}
+  sparkImagePullPolicy: Always
   mode: cluster
   mainClass: org.apache.spark.examples.SparkPi
   mainApplicationFile: s3a://my-bucket/spark-examples_2.12-{{ test_scenario['values']['spark'] }}.jar
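
For reference, the two new fields are used together on a `SparkApplication` much like the fixture exercised by the new `test_image_actions` unit test above. A minimal sketch follows; the image tag is only an example, and `myregistrykey` is an illustrative secret name that must match an existing docker-registry secret in the application's namespace (for instance one created with `kubectl create secret docker-registry`):

---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: spark-pi
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.2.1-hadoop3.2-stackable0.4.0
  # One of Always | IfNotPresent | Never
  sparkImagePullPolicy: IfNotPresent
  # Each entry names an existing docker-registry secret in this namespace.
  sparkImagePullSecrets:
    - name: myregistrykey
  mode: cluster
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///stackable/spark/examples/jars/spark-examples_2.12-3.2.1.jar

Both fields are nullable in the CRD, so omitting them leaves the pull policy and pull secrets at the Kubernetes defaults.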