diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d68f6ea..dc297757 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file. - Deploy default and support custom affinities ([#217]) - BREAKING: Dropped support for old spec.{driver,executor}.nodeSelector field. Use spec.{driver,executor}.affinity.nodeSelector instead ([#217]) +- Log aggregation added ([#226]). ### Changed @@ -19,6 +20,7 @@ All notable changes to this project will be documented in this file. [#207]: https://github.com/stackabletech/spark-k8s-operator/pull/207 [#217]: https://github.com/stackabletech/spark-k8s-operator/pull/217 [#223]: https://github.com/stackabletech/spark-k8s-operator/pull/223 +[#226]: https://github.com/stackabletech/spark-k8s-operator/pull/226 ## [23.1.0] - 2023-01-23 diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index f6ba5565..85864cda 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -565,6 +565,82 @@ spec: type: array type: object type: object + logging: + default: + enableVectorAgent: null + containers: {} + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Fragment derived from `ContainerLogConfigChoice` + properties: + console: + nullable: true + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + nullable: true + type: string + type: object + file: + nullable: true + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + type: object + type: object + type: object + enableVectorAgent: + nullable: true + type: boolean + type: object resources: default: memory: @@ -1198,6 +1274,82 @@ spec: minimum: 0.0 nullable: true type: integer + logging: + default: + enableVectorAgent: null + containers: {} + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Fragment derived from `ContainerLogConfigChoice` + properties: + console: + nullable: true + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + nullable: true + type: string + type: object + file: + nullable: true + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + type: object + type: object + type: object + enableVectorAgent: + nullable: true + type: boolean + type: object nodeSelector: additionalProperties: type: string 
@@ -1274,6 +1426,82 @@ spec: job: nullable: true properties: + logging: + default: + enableVectorAgent: null + containers: {} + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Fragment derived from `ContainerLogConfigChoice` + properties: + console: + nullable: true + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + nullable: true + type: string + type: object + file: + nullable: true + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + type: object + type: object + type: object + enableVectorAgent: + nullable: true + type: boolean + type: object resources: default: memory: @@ -1579,6 +1807,10 @@ spec: stopped: nullable: true type: boolean + vectorAggregatorConfigMapName: + description: Name of the Vector aggregator discovery ConfigMap. It must contain the key `ADDRESS` with the address of the Vector aggregator. + nullable: true + type: string version: nullable: true type: string @@ -3366,6 +3598,82 @@ spec: cleaner: nullable: true type: boolean + logging: + default: + enableVectorAgent: null + containers: {} + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Fragment derived from `ContainerLogConfigChoice` + properties: + console: + nullable: true + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + nullable: true + type: string + type: object + file: + nullable: true + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + type: object + type: object + type: object + enableVectorAgent: + nullable: true + type: boolean + type: object resources: default: memory: @@ -3908,6 +4216,82 @@ spec: cleaner: nullable: true type: boolean + logging: + default: + enableVectorAgent: null + containers: {} + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Fragment derived from `ContainerLogConfigChoice` + properties: + console: + nullable: true + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + nullable: true + type: string + type: object + file: + nullable: true + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + 
type: object + loggers: + additionalProperties: + properties: + level: + description: Log levels + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + type: object + type: object + type: object + enableVectorAgent: + nullable: true + type: boolean + type: object resources: default: memory: @@ -4003,6 +4387,10 @@ spec: type: string nullable: true type: object + vectorAggregatorConfigMapName: + description: Name of the Vector aggregator discovery ConfigMap. It must contain the key `ADDRESS` with the address of the Vector aggregator. + nullable: true + type: string required: - image - logFileDirectory diff --git a/docs/modules/spark-k8s/pages/history_server.adoc b/docs/modules/spark-k8s/pages/history_server.adoc index 32cf4788..65e387e3 100644 --- a/docs/modules/spark-k8s/pages/history_server.adoc +++ b/docs/modules/spark-k8s/pages/history_server.adoc @@ -48,6 +48,23 @@ include::example$example-history-app.yaml[] <6> Credentials used to write event logs. These can, of course, differ from the credentials used to process data. +== Log aggregation + +The logs can be forwarded to a Vector log aggregator by providing a discovery +ConfigMap for the aggregator and by enabling the log agent: + +[source,yaml] +---- +spec: + vectorAggregatorConfigMapName: vector-aggregator-discovery + nodes: + config: + logging: + enableVectorAgent: true +---- + +Further information on how to configure logging can be found in +xref:home:concepts:logging.adoc[]. == History Web UI diff --git a/docs/modules/spark-k8s/pages/usage.adoc b/docs/modules/spark-k8s/pages/usage.adoc index 1a170e2c..64e6aa5c 100644 --- a/docs/modules/spark-k8s/pages/usage.adoc +++ b/docs/modules/spark-k8s/pages/usage.adoc @@ -177,6 +177,28 @@ Spark allocates a default amount of non-heap memory based on the type of job (JV NOTE: It is possible to define Spark resources either directly by setting configuration properties listed under `sparkConf`, or by using resource limits. If both are used, then `sparkConf` properties take precedence. It is recommended for the sake of clarity to use *_either_* one *_or_* the other. +== Log aggregation + +The logs can be forwarded to a Vector log aggregator by providing a discovery +ConfigMap for the aggregator and by enabling the log agent: + +[source,yaml] +---- +spec: + vectorAggregatorConfigMapName: vector-aggregator-discovery + job: + logging: + enableVectorAgent: true + driver: + logging: + enableVectorAgent: true + executor: + logging: + enableVectorAgent: true +---- + +Further information on how to configure logging can be found in +xref:home:concepts:logging.adoc[].
== CRD argument coverage diff --git a/rust/crd/src/constants.rs b/rust/crd/src/constants.rs index b1f67887..7d172716 100644 --- a/rust/crd/src/constants.rs +++ b/rust/crd/src/constants.rs @@ -1,21 +1,29 @@ pub const APP_NAME: &str = "spark-k8s"; -pub const VOLUME_MOUNT_NAME_POD_TEMPLATES: &str = "pod-template"; -pub const VOLUME_MOUNT_PATH_POD_TEMPLATES: &str = "/stackable/spark/pod-templates"; +pub const VOLUME_MOUNT_NAME_DRIVER_POD_TEMPLATES: &str = "driver-pod-template"; +pub const VOLUME_MOUNT_PATH_DRIVER_POD_TEMPLATES: &str = "/stackable/spark/driver-pod-templates"; + +pub const VOLUME_MOUNT_NAME_EXECUTOR_POD_TEMPLATES: &str = "executor-pod-template"; +pub const VOLUME_MOUNT_PATH_EXECUTOR_POD_TEMPLATES: &str = + "/stackable/spark/executor-pod-templates"; + +pub const POD_TEMPLATE_FILE: &str = "template.yaml"; + +pub const VOLUME_MOUNT_NAME_CONFIG: &str = "config"; -pub const CONTAINER_NAME_JOB: &str = "job"; pub const VOLUME_MOUNT_NAME_JOB: &str = "job-files"; pub const VOLUME_MOUNT_PATH_JOB: &str = "/stackable/spark/jobs"; -pub const CONTAINER_NAME_REQ: &str = "requirements"; pub const VOLUME_MOUNT_NAME_REQ: &str = "req-files"; pub const VOLUME_MOUNT_PATH_REQ: &str = "/stackable/spark/requirements"; -pub const CONTAINER_IMAGE_NAME_DRIVER: &str = "dummy-overwritten-by-command-line"; -pub const CONTAINER_NAME_DRIVER: &str = "spark-driver"; +pub const VOLUME_MOUNT_NAME_LOG_CONFIG: &str = "log-config"; +pub const VOLUME_MOUNT_PATH_LOG_CONFIG: &str = "/stackable/log_config"; -pub const CONTAINER_IMAGE_NAME_EXECUTOR: &str = "dummy-overwritten-by-command-line"; -pub const CONTAINER_NAME_EXECUTOR: &str = "spark-executor"; +pub const VOLUME_MOUNT_NAME_LOG: &str = "log"; +pub const VOLUME_MOUNT_PATH_LOG: &str = "/stackable/log"; + +pub const LOG4J2_CONFIG_FILE: &str = "log4j2.properties"; pub const ACCESS_KEY_ID: &str = "accessKey"; pub const SECRET_ACCESS_KEY: &str = "secretKey"; @@ -25,6 +33,11 @@ pub const MIN_MEMORY_OVERHEAD: u32 = 384; pub const JVM_OVERHEAD_FACTOR: f32 = 0.1; pub const NON_JVM_OVERHEAD_FACTOR: f32 = 0.4; +pub const MAX_SPARK_LOG_FILES_SIZE_IN_MIB: u32 = 10; +pub const MAX_INIT_CONTAINER_LOG_FILES_SIZE_IN_MIB: u32 = 1; +pub const LOG_VOLUME_SIZE_IN_MIB: u32 = + MAX_SPARK_LOG_FILES_SIZE_IN_MIB + MAX_INIT_CONTAINER_LOG_FILES_SIZE_IN_MIB; + pub const OPERATOR_NAME: &str = "spark.stackable.tech"; pub const CONTROLLER_NAME: &str = "sparkapplication"; pub const POD_DRIVER_CONTROLLER_NAME: &str = "pod-driver"; diff --git a/rust/crd/src/history.rs b/rust/crd/src/history.rs index 955ef4c5..b9377b9e 100644 --- a/rust/crd/src/history.rs +++ b/rust/crd/src/history.rs @@ -27,10 +27,11 @@ use stackable_operator::{ transform_all_roles_to_config, validate_all_roles_and_groups_config, Configuration, ValidatedRoleConfigByPropertyKind, }, + product_logging::{self, spec::Logging}, role_utils::{Role, RoleGroupRef}, schemars::{self, JsonSchema}, }; -use strum::Display; +use strum::{Display, EnumIter}; #[derive(Snafu, Debug)] pub enum Error { @@ -62,6 +63,10 @@ pub enum Error { #[serde(rename_all = "camelCase")] pub struct SparkHistoryServerSpec { pub image: ProductImage, + /// Name of the Vector aggregator discovery ConfigMap. + /// It must contain the key `ADDRESS` with the address of the Vector aggregator. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub vector_aggregator_config_map_name: Option, pub log_file_directory: LogFileDirectorySpec, #[serde(skip_serializing_if = "Option::is_none")] pub spark_conf: Option>, @@ -180,6 +185,26 @@ pub struct S3LogFileDirectorySpec { )] pub struct HistoryStorageConfig {} +#[derive( + Clone, + Debug, + Deserialize, + Display, + Eq, + EnumIter, + JsonSchema, + Ord, + PartialEq, + PartialOrd, + Serialize, +)] +#[serde(rename_all = "kebab-case")] +#[strum(serialize_all = "kebab-case")] +pub enum SparkHistoryServerContainer { + SparkHistory, + Vector, +} + #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] #[fragment_attrs( derive( @@ -200,6 +225,8 @@ pub struct HistoryConfig { #[fragment_attrs(serde(default))] pub resources: Resources, #[fragment_attrs(serde(default))] + pub logging: Logging, + #[fragment_attrs(serde(default))] pub affinity: StackableAffinity, } @@ -218,6 +245,7 @@ impl HistoryConfig { }, storage: HistoryStorageConfigFragment {}, }, + logging: product_logging::spec::default_logging(), affinity: history_affinity(cluster_name), } } diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index 693332cb..d144940f 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -40,10 +40,11 @@ use stackable_operator::{ kube::{CustomResource, ResourceExt}, labels::ObjectLabels, memory::{BinaryMultiple, MemoryQuantity}, + product_logging::{self, spec::Logging}, role_utils::CommonConfiguration, schemars::{self, JsonSchema}, }; -use strum::{Display, EnumString}; +use strum::{Display, EnumIter, EnumString}; #[derive(Snafu, Debug)] pub enum Error { @@ -70,6 +71,8 @@ pub enum Error { FragmentValidationFailure { source: ValidationError }, } +#[derive(Clone, Debug, Deserialize, Display, Eq, PartialEq, Serialize, JsonSchema)] +#[strum(serialize_all = "kebab-case")] pub enum SparkApplicationRole { Driver, Executor, @@ -117,6 +120,8 @@ pub struct SparkStorageConfig {} pub struct SparkConfig { #[fragment_attrs(serde(default))] pub resources: Resources, + #[fragment_attrs(serde(default))] + pub logging: Logging, } impl SparkConfig { @@ -133,6 +138,7 @@ impl SparkConfig { }, storage: SparkStorageConfigFragment {}, }, + logging: product_logging::spec::default_logging(), } } } @@ -169,6 +175,10 @@ pub struct SparkApplicationSpec { pub spark_image_pull_policy: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub spark_image_pull_secrets: Option>, + /// Name of the Vector aggregator discovery ConfigMap. + /// It must contain the key `ADDRESS` with the address of the Vector aggregator. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub vector_aggregator_config_map_name: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub job: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -224,8 +234,12 @@ impl SparkApplication { .and_then(|common_config| common_config.enable_monitoring) } - pub fn pod_template_config_map_name(&self) -> String { - format!("{}-pod-template", self.name_unchecked()) + pub fn submit_job_config_map_name(&self) -> String { + format!("{app_name}-submit-job", app_name = self.name_any()) + } + + pub fn pod_template_config_map_name(&self, role: SparkApplicationRole) -> String { + format!("{app_name}-{role}-pod-template", app_name = self.name_any()) } pub fn mode(&self) -> Option<&str> { @@ -264,6 +278,7 @@ impl SparkApplication { &self, s3conn: &Option, s3logdir: &Option, + log_config_map: &str, ) -> Vec { let mut result: Vec = self .spec @@ -302,6 +317,21 @@ impl SparkApplication { result.push(v); } + result.push( + VolumeBuilder::new(VOLUME_MOUNT_NAME_LOG_CONFIG) + .with_config_map(log_config_map) + .build(), + ); + + result.push( + VolumeBuilder::new(VOLUME_MOUNT_NAME_LOG) + .with_empty_dir( + None::, + Some(Quantity(format!("{LOG_VOLUME_SIZE_IN_MIB}Mi"))), + ) + .build(), + ); + result } @@ -310,11 +340,18 @@ impl SparkApplication { s3conn: &Option, s3logdir: &Option, ) -> Vec { - let volume_mounts = vec![VolumeMount { - name: VOLUME_MOUNT_NAME_POD_TEMPLATES.into(), - mount_path: VOLUME_MOUNT_PATH_POD_TEMPLATES.into(), - ..VolumeMount::default() - }]; + let volume_mounts = vec![ + VolumeMount { + name: VOLUME_MOUNT_NAME_DRIVER_POD_TEMPLATES.into(), + mount_path: VOLUME_MOUNT_PATH_DRIVER_POD_TEMPLATES.into(), + ..VolumeMount::default() + }, + VolumeMount { + name: VOLUME_MOUNT_NAME_EXECUTOR_POD_TEMPLATES.into(), + mount_path: VOLUME_MOUNT_PATH_EXECUTOR_POD_TEMPLATES.into(), + ..VolumeMount::default() + }, + ]; self.add_common_volume_mounts(volume_mounts, s3conn, s3logdir) } @@ -378,6 +415,18 @@ impl SparkApplication { mounts.push(vm); } + mounts.push(VolumeMount { + name: VOLUME_MOUNT_NAME_LOG_CONFIG.into(), + mount_path: VOLUME_MOUNT_PATH_LOG_CONFIG.into(), + ..VolumeMount::default() + }); + + mounts.push(VolumeMount { + name: VOLUME_MOUNT_NAME_LOG.into(), + mount_path: VOLUME_MOUNT_PATH_LOG.into(), + ..VolumeMount::default() + }); + mounts } @@ -411,14 +460,20 @@ impl SparkApplication { "--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}".to_string(), format!("--deploy-mode {mode}"), format!("--name {name}"), - format!("--conf spark.kubernetes.driver.podTemplateFile={VOLUME_MOUNT_PATH_POD_TEMPLATES}/driver.yml"), - format!("--conf spark.kubernetes.executor.podTemplateFile={VOLUME_MOUNT_PATH_POD_TEMPLATES}/executor.yml"), - format!("--conf spark.kubernetes.driver.podTemplateContainerName={CONTAINER_NAME_DRIVER}"), - format!("--conf spark.kubernetes.executor.podTemplateContainerName={CONTAINER_NAME_EXECUTOR}"), + format!("--conf spark.kubernetes.driver.podTemplateFile={VOLUME_MOUNT_PATH_DRIVER_POD_TEMPLATES}/{POD_TEMPLATE_FILE}"), + format!("--conf spark.kubernetes.executor.podTemplateFile={VOLUME_MOUNT_PATH_EXECUTOR_POD_TEMPLATES}/{POD_TEMPLATE_FILE}"), + format!("--conf spark.kubernetes.driver.podTemplateContainerName={container_name}", container_name = SparkContainer::Spark), + format!("--conf spark.kubernetes.executor.podTemplateContainerName={container_name}", container_name = SparkContainer::Spark), format!("--conf spark.kubernetes.namespace={}", 
self.metadata.namespace.as_ref().context(NoNamespaceSnafu)?), format!("--conf spark.kubernetes.driver.container.image={}", self.spec.spark_image.as_ref().context(NoSparkImageSnafu)?), format!("--conf spark.kubernetes.executor.container.image={}", self.spec.spark_image.as_ref().context(NoSparkImageSnafu)?), format!("--conf spark.kubernetes.authenticate.driver.serviceAccountName={}", serviceaccount_name), + format!("--conf spark.driver.defaultJavaOptions=-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"), + format!("--conf spark.driver.extraClassPath=/stackable/spark/extra-jars/*"), + "--conf spark.driver.userClassPathFirst=true".to_string(), + format!("--conf spark.executor.defaultJavaOptions=-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"), + format!("--conf spark.executor.extraClassPath=/stackable/spark/extra-jars/*"), + "--conf spark.executor.userClassPathFirst=true".to_string(), ]); // See https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management @@ -635,17 +690,6 @@ impl SparkApplication { e } - pub fn affinity(&self, role: SparkApplicationRole) -> Result { - match role { - SparkApplicationRole::Driver => self - .driver_config() - .map(|driver_config| driver_config.affinity), - SparkApplicationRole::Executor => self - .executor_config() - .map(|executor_config| executor_config.affinity), - } - } - pub fn job_config(&self) -> Result { let mut config = self.spec.job.clone().unwrap_or_default(); config.merge(&SparkConfig::default_config()); @@ -727,6 +771,48 @@ pub struct CommonConfig { pub enable_monitoring: Option, } +#[derive( + Clone, + Debug, + Deserialize, + Display, + Eq, + EnumIter, + JsonSchema, + Ord, + PartialEq, + PartialOrd, + Serialize, +)] +#[serde(rename_all = "kebab-case")] +#[strum(serialize_all = "kebab-case")] +pub enum SubmitJobContainer { + SparkSubmit, + Vector, +} + +#[derive( + Clone, + Debug, + Deserialize, + Display, + Eq, + EnumIter, + JsonSchema, + Ord, + PartialEq, + PartialOrd, + Serialize, +)] +#[serde(rename_all = "kebab-case")] +#[strum(serialize_all = "kebab-case")] +pub enum SparkContainer { + Job, + Requirements, + Spark, + Vector, +} + #[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] #[fragment_attrs( derive( @@ -744,6 +830,8 @@ pub struct CommonConfig { pub struct DriverConfig { #[fragment_attrs(serde(default))] pub resources: Resources, + #[fragment_attrs(serde(default))] + pub logging: Logging, #[fragment_attrs(serde(default, flatten))] pub volume_mounts: Option, #[fragment_attrs(serde(default))] @@ -764,6 +852,7 @@ impl DriverConfig { }, storage: SparkStorageConfigFragment {}, }, + logging: product_logging::spec::default_logging(), volume_mounts: Some(VolumeMounts::default()), affinity: StackableAffinityFragment::default(), } @@ -789,6 +878,8 @@ pub struct ExecutorConfig { pub instances: Option, #[fragment_attrs(serde(default))] pub resources: Resources, + #[fragment_attrs(serde(default))] + pub logging: Logging, #[fragment_attrs(serde(default, flatten))] pub volume_mounts: Option, #[fragment_attrs(serde(default, flatten))] @@ -812,7 +903,8 @@ impl ExecutorConfig { }, storage: SparkStorageConfigFragment {}, }, - volume_mounts: Default::default(), + logging: product_logging::spec::default_logging(), + volume_mounts: Some(VolumeMounts::default()), node_selector: Default::default(), affinity: Default::default(), } diff --git a/rust/operator-binary/src/history_controller.rs b/rust/operator-binary/src/history_controller.rs index 6bd6c9f2..8f0f8767 
100644 --- a/rust/operator-binary/src/history_controller.rs +++ b/rust/operator-binary/src/history_controller.rs @@ -1,10 +1,7 @@ use stackable_operator::{ builder::{ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder, VolumeBuilder}, cluster_resources::ClusterResources, - commons::{ - product_image_selection::ResolvedProductImage, - resources::{NoRuntimeLimits, Resources}, - }, + commons::product_image_selection::ResolvedProductImage, k8s_openapi::{ api::{ apps::v1::{StatefulSet, StatefulSetSpec}, @@ -13,7 +10,7 @@ use stackable_operator::{ }, rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject}, }, - apimachinery::pkg::apis::meta::v1::LabelSelector, + apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::LabelSelector}, }, kube::{ runtime::{controller::Action, reflector::ObjectRef}, @@ -21,20 +18,29 @@ use stackable_operator::{ }, labels::{role_group_selector_labels, role_selector_labels, ObjectLabels}, product_config::ProductConfigManager, + product_logging::{ + framework::vector_container, + spec::{ + ConfigMapLogConfig, ContainerLogConfig, ContainerLogConfigChoice, + CustomContainerLogConfig, + }, + }, role_utils::RoleGroupRef, }; use stackable_spark_k8s_crd::{ constants::*, - history::{HistoryStorageConfig, SparkHistoryServer}, + history::{HistoryConfig, SparkHistoryServer, SparkHistoryServerContainer}, s3logdir::S3LogDir, }; use std::time::Duration; use std::{collections::BTreeMap, sync::Arc}; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::logging::controller::ReconcilerError; use strum::{EnumDiscriminants, IntoStaticStr}; +use crate::product_logging::{self, resolve_vector_aggregator_address}; + pub struct Ctx { pub client: stackable_operator::client::Client, pub product_config: ProductConfigManager, @@ -44,15 +50,16 @@ pub struct Ctx { #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("object has no namespace"))] + ObjectHasNoNamespace, #[snafu(display("invalid config map {name}"))] InvalidConfigMap { source: stackable_operator::error::Error, name: String, }, - #[snafu(display("invalid history container name {name}"))] + #[snafu(display("invalid history container name"))] InvalidContainerName { source: stackable_operator::error::Error, - name: String, }, #[snafu(display("object is missing metadata to build owner reference"))] ObjectMissingMetadataForOwnerRef { @@ -102,6 +109,15 @@ pub enum Error { DeleteOrphanedResources { source: stackable_operator::error::Error, }, + #[snafu(display("failed to resolve the Vector aggregator address"))] + ResolveVectorAggregatorAddress { + source: crate::product_logging::Error, + }, + #[snafu(display("failed to add the logging configuration to the ConfigMap [{cm_name}]"))] + InvalidLoggingConfig { + source: crate::product_logging::Error, + cm_name: String, + }, } type Result = std::result::Result; @@ -134,6 +150,16 @@ pub async fn reconcile(shs: Arc, ctx: Arc) -> Result, ctx: Arc) -> Result, ctx: Arc) -> Result, _error: &Error, _ctx: Arc, s3_log_dir: &S3LogDir, + vector_aggregator_address: Option<&str>, ) -> Result { + let cm_name = rolegroupref.object_name(); + let spark_config = spark_config(shs, s3_log_dir, rolegroupref)?; - let result = ConfigMapBuilder::new() + let mut cm_builder = ConfigMapBuilder::new(); + + cm_builder .metadata( ObjectMetaBuilder::new() .name_and_namespace(shs) - .name(rolegroupref.object_name()) + .name(&cm_name) .ownerreference_from_resource(shs, None, Some(true)) 
.context(ObjectMissingMetadataForOwnerRefSnafu)? .with_recommended_labels(labels(shs, app_version_label, &rolegroupref.role_group)) .build(), ) - .add_data(HISTORY_CONFIG_FILE_NAME, spark_config) - .build() - .context(InvalidConfigMapSnafu { - name: "spark-history-config".to_string(), - })?; + .add_data(HISTORY_CONFIG_FILE_NAME, spark_config); + + product_logging::extend_config_map( + rolegroupref, + vector_aggregator_address, + &config.logging, + SparkHistoryServerContainer::SparkHistory, + SparkHistoryServerContainer::Vector, + &mut cm_builder, + ) + .context(InvalidLoggingConfigSnafu { cm_name: &cm_name })?; - Ok(result) + cm_builder + .build() + .context(InvalidConfigMapSnafu { name: cm_name }) } fn build_stateful_set( @@ -255,35 +297,46 @@ fn build_stateful_set( resolved_product_image: &ResolvedProductImage, rolegroupref: &RoleGroupRef, s3_log_dir: &S3LogDir, - resources: &Resources, + config: &HistoryConfig, serviceaccount: &ServiceAccount, ) -> Result { - let container_name = "spark-history"; - let container = ContainerBuilder::new(container_name) - .context(InvalidContainerNameSnafu { - name: String::from(container_name), - })? - .image_from_product_image(resolved_product_image) - .resources(resources.clone().into()) - .command(vec!["/bin/bash".to_string()]) - .args(command_args(s3_log_dir)) - .add_container_port("http", 18080) - // This env var prevents the history server from detaching itself from the - // start script because this leads to the Pod terminating immediately. - .add_env_var("SPARK_NO_DAEMONIZE", "true") - .add_volume_mounts(s3_log_dir.credentials_volume_mount().into_iter()) - .add_volume_mount("config", "/stackable/spark/conf") - .build(); + let log_config_map = if let Some(ContainerLogConfig { + choice: + Some(ContainerLogConfigChoice::Custom(CustomContainerLogConfig { + custom: ConfigMapLogConfig { config_map }, + })), + }) = config + .logging + .containers + .get(&SparkHistoryServerContainer::SparkHistory) + { + config_map.into() + } else { + rolegroupref.object_name() + }; + + let mut pb = PodBuilder::new(); - let template = PodBuilder::new() - .service_account_name(serviceaccount.name_unchecked()) - .add_container(container) + pb.service_account_name(serviceaccount.name_unchecked()) .image_pull_secrets_from_product_image(resolved_product_image) .add_volume( VolumeBuilder::new("config") .with_config_map(rolegroupref.object_name()) .build(), ) + .add_volume( + VolumeBuilder::new(VOLUME_MOUNT_NAME_LOG_CONFIG) + .with_config_map(log_config_map) + .build(), + ) + .add_volume( + VolumeBuilder::new(VOLUME_MOUNT_NAME_LOG) + .with_empty_dir( + None::, + Some(Quantity(format!("{LOG_VOLUME_SIZE_IN_MIB}Mi"))), + ) + .build(), + ) .add_volumes(s3_log_dir.credentials_volume().into_iter().collect()) .metadata_builder(|m| { m.with_recommended_labels(labels( @@ -297,12 +350,49 @@ fn build_stateful_set( run_as_group: Some(1000), fs_group: Some(1000), ..PodSecurityContext::default() - }) - .build_template(); + }); + + let container_name = "spark-history"; + let container = ContainerBuilder::new(container_name) + .context(InvalidContainerNameSnafu)? + .image_from_product_image(resolved_product_image) + .resources(config.resources.clone().into()) + .command(vec!["/bin/bash".to_string()]) + .args(command_args(s3_log_dir)) + .add_container_port("http", 18080) + // This env var prevents the history server from detaching itself from the + // start script because this leads to the Pod terminating immediately. 
+ .add_env_var("SPARK_NO_DAEMONIZE", "true") + .add_env_var("SPARK_DAEMON_CLASSPATH", "/stackable/spark/extra-jars/*") + .add_env_var( + "SPARK_HISTORY_OPTS", + format!( + "-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}" + ), + ) + .add_volume_mounts(s3_log_dir.credentials_volume_mount().into_iter()) + .add_volume_mount("config", "/stackable/spark/conf") + .add_volume_mount(VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_LOG_CONFIG) + .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG) + .build(); + pb.add_container(container); + + if config.logging.enable_vector_agent { + pb.add_container(vector_container( + resolved_product_image, + VOLUME_MOUNT_NAME_CONFIG, + VOLUME_MOUNT_NAME_LOG, + config + .logging + .containers + .get(&SparkHistoryServerContainer::Vector), + )); + } Ok(StatefulSet { metadata: ObjectMetaBuilder::new() .name_and_namespace(shs) + .name(rolegroupref.object_name()) .ownerreference_from_resource(shs, None, Some(true)) .context(ObjectMissingMetadataForOwnerRefSnafu)? .with_recommended_labels(labels( @@ -312,7 +402,7 @@ fn build_stateful_set( )) .build(), spec: Some(StatefulSetSpec { - template, + template: pb.build_template(), replicas: shs.replicas(rolegroupref), selector: LabelSelector { match_labels: Some(role_group_selector_labels( diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index f6160fd9..a3143c1b 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -1,5 +1,6 @@ mod history_controller; mod pod_driver_controller; +mod product_logging; mod spark_k8s_controller; use std::sync::Arc; diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs new file mode 100644 index 00000000..cfe244d5 --- /dev/null +++ b/rust/operator-binary/src/product_logging.rs @@ -0,0 +1,121 @@ +use std::fmt::Display; + +use snafu::{OptionExt, ResultExt, Snafu}; +use stackable_operator::{ + builder::ConfigMapBuilder, + client::Client, + k8s_openapi::api::core::v1::ConfigMap, + kube::Resource, + product_logging::{ + self, + spec::{ContainerLogConfig, ContainerLogConfigChoice, Logging}, + }, + role_utils::RoleGroupRef, +}; +use stackable_spark_k8s_crd::constants::{ + LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE_IN_MIB, VOLUME_MOUNT_PATH_LOG, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to retrieve the ConfigMap {cm_name}"))] + ConfigMapNotFound { + source: stackable_operator::error::Error, + cm_name: String, + }, + #[snafu(display("failed to retrieve the entry {entry} for ConfigMap {cm_name}"))] + MissingConfigMapEntry { + entry: &'static str, + cm_name: String, + }, + #[snafu(display("vectorAggregatorConfigMapName must be set"))] + MissingVectorAggregatorAddress, +} + +type Result = std::result::Result; + +pub const LOG_FILE: &str = "spark.log4j2.xml"; + +const VECTOR_AGGREGATOR_CM_ENTRY: &str = "ADDRESS"; +const CONSOLE_CONVERSION_PATTERN: &str = "%d{ISO8601} %p [%t] %c - %m%n"; + +/// Return the address of the Vector aggregator if the corresponding ConfigMap name is given in the +/// cluster spec +pub async fn resolve_vector_aggregator_address( + client: &Client, + namespace: &str, + vector_aggregator_config_map_name: Option<&str>, +) -> Result> { + let vector_aggregator_address = + if let Some(vector_aggregator_config_map_name) = vector_aggregator_config_map_name { + let vector_aggregator_address = client + .get::(vector_aggregator_config_map_name, namespace) + .await + .context(ConfigMapNotFoundSnafu { + 
cm_name: vector_aggregator_config_map_name.to_string(), + })? + .data + .and_then(|mut data| data.remove(VECTOR_AGGREGATOR_CM_ENTRY)) + .context(MissingConfigMapEntrySnafu { + entry: VECTOR_AGGREGATOR_CM_ENTRY, + cm_name: vector_aggregator_config_map_name.to_string(), + })?; + Some(vector_aggregator_address) + } else { + None + }; + + Ok(vector_aggregator_address) +} + +/// Extend a ConfigMap with logging and Vector configurations +pub fn extend_config_map( + role_group: &RoleGroupRef, + vector_aggregator_address: Option<&str>, + logging: &Logging, + main_container: C, + vector_container: C, + cm_builder: &mut ConfigMapBuilder, +) -> Result<()> +where + C: Clone + Ord + Display, + K: Resource, +{ + if let Some(ContainerLogConfig { + choice: Some(ContainerLogConfigChoice::Automatic(log_config)), + }) = logging.containers.get(&main_container) + { + cm_builder.add_data( + LOG4J2_CONFIG_FILE, + product_logging::framework::create_log4j2_config( + &format!("{VOLUME_MOUNT_PATH_LOG}/{main_container}"), + LOG_FILE, + MAX_SPARK_LOG_FILES_SIZE_IN_MIB, + CONSOLE_CONVERSION_PATTERN, + log_config, + ), + ); + } + + let vector_log_config = if let Some(ContainerLogConfig { + choice: Some(ContainerLogConfigChoice::Automatic(log_config)), + }) = logging.containers.get(&vector_container) + { + Some(log_config) + } else { + None + }; + + if logging.enable_vector_agent { + cm_builder.add_data( + product_logging::framework::VECTOR_CONFIG_FILE, + product_logging::framework::create_vector_config( + role_group, + vector_aggregator_address.context(MissingVectorAggregatorAddressSnafu)?, + vector_log_config, + ), + ); + } + + Ok(()) +} diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index 0837e409..2469aab6 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -1,14 +1,17 @@ use std::{sync::Arc, time::Duration}; use stackable_spark_k8s_crd::{ - constants::*, s3logdir::S3LogDir, SparkApplication, SparkApplicationRole, + constants::*, s3logdir::S3LogDir, SparkApplication, SparkApplicationRole, SparkContainer, + SparkStorageConfig, SubmitJobContainer, }; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ - builder::{ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder}, + builder::{ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder, VolumeBuilder}, commons::{ affinity::StackableAffinity, + product_image_selection::ResolvedProductImage, + resources::{NoRuntimeLimits, Resources}, s3::S3ConnectionSpec, tls::{CaCert, TlsVerification}, }, @@ -16,18 +19,31 @@ use stackable_operator::{ api::{ batch::v1::{Job, JobSpec}, core::v1::{ - ConfigMap, ConfigMapVolumeSource, Container, EnvVar, Pod, PodSecurityContext, - PodSpec, PodTemplateSpec, ServiceAccount, Volume, VolumeMount, + ConfigMap, Container, EnvVar, Pod, PodSecurityContext, PodSpec, PodTemplateSpec, + ServiceAccount, Volume, VolumeMount, }, rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject}, }, Resource, }, - kube::runtime::controller::Action, + kube::{ + runtime::{controller::Action, reflector::ObjectRef}, + ResourceExt, + }, logging::controller::ReconcilerError, + product_logging::{ + framework::{capture_shell_output, shutdown_vector_command, vector_container}, + spec::{ + ConfigMapLogConfig, ContainerLogConfig, ContainerLogConfigChoice, + CustomContainerLogConfig, Logging, + }, + }, + role_utils::RoleGroupRef, }; use strum::{EnumDiscriminants, IntoStaticStr}; +use 
crate::product_logging::{self, resolve_vector_aggregator_address}; + pub struct Ctx { pub client: stackable_operator::client::Client, } @@ -36,6 +52,8 @@ pub struct Ctx { #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("object has no namespace"))] + ObjectHasNoNamespace, #[snafu(display("object is missing metadata to build owner reference"))] ObjectMissingMetadataForOwnerRef { source: stackable_operator::error::Error, @@ -62,10 +80,8 @@ pub enum Error { }, #[snafu(display("no spark base image specified"))] ObjectHasNoSparkImage, - #[snafu(display("driver pod template serialization"))] - DriverPodTemplateSerde { source: serde_yaml::Error }, - #[snafu(display("executor pod template serialization"))] - ExecutorPodTemplateSerde { source: serde_yaml::Error }, + #[snafu(display("pod template serialization"))] + PodTemplateSerde { source: serde_yaml::Error }, #[snafu(display("s3 bucket error"))] S3Bucket { source: stackable_operator::error::Error, @@ -80,15 +96,23 @@ pub enum Error { }, #[snafu(display("failed to recognise the container name"))] UnrecognisedContainerName, - #[snafu(display("illegal container name: [{container_name}]"))] + #[snafu(display("illegal container name"))] IllegalContainerName { source: stackable_operator::error::Error, - container_name: String, }, #[snafu(display("failed to resolve the s3 log dir configuration"))] S3LogDir { source: stackable_spark_k8s_crd::s3logdir::Error, }, + #[snafu(display("failed to resolve the Vector aggregator address"))] + ResolveVectorAggregatorAddress { + source: crate::product_logging::Error, + }, + #[snafu(display("failed to add the logging configuration to the ConfigMap [{cm_name}]"))] + InvalidLoggingConfig { + source: crate::product_logging::Error, + cm_name: String, + }, } type Result = std::result::Result; @@ -99,6 +123,14 @@ impl ReconcilerError for Error { } } +pub struct PodTemplateConfig { + pub role: SparkApplicationRole, + pub resources: Resources, + pub logging: Logging, + pub volume_mounts: Vec, + pub affinity: StackableAffinity, +} + pub async fn reconcile(spark_application: Arc, ctx: Arc) -> Result { tracing::info!("Starting reconcile"); @@ -150,65 +182,86 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) .await .context(ApplyRoleBindingSnafu)?; + let vector_aggregator_address = resolve_vector_aggregator_address( + client, + spark_application + .namespace() + .as_deref() + .context(ObjectHasNoNamespaceSnafu)?, + spark_application + .spec + .vector_aggregator_config_map_name + .as_deref(), + ) + .await + .context(ResolveVectorAggregatorAddressSnafu)?; + let spark_image = spark_application .spec .spark_image .as_deref() .context(ObjectHasNoSparkImageSnafu)?; - let mut jcb = - ContainerBuilder::new(CONTAINER_NAME_JOB).with_context(|_| IllegalContainerNameSnafu { - container_name: APP_NAME.to_string(), - })?; - let job_container = spark_application.spec.image.as_ref().map(|job_image| { - jcb.image(job_image) - .command(vec![ - "/bin/bash".to_string(), - "-x".to_string(), - "-c".to_string(), - format!("cp /jobs/* {VOLUME_MOUNT_PATH_JOB}"), - ]) - .add_volume_mount(VOLUME_MOUNT_NAME_JOB, VOLUME_MOUNT_PATH_JOB) - .build() - }); + let env_vars = spark_application.env(); - let mut rcb = - ContainerBuilder::new(CONTAINER_NAME_REQ).with_context(|_| IllegalContainerNameSnafu { - container_name: APP_NAME.to_string(), - })?; - let requirements_container = spark_application.requirements().map(|req| { - rcb.image(spark_image) - .command(vec![ - 
"/bin/bash".to_string(), - "-x".to_string(), - "-c".to_string(), - format!("pip install --target={VOLUME_MOUNT_PATH_REQ} {req}"), - ]) - .add_volume_mount(VOLUME_MOUNT_NAME_REQ, VOLUME_MOUNT_PATH_REQ); - if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() { - rcb.image_pull_policy(image_pull_policy.to_string()); - } - rcb.build() - }); + let driver_config = spark_application + .driver_config() + .context(FailedToResolveConfigSnafu)?; + let driver_pod_template_config = PodTemplateConfig { + role: SparkApplicationRole::Driver, + resources: driver_config.resources.clone(), + logging: driver_config.logging.clone(), + volume_mounts: spark_application.driver_volume_mounts( + &driver_config, + &opt_s3conn, + &s3logdir, + ), + affinity: driver_config.affinity, + }; + let driver_pod_template_config_map = pod_template_config_map( + &spark_application, + &driver_pod_template_config, + &env_vars, + &opt_s3conn, + &s3logdir, + vector_aggregator_address.as_deref(), + )?; + client + .apply_patch( + CONTROLLER_NAME, + &driver_pod_template_config_map, + &driver_pod_template_config_map, + ) + .await + .context(ApplyApplicationSnafu)?; - let env_vars = spark_application.env(); - let init_containers: Vec = - vec![job_container.clone(), requirements_container.clone()] - .into_iter() - .flatten() - .collect(); - let pod_template_config_map = pod_template_config_map( + let executor_config = spark_application + .executor_config() + .context(FailedToResolveConfigSnafu)?; + let executor_pod_template_config = PodTemplateConfig { + role: SparkApplicationRole::Executor, + resources: executor_config.resources.clone(), + logging: executor_config.logging.clone(), + volume_mounts: spark_application.executor_volume_mounts( + &executor_config, + &opt_s3conn, + &s3logdir, + ), + affinity: executor_config.affinity, + }; + let executor_pod_template_config_map = pod_template_config_map( &spark_application, - init_containers.as_ref(), + &executor_pod_template_config, &env_vars, &opt_s3conn, &s3logdir, + vector_aggregator_address.as_deref(), )?; client .apply_patch( CONTROLLER_NAME, - &pod_template_config_map, - &pod_template_config_map, + &executor_pod_template_config_map, + &executor_pod_template_config_map, ) .await .context(ApplyApplicationSnafu)?; @@ -221,11 +274,21 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) ) .context(BuildCommandSnafu)?; + let submit_job_config_map = + submit_job_config_map(&spark_application, vector_aggregator_address.as_deref())?; + client + .apply_patch( + CONTROLLER_NAME, + &submit_job_config_map, + &submit_job_config_map, + ) + .await + .context(ApplyApplicationSnafu)?; + let job = spark_job( &spark_application, spark_image, &serviceaccount, - &job_container, &env_vars, &job_commands, &opt_s3conn, @@ -239,41 +302,106 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) Ok(Action::await_change()) } +fn init_containers( + spark_application: &SparkApplication, + logging: &Logging, +) -> Result> { + let mut jcb = ContainerBuilder::new(&SparkContainer::Job.to_string()) + .context(IllegalContainerNameSnafu)?; + let job_container = spark_application.spec.image.as_ref().map(|job_image| { + let mut args = Vec::new(); + if let Some(ContainerLogConfig { + choice: Some(ContainerLogConfigChoice::Automatic(log_config)), + }) = logging.containers.get(&SparkContainer::Job) + { + args.push(capture_shell_output( + VOLUME_MOUNT_PATH_LOG, + &SparkContainer::Job.to_string(), + log_config, + )); + }; + args.push(format!("echo Copying job files to {VOLUME_MOUNT_PATH_JOB}")); 
+ args.push(format!("cp /jobs/* {VOLUME_MOUNT_PATH_JOB}")); + // Wait until the log file is written. + args.push("sleep 1".into()); + + jcb.image(job_image) + .command(vec!["/bin/bash".to_string(), "-c".to_string()]) + .args(vec![args.join(" && ")]) + .add_volume_mount(VOLUME_MOUNT_NAME_JOB, VOLUME_MOUNT_PATH_JOB) + .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG) + .build() + }); + + let spark_image = spark_application + .spec + .spark_image + .as_deref() + .context(ObjectHasNoSparkImageSnafu)?; + + let mut rcb = ContainerBuilder::new(&SparkContainer::Requirements.to_string()) + .context(IllegalContainerNameSnafu)?; + let requirements_container = spark_application.requirements().map(|req| { + let mut args = Vec::new(); + if let Some(ContainerLogConfig { + choice: Some(ContainerLogConfigChoice::Automatic(log_config)), + }) = logging.containers.get(&SparkContainer::Requirements) + { + args.push(capture_shell_output( + VOLUME_MOUNT_PATH_LOG, + &SparkContainer::Requirements.to_string(), + log_config, + )); + }; + args.push(format!( + "echo Installing requirements to {VOLUME_MOUNT_PATH_REQ}: {req}" + )); + args.push(format!( + "pip install --target={VOLUME_MOUNT_PATH_REQ} {req}" + )); + + rcb.image(spark_image) + .command(vec!["/bin/bash".to_string(), "-c".to_string()]) + .args(vec![args.join(" && ")]) + .add_volume_mount(VOLUME_MOUNT_NAME_REQ, VOLUME_MOUNT_PATH_REQ) + .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG); + if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() { + rcb.image_pull_policy(image_pull_policy.to_string()); + } + rcb.build() + }); + + Ok(vec![job_container, requirements_container] + .into_iter() + .flatten() + .collect()) +} + #[allow(clippy::too_many_arguments)] fn pod_template( spark_application: &SparkApplication, - container_name: &str, - init_containers: &[Container], + config: &PodTemplateConfig, volumes: &[Volume], - volume_mounts: &[VolumeMount], env: &[EnvVar], - affinity: Option, ) -> Result { - // N.B. this may be ignored by spark as preference is given to spark - // configuration settings. - let resources = match container_name { - CONTAINER_NAME_DRIVER => { - spark_application - .driver_config() - .context(FailedToResolveConfigSnafu)? - .resources - } - CONTAINER_NAME_EXECUTOR => { - spark_application - .executor_config() - .context(FailedToResolveConfigSnafu)? - .resources - } - _ => return UnrecognisedContainerNameSnafu.fail(), - }; + let container_name = SparkContainer::Spark.to_string(); - let mut cb = - ContainerBuilder::new(container_name).with_context(|_| IllegalContainerNameSnafu { - container_name: APP_NAME.to_string(), - })?; - cb.add_volume_mounts(volume_mounts.to_vec()) + let mut cb = ContainerBuilder::new(&container_name).context(IllegalContainerNameSnafu)?; + cb.add_volume_mounts(config.volume_mounts.clone()) .add_env_vars(env.to_vec()) - .resources(resources.into()); + .resources(config.resources.clone().into()); + + if config.logging.enable_vector_agent { + cb.add_env_var( + "_STACKABLE_POST_HOOK", + [ + // Wait for Vector to gather the logs. 
+ "sleep 10", + &shutdown_vector_command(VOLUME_MOUNT_PATH_LOG), + ] + .join("; "), + ); + } if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() { cb.image_pull_policy(image_pull_policy.to_string()); @@ -282,21 +410,21 @@ fn pod_template( let mut pb = PodBuilder::new(); pb.metadata( ObjectMetaBuilder::new() - .name(container_name) + .name(&container_name) // this reference is not pointing to a controller but only provides a UID that can used to clean up resources // cleanly (specifically driver pods and related config maps) when the spark application is deleted. .ownerreference_from_resource(spark_application, None, None) .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(spark_application.build_recommended_labels(container_name)) + .with_recommended_labels(spark_application.build_recommended_labels(&container_name)) .build(), ) .add_container(cb.build()) .add_volumes(volumes.to_vec()) .security_context(security_context()); - if let Some(affinity) = affinity { - pb.affinity(&affinity); - } + pb.affinity(&config.affinity); + + let init_containers = init_containers(spark_application, &config.logging).unwrap(); for init_container in init_containers { pb.add_init_container(init_container.clone()); @@ -310,56 +438,66 @@ fn pod_template( ); } + if config.logging.enable_vector_agent { + pb.add_container(vector_container( + &ResolvedProductImage { + product_version: "".into(), + app_version_label: "".into(), + image: spark_application + .spec + .spark_image + .clone() + .context(ObjectHasNoSparkImageSnafu)?, + image_pull_policy: "".into(), + pull_secrets: None, + }, + VOLUME_MOUNT_NAME_CONFIG, + VOLUME_MOUNT_NAME_LOG, + config.logging.containers.get(&SparkContainer::Vector), + )); + } + pb.build().context(PodTemplateConfigMapSnafu) } fn pod_template_config_map( spark_application: &SparkApplication, - init_containers: &[Container], + config: &PodTemplateConfig, env: &[EnvVar], s3conn: &Option, s3logdir: &Option, + vector_aggregator_address: Option<&str>, ) -> Result { - let volumes = spark_application.volumes(s3conn, s3logdir); + let cm_name = spark_application.pod_template_config_map_name(config.role.clone()); + + let log_config_map = if let Some(ContainerLogConfig { + choice: + Some(ContainerLogConfigChoice::Custom(CustomContainerLogConfig { + custom: ConfigMapLogConfig { config_map }, + })), + }) = config.logging.containers.get(&SparkContainer::Spark) + { + config_map.into() + } else { + cm_name.clone() + }; - let driver_config = spark_application - .driver_config() - .context(FailedToResolveConfigSnafu)?; - let driver_template = pod_template( - spark_application, - CONTAINER_NAME_DRIVER, - init_containers, - volumes.as_ref(), - spark_application - .driver_volume_mounts(&driver_config, s3conn, s3logdir) - .as_ref(), - env, - spark_application - .affinity(SparkApplicationRole::Driver) - .ok(), - )?; - let executor_config = spark_application - .executor_config() - .context(FailedToResolveConfigSnafu)?; - let executor_template = pod_template( - spark_application, - CONTAINER_NAME_EXECUTOR, - init_containers, - volumes.as_ref(), - spark_application - .executor_volume_mounts(&executor_config, s3conn, s3logdir) - .as_ref(), - env, - spark_application - .affinity(SparkApplicationRole::Executor) - .ok(), - )?; + let mut volumes = spark_application.volumes(s3conn, s3logdir, &log_config_map); + volumes.push( + VolumeBuilder::new(VOLUME_MOUNT_NAME_CONFIG) + .with_config_map(&cm_name) + .build(), + ); + + let template = pod_template(spark_application, 
config, volumes.as_ref(), env)?; - ConfigMapBuilder::new() + let mut cm_builder = ConfigMapBuilder::new(); + + cm_builder .metadata( ObjectMetaBuilder::new() .name_and_namespace(spark_application) - .name(spark_application.pod_template_config_map_name()) + .name(&cm_name) .ownerreference_from_resource(spark_application, None, Some(true)) .context(ObjectMissingMetadataForOwnerRefSnafu)? .with_recommended_labels( @@ -368,15 +506,64 @@ fn pod_template_config_map( .build(), ) .add_data( - "driver.yml", - serde_yaml::to_string(&driver_template).context(DriverPodTemplateSerdeSnafu)?, - ) - .add_data( - "executor.yml", - serde_yaml::to_string(&executor_template).context(ExecutorPodTemplateSerdeSnafu)?, - ) - .build() - .context(PodTemplateConfigMapSnafu) + POD_TEMPLATE_FILE, + serde_yaml::to_string(&template).context(PodTemplateSerdeSnafu)?, + ); + + product_logging::extend_config_map( + &RoleGroupRef { + cluster: ObjectRef::from_obj(spark_application), + role: String::new(), + role_group: String::new(), + }, + vector_aggregator_address, + &config.logging, + SparkContainer::Spark, + SparkContainer::Vector, + &mut cm_builder, + ) + .context(InvalidLoggingConfigSnafu { cm_name })?; + + cm_builder.build().context(PodTemplateConfigMapSnafu) +} + +fn submit_job_config_map( + spark_application: &SparkApplication, + vector_aggregator_address: Option<&str>, +) -> Result { + let cm_name = spark_application.submit_job_config_map_name(); + + let config = spark_application + .job_config() + .context(FailedToResolveConfigSnafu)?; + + let mut cm_builder = ConfigMapBuilder::new(); + + cm_builder.metadata( + ObjectMetaBuilder::new() + .name_and_namespace(spark_application) + .name(&cm_name) + .ownerreference_from_resource(spark_application, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(spark_application.build_recommended_labels("spark-submit")) + .build(), + ); + + product_logging::extend_config_map( + &RoleGroupRef { + cluster: ObjectRef::from_obj(spark_application), + role: String::new(), + role_group: String::new(), + }, + vector_aggregator_address, + &config.logging, + SubmitJobContainer::SparkSubmit, + SubmitJobContainer::Vector, + &mut cm_builder, + ) + .context(InvalidLoggingConfigSnafu { cm_name })?; + + cm_builder.build().context(PodTemplateConfigMapSnafu) } #[allow(clippy::too_many_arguments)] @@ -384,46 +571,99 @@ fn spark_job( spark_application: &SparkApplication, spark_image: &str, serviceaccount: &ServiceAccount, - job_container: &Option, env: &[EnvVar], job_commands: &[String], s3conn: &Option, s3logdir: &Option, ) -> Result { - let mut cb = - ContainerBuilder::new("spark-submit").with_context(|_| IllegalContainerNameSnafu { - container_name: APP_NAME.to_string(), - })?; + let mut cb = ContainerBuilder::new(&SubmitJobContainer::SparkSubmit.to_string()) + .context(IllegalContainerNameSnafu)?; let job_config = spark_application .job_config() .context(FailedToResolveConfigSnafu)?; + let log_config_map = if let Some(ContainerLogConfig { + choice: + Some(ContainerLogConfigChoice::Custom(CustomContainerLogConfig { + custom: ConfigMapLogConfig { config_map }, + })), + }) = job_config + .logging + .containers + .get(&SubmitJobContainer::SparkSubmit) + { + config_map.into() + } else { + spark_application.submit_job_config_map_name() + }; + + let mut args = vec![job_commands.join(" ")]; + if job_config.logging.enable_vector_agent { + // Wait for Vector to gather the logs. 
+ args.push("sleep 10".into()); + args.push(shutdown_vector_command(VOLUME_MOUNT_PATH_LOG)); + } + cb.image(spark_image) - .command(vec!["/bin/sh".to_string()]) + .command(vec!["/bin/bash".to_string(), "-c".to_string()]) + .args(vec![args.join(" && ")]) .resources(job_config.resources.into()) - .args(vec!["-c".to_string(), job_commands.join(" ")]) .add_volume_mounts(spark_application.spark_job_volume_mounts(s3conn, s3logdir)) .add_env_vars(env.to_vec()) + .add_env_var( + "SPARK_SUBMIT_OPTS", + format!( + "-cp /stackable/spark/extra-jars/*:/stackable/spark/jars/* \ + -Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}" + ), + ) // TODO: move this to the image - .add_env_vars(vec![EnvVar { - name: "SPARK_CONF_DIR".to_string(), - value: Some("/stackable/spark/conf".to_string()), - value_from: None, - }]); + .add_env_var("SPARK_CONF_DIR", "/stackable/spark/conf"); if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() { cb.image_pull_policy(image_pull_policy.to_string()); } - let mut volumes = vec![Volume { - name: String::from(VOLUME_MOUNT_NAME_POD_TEMPLATES), - config_map: Some(ConfigMapVolumeSource { - name: Some(spark_application.pod_template_config_map_name()), - ..ConfigMapVolumeSource::default() - }), - ..Volume::default() - }]; - volumes.extend(spark_application.volumes(s3conn, s3logdir)); + let mut volumes = vec![ + VolumeBuilder::new(VOLUME_MOUNT_NAME_CONFIG) + .with_config_map(spark_application.submit_job_config_map_name()) + .build(), + VolumeBuilder::new(VOLUME_MOUNT_NAME_DRIVER_POD_TEMPLATES) + .with_config_map( + spark_application.pod_template_config_map_name(SparkApplicationRole::Driver), + ) + .build(), + VolumeBuilder::new(VOLUME_MOUNT_NAME_EXECUTOR_POD_TEMPLATES) + .with_config_map( + spark_application.pod_template_config_map_name(SparkApplicationRole::Executor), + ) + .build(), + ]; + volumes.extend(spark_application.volumes(s3conn, s3logdir, &log_config_map)); + + let mut containers = vec![cb.build()]; + + if job_config.logging.enable_vector_agent { + containers.push(vector_container( + &ResolvedProductImage { + product_version: "".into(), + app_version_label: "".into(), + image: spark_application + .spec + .spark_image + .clone() + .context(ObjectHasNoSparkImageSnafu)?, + image_pull_policy: "".into(), + pull_secrets: None, + }, + VOLUME_MOUNT_NAME_CONFIG, + VOLUME_MOUNT_NAME_LOG, + job_config + .logging + .containers + .get(&SubmitJobContainer::Vector), + )); + } let pod = PodTemplateSpec { metadata: Some( @@ -435,8 +675,7 @@ fn spark_job( .build(), ), spec: Some(PodSpec { - containers: vec![cb.build()], - init_containers: job_container.as_ref().map(|c| vec![c.clone()]), + containers, restart_policy: Some("Never".to_string()), service_account_name: serviceaccount.metadata.name.clone(), volumes: Some(volumes), diff --git a/tests/templates/kuttl/logging/00-assert.yaml b/tests/templates/kuttl/logging/00-assert.yaml new file mode 100644 index 00000000..5baf8caa --- /dev/null +++ b/tests/templates/kuttl/logging/00-assert.yaml @@ -0,0 +1,9 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: integration-tests-sa diff --git a/tests/templates/kuttl/logging/00-serviceaccount.yaml.j2 b/tests/templates/kuttl/logging/00-serviceaccount.yaml.j2 new file mode 100644 index 00000000..9cbf0351 --- /dev/null +++ b/tests/templates/kuttl/logging/00-serviceaccount.yaml.j2 @@ -0,0 +1,29 @@ +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: 
use-integration-tests-scc +rules: +{% if test_scenario['values']['openshift'] == "true" %} + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + resourceNames: ["privileged"] + verbs: ["use"] +{% endif %} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: integration-tests-sa +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +subjects: + - kind: ServiceAccount + name: integration-tests-sa +roleRef: + kind: Role + name: use-integration-tests-scc + apiGroup: rbac.authorization.k8s.io diff --git a/tests/templates/kuttl/logging/01-assert.yaml b/tests/templates/kuttl/logging/01-assert.yaml new file mode 100644 index 00000000..3047747f --- /dev/null +++ b/tests/templates/kuttl/logging/01-assert.yaml @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: spark-vector-aggregator +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/logging/01-install-spark-vector-aggregator.yaml b/tests/templates/kuttl/logging/01-install-spark-vector-aggregator.yaml new file mode 100644 index 00000000..4a22bc6a --- /dev/null +++ b/tests/templates/kuttl/logging/01-install-spark-vector-aggregator.yaml @@ -0,0 +1,17 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install spark-vector-aggregator vector + --namespace $NAMESPACE + --version 0.20.1 + --repo https://helm.vector.dev + --values spark-vector-aggregator-values.yaml +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-vector-aggregator-discovery +data: + ADDRESS: spark-vector-aggregator:6123 diff --git a/tests/templates/kuttl/logging/02-assert.yaml b/tests/templates/kuttl/logging/02-assert.yaml new file mode 100644 index 00000000..96094db0 --- /dev/null +++ b/tests/templates/kuttl/logging/02-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: eventlog-minio +status: + readyReplicas: 1 diff --git a/tests/templates/kuttl/logging/02-setup-s3.yaml b/tests/templates/kuttl/logging/02-setup-s3.yaml new file mode 100644 index 00000000..277c6447 --- /dev/null +++ b/tests/templates/kuttl/logging/02-setup-s3.yaml @@ -0,0 +1,88 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install eventlog-minio + --namespace $NAMESPACE + --version 12.2.2 + -f helm-bitnami-eventlog-minio-values.yaml + --repo https://charts.bitnami.com/bitnami minio +--- +apiVersion: v1 +kind: Secret +metadata: + name: history-credentials + labels: + secrets.stackable.tech/class: logging-history-credentials-class +stringData: + accessKey: eventLogAccessKey + secretKey: eventLogSecretKey + # The following two entries are used by the Bitnami chart for MinIO to + # set up credentials for accessing buckets managed by the MinIO tenant. 
+ root-user: eventLogAccessKey + root-password: eventLogSecretKey +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: logging-history-credentials-class +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Connection +metadata: + name: spark-history-s3-connection +spec: + host: eventlog-minio + port: 9000 + accessStyle: Path + credentials: + secretClass: logging-history-credentials-class +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Bucket +metadata: + name: spark-history-s3-bucket +spec: + bucketName: spark-logs + connection: + reference: spark-history-s3-connection +--- +apiVersion: v1 +kind: Pod +metadata: + name: eventlog-minio-client + labels: + app: eventlog-minio-client +spec: + restartPolicy: Never + containers: + - name: minio-client + image: docker.io/bitnami/minio-client:2023.3.20-debian-11-r1 + command: ["bash", "-c", "sleep infinity"] + stdin: true + tty: true + env: + - name: MINIO_SERVER_ACCESS_KEY + valueFrom: + secretKeyRef: + name: history-credentials + key: root-user + optional: false + - name: MINIO_SERVER_SECRET_KEY + valueFrom: + secretKeyRef: + name: history-credentials + key: root-password + optional: false + - name: MINIO_SERVER_HOST + value: eventlog-minio + - name: MINIO_SERVER_PORT_NUMBER + value: "9000" + - name: MINIO_SERVER_SCHEME + value: http diff --git a/tests/templates/kuttl/logging/03-prepare-bucket.yaml.j2 b/tests/templates/kuttl/logging/03-prepare-bucket.yaml.j2 new file mode 100644 index 00000000..b678ce07 --- /dev/null +++ b/tests/templates/kuttl/logging/03-prepare-bucket.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + # Give MinIO enough time to start + - command: sleep 10 + - command: |- + kubectl exec -n $NAMESPACE eventlog-minio-client -- sh -c + 'mc alias set eventlog-minio http://eventlog-minio:9000 $$MINIO_SERVER_ACCESS_KEY $$MINIO_SERVER_SECRET_KEY' + - command: |- + kubectl exec -n $NAMESPACE eventlog-minio-client -- + mc mb eventlog-minio/spark-logs/eventlogs diff --git a/tests/templates/kuttl/logging/04-assert.yaml b/tests/templates/kuttl/logging/04-assert.yaml new file mode 100644 index 00000000..4112d549 --- /dev/null +++ b/tests/templates/kuttl/logging/04-assert.yaml @@ -0,0 +1,18 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: spark-history-node-automatic-log-config +status: + readyReplicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: spark-history-node-custom-log-config +status: + readyReplicas: 1 diff --git a/tests/templates/kuttl/logging/04-deploy-history-server.yaml.j2 b/tests/templates/kuttl/logging/04-deploy-history-server.yaml.j2 new file mode 100644 index 00000000..7cc64b78 --- /dev/null +++ b/tests/templates/kuttl/logging/04-deploy-history-server.yaml.j2 @@ -0,0 +1,65 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-history-log-config +data: + log4j2.properties: |- + appenders = FILE + + appender.FILE.type = File + appender.FILE.name = FILE + appender.FILE.fileName = /stackable/log/spark-history/spark.log4j2.xml + appender.FILE.layout.type = XMLLayout + + rootLogger.level = INFO + rootLogger.appenderRefs = FILE + rootLogger.appenderRef.FILE.ref = FILE +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkHistoryServer +metadata: + name: spark-history +spec: + image: + productVersion: "{{ test_scenario['values']['spark'].split('-stackable')[0] }}" + 
stackableVersion: "{{ test_scenario['values']['spark'].split('-stackable')[1] }}" + vectorAggregatorConfigMapName: spark-vector-aggregator-discovery + logFileDirectory: + s3: + prefix: eventlogs/ + bucket: + reference: spark-history-s3-bucket + nodes: + roleGroups: + automatic-log-config: + replicas: 1 + config: + logging: + enableVectorAgent: true + containers: + spark-history: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + vector: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + custom-log-config: + replicas: 1 + config: + logging: + enableVectorAgent: true + containers: + spark-history: + custom: + configMap: spark-history-log-config diff --git a/tests/templates/kuttl/logging/05-assert.yaml b/tests/templates/kuttl/logging/05-assert.yaml new file mode 100644 index 00000000..6779075c --- /dev/null +++ b/tests/templates/kuttl/logging/05-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-automatic-log-config +status: + phase: Succeeded diff --git a/tests/templates/kuttl/logging/05-deploy-automatic-log-config-spark-app.yaml.j2 b/tests/templates/kuttl/logging/05-deploy-automatic-log-config-spark-app.yaml.j2 new file mode 100644 index 00000000..32a4b650 --- /dev/null +++ b/tests/templates/kuttl/logging/05-deploy-automatic-log-config-spark-app.yaml.j2 @@ -0,0 +1,91 @@ +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-automatic-log-config +spec: + version: "1.0" + sparkImage: docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }} + sparkImagePullPolicy: IfNotPresent + image: docker.stackable.tech/stackable/ny-tlc-report:{{ test_scenario['values']['ny-tlc-report'] }} + vectorAggregatorConfigMapName: spark-vector-aggregator-discovery + mode: cluster + mainClass: org.apache.spark.examples.SparkALS + mainApplicationFile: local:///stackable/spark/examples/jars/spark-examples_2.12-{{ test_scenario['values']['examples'] }}.jar + job: + logging: + enableVectorAgent: true + containers: + spark-submit: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + vector: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + driver: + logging: + enableVectorAgent: true + containers: + spark: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + job: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + vector: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + executor: + instances: 1 + logging: + enableVectorAgent: true + containers: + spark: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + job: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + vector: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO diff --git a/tests/templates/kuttl/logging/06-assert.yaml b/tests/templates/kuttl/logging/06-assert.yaml new file mode 100644 index 00000000..9a1c845d --- /dev/null +++ b/tests/templates/kuttl/logging/06-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-custom-log-config 
+status: + phase: Succeeded diff --git a/tests/templates/kuttl/logging/06-deploy-custom-log-config-spark-app.yaml.j2 b/tests/templates/kuttl/logging/06-deploy-custom-log-config-spark-app.yaml.j2 new file mode 100644 index 00000000..9e59a166 --- /dev/null +++ b/tests/templates/kuttl/logging/06-deploy-custom-log-config-spark-app.yaml.j2 @@ -0,0 +1,70 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-log-config +data: + log4j2.properties: |- + appenders = FILE + + appender.FILE.type = File + appender.FILE.name = FILE + appender.FILE.fileName = /stackable/log/spark/spark.log4j2.xml + appender.FILE.layout.type = XMLLayout + + rootLogger.level = INFO + rootLogger.appenderRefs = FILE + rootLogger.appenderRef.FILE.ref = FILE +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-submit-log-config +data: + log4j2.properties: |- + appenders = FILE + + appender.FILE.type = File + appender.FILE.name = FILE + appender.FILE.fileName = /stackable/log/spark-submit/spark.log4j2.xml + appender.FILE.layout.type = XMLLayout + + rootLogger.level = INFO + rootLogger.appenderRefs = FILE + rootLogger.appenderRef.FILE.ref = FILE +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-custom-log-config +spec: + version: "1.0" + sparkImage: docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }} + sparkImagePullPolicy: IfNotPresent + image: docker.stackable.tech/stackable/ny-tlc-report:{{ test_scenario['values']['ny-tlc-report'] }} + vectorAggregatorConfigMapName: spark-vector-aggregator-discovery + mode: cluster + mainClass: org.apache.spark.examples.SparkALS + mainApplicationFile: local:///stackable/spark/examples/jars/spark-examples_2.12-{{ test_scenario['values']['examples'] }}.jar + job: + logging: + enableVectorAgent: true + containers: + spark-submit: + custom: + configMap: spark-submit-log-config + driver: + logging: + enableVectorAgent: true + containers: + spark: + custom: + configMap: spark-log-config + executor: + instances: 1 + logging: + enableVectorAgent: true + containers: + spark: + custom: + configMap: spark-log-config diff --git a/tests/templates/kuttl/logging/07-assert.yaml b/tests/templates/kuttl/logging/07-assert.yaml new file mode 100644 index 00000000..f01f40a0 --- /dev/null +++ b/tests/templates/kuttl/logging/07-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: pyspark-automatic-log-config +status: + phase: Succeeded diff --git a/tests/templates/kuttl/logging/07-deploy-automatic-log-config-pyspark-app.yaml.j2 b/tests/templates/kuttl/logging/07-deploy-automatic-log-config-pyspark-app.yaml.j2 new file mode 100644 index 00000000..7dce0c13 --- /dev/null +++ b/tests/templates/kuttl/logging/07-deploy-automatic-log-config-pyspark-app.yaml.j2 @@ -0,0 +1,92 @@ +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: pyspark-automatic-log-config +spec: + version: "1.0" + sparkImage: docker.stackable.tech/stackable/pyspark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }} + sparkImagePullPolicy: IfNotPresent + vectorAggregatorConfigMapName: spark-vector-aggregator-discovery + mode: cluster + mainApplicationFile: local:///stackable/spark/examples/src/main/python/als.py 
+ deps: + requirements: + - numpy==1.24.2 + job: + logging: + enableVectorAgent: true + containers: + spark-submit: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + vector: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + driver: + logging: + enableVectorAgent: true + containers: + spark: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + requirements: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + vector: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + executor: + instances: 1 + logging: + enableVectorAgent: true + containers: + spark: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + requirements: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO + vector: + console: + level: INFO + file: + level: INFO + loggers: + ROOT: + level: INFO diff --git a/tests/templates/kuttl/logging/08-assert.yaml b/tests/templates/kuttl/logging/08-assert.yaml new file mode 100644 index 00000000..e84c68ea --- /dev/null +++ b/tests/templates/kuttl/logging/08-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: pyspark-custom-log-config +status: + phase: Succeeded diff --git a/tests/templates/kuttl/logging/08-deploy-custom-log-config-pyspark-app.yaml.j2 b/tests/templates/kuttl/logging/08-deploy-custom-log-config-pyspark-app.yaml.j2 new file mode 100644 index 00000000..2addf6b9 --- /dev/null +++ b/tests/templates/kuttl/logging/08-deploy-custom-log-config-pyspark-app.yaml.j2 @@ -0,0 +1,71 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: pyspark-log-config +data: + log4j2.properties: |- + appenders = FILE + + appender.FILE.type = File + appender.FILE.name = FILE + appender.FILE.fileName = /stackable/log/spark/spark.log4j2.xml + appender.FILE.layout.type = XMLLayout + + rootLogger.level = INFO + rootLogger.appenderRefs = FILE + rootLogger.appenderRef.FILE.ref = FILE +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: pyspark-submit-log-config +data: + log4j2.properties: |- + appenders = FILE + + appender.FILE.type = File + appender.FILE.name = FILE + appender.FILE.fileName = /stackable/log/spark-submit/spark.log4j2.xml + appender.FILE.layout.type = XMLLayout + + rootLogger.level = INFO + rootLogger.appenderRefs = FILE + rootLogger.appenderRef.FILE.ref = FILE +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: pyspark-custom-log-config +spec: + version: "1.0" + sparkImage: docker.stackable.tech/stackable/pyspark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }} + sparkImagePullPolicy: IfNotPresent + vectorAggregatorConfigMapName: spark-vector-aggregator-discovery + mode: cluster + mainApplicationFile: local:///stackable/spark/examples/src/main/python/als.py + deps: + requirements: + - numpy==1.24.2 + job: + logging: + enableVectorAgent: true + containers: + spark-submit: + custom: + configMap: pyspark-submit-log-config + driver: + logging: + enableVectorAgent: true + containers: + spark: + custom: + configMap: pyspark-log-config + executor: + instances: 1 + logging: + enableVectorAgent: true + containers: + spark: + custom: + configMap: pyspark-log-config diff --git a/tests/templates/kuttl/logging/09-assert.yaml 
b/tests/templates/kuttl/logging/09-assert.yaml new file mode 100644 index 00000000..64d967e4 --- /dev/null +++ b/tests/templates/kuttl/logging/09-assert.yaml @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-runner +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/logging/09-install-test-runner.yaml b/tests/templates/kuttl/logging/09-install-test-runner.yaml new file mode 100644 index 00000000..cdd449c7 --- /dev/null +++ b/tests/templates/kuttl/logging/09-install-test-runner.yaml @@ -0,0 +1,22 @@ +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-runner + labels: + app: test-runner +spec: + replicas: 1 + selector: + matchLabels: + app: test-runner + template: + metadata: + labels: + app: test-runner + spec: + containers: + - name: test-runner + image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0 + stdin: true + tty: true diff --git a/tests/templates/kuttl/logging/10-assert.yaml b/tests/templates/kuttl/logging/10-assert.yaml new file mode 100644 index 00000000..69eaf9e6 --- /dev/null +++ b/tests/templates/kuttl/logging/10-assert.yaml @@ -0,0 +1,8 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +commands: + - script: >- + kubectl exec --namespace=$NAMESPACE test-runner-0 -- + python /tmp/test_log_aggregation.py -n $NAMESPACE diff --git a/tests/templates/kuttl/logging/10-test-log-aggregation.yaml b/tests/templates/kuttl/logging/10-test-log-aggregation.yaml new file mode 100644 index 00000000..353773d7 --- /dev/null +++ b/tests/templates/kuttl/logging/10-test-log-aggregation.yaml @@ -0,0 +1,6 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: | + kubectl cp ./test_log_aggregation.py $NAMESPACE/test-runner-0:/tmp diff --git a/tests/templates/kuttl/logging/helm-bitnami-eventlog-minio-values.yaml b/tests/templates/kuttl/logging/helm-bitnami-eventlog-minio-values.yaml new file mode 100644 index 00000000..bcb802a2 --- /dev/null +++ b/tests/templates/kuttl/logging/helm-bitnami-eventlog-minio-values.yaml @@ -0,0 +1,23 @@ +--- +volumePermissions: + enabled: false + +podSecurityContext: + enabled: false + +containerSecurityContext: + enabled: false + +mode: standalone + +disableWebUI: true + +persistence: + enabled: false + +resources: + requests: + memory: 1Gi + +auth: + existingSecret: history-credentials diff --git a/tests/templates/kuttl/logging/spark-vector-aggregator-values.yaml.j2 b/tests/templates/kuttl/logging/spark-vector-aggregator-values.yaml.j2 new file mode 100644 index 00000000..7aac1d70 --- /dev/null +++ b/tests/templates/kuttl/logging/spark-vector-aggregator-values.yaml.j2 @@ -0,0 +1,288 @@ +--- +role: Aggregator +service: + ports: + - name: api + port: 8686 + protocol: TCP + targetPort: 8686 + - name: vector + port: 6123 + protocol: TCP + targetPort: 6000 +customConfig: + api: + address: 0.0.0.0:8686 + enabled: true + sources: + vector: + address: 0.0.0.0:6000 + type: vector + version: "2" + transforms: + + # SparkHistoryServer spark-history + + filteredSparkHistoryAutomaticLogConfigSparkHistory: + type: filter + inputs: [vector] + condition: >- + .pod == "spark-history-node-automatic-log-config-0" && + .container == "spark-history" + filteredSparkHistoryAutomaticLogConfigVector: + type: filter + inputs: [vector] + condition: >- + .pod == "spark-history-node-automatic-log-config-0" && + .container == "vector" + filteredSparkHistoryCustomLogConfigSparkHistory: + type: 
filter + inputs: [vector] + condition: >- + .pod == "spark-history-node-custom-log-config-0" && + .container == "spark-history" + filteredSparkHistoryCustomLogConfigVector: + type: filter + inputs: [vector] + condition: >- + .pod == "spark-history-node-custom-log-config-0" && + .container == "vector" + + # SparkApplication spark-automatic-log-config + + filteredSparkAutomaticLogConfigSubmitSpark: + type: filter + inputs: [vector] + condition: >- + match(string!(.pod), r'^spark-automatic-log-config-[^-]+$') && + .container == "spark-submit" + filteredSparkAutomaticLogConfigSubmitVector: + type: filter + inputs: [vector] + condition: >- + match(string!(.pod), r'^spark-automatic-log-config-[^-]+$') && + .container == "vector" + filteredSparkAutomaticLogConfigDriverSpark: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-automatic-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "spark" + filteredSparkAutomaticLogConfigDriverJob: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-automatic-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "job" + filteredSparkAutomaticLogConfigDriverVector: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-automatic-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "vector" + filteredSparkAutomaticLogConfigExecutorSpark: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-automatic-log-config" && + ends_with(string!(.pod), "-exec-1") && + .container == "spark" + filteredSparkAutomaticLogConfigExecutorJob: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-automatic-log-config" && + ends_with(string!(.pod), "-exec-1") && + .container == "job" + filteredSparkAutomaticLogConfigExecutorVector: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-automatic-log-config" && + ends_with(string!(.pod), "-exec-1") && + .container == "vector" + + # SparkApplication spark-custom-log-config + + filteredSparkCustomLogConfigSubmitSpark: + type: filter + inputs: [vector] + condition: >- + match(string!(.pod), r'^spark-custom-log-config-[^-]+$') && + .container == "spark-submit" + filteredSparkCustomLogConfigSubmitVector: + type: filter + inputs: [vector] + condition: >- + match(string!(.pod), r'^spark-custom-log-config-[^-]+$') && + .container == "vector" + filteredSparkCustomLogConfigDriverSpark: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-custom-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "spark" + filteredSparkCustomLogConfigDriverJob: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-custom-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "job" + filteredSparkCustomLogConfigDriverVector: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-custom-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "vector" + filteredSparkCustomLogConfigExecutorSpark: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-custom-log-config" && + ends_with(string!(.pod), "-exec-1") && + .container == "spark" + filteredSparkCustomLogConfigExecutorJob: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-custom-log-config" && + ends_with(string!(.pod), "-exec-1") && + .container == "job" + filteredSparkCustomLogConfigExecutorVector: + type: filter + inputs: [vector] + condition: >- + .cluster == "spark-custom-log-config" && + 
ends_with(string!(.pod), "-exec-1") && + .container == "vector" + + # SparkApplication pyspark-automatic-log-config + + filteredPysparkAutomaticLogConfigSubmitSpark: + type: filter + inputs: [vector] + condition: >- + match(string!(.pod), r'^pyspark-automatic-log-config-[^-]+$') && + .container == "spark-submit" + filteredPysparkAutomaticLogConfigSubmitVector: + type: filter + inputs: [vector] + condition: >- + match(string!(.pod), r'^pyspark-automatic-log-config-[^-]+$') && + .container == "vector" + filteredPysparkAutomaticLogConfigDriverSpark: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-automatic-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "spark" + filteredPysparkAutomaticLogConfigDriverRequirements: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-automatic-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "requirements" + filteredPysparkAutomaticLogConfigDriverVector: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-automatic-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "vector" + filteredPysparkAutomaticLogConfigExecutorSpark: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-automatic-log-config" && + ends_with(string!(.pod), "-exec-1") && + .container == "spark" + filteredPysparkAutomaticLogConfigExecutorRequirements: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-automatic-log-config" && + ends_with(string!(.pod), "-exec-1") && + .container == "requirements" + filteredPysparkAutomaticLogConfigExecutorVector: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-automatic-log-config" && + ends_with(string!(.pod), "-exec-1") && + .container == "vector" + + # SparkApplication pyspark-custom-log-config + + filteredPysparkCustomLogConfigSubmitSpark: + type: filter + inputs: [vector] + condition: >- + match(string!(.pod), r'^pyspark-custom-log-config-[^-]+$') && + .container == "spark-submit" + filteredPysparkCustomLogConfigSubmitVector: + type: filter + inputs: [vector] + condition: >- + match(string!(.pod), r'^pyspark-custom-log-config-[^-]+$') && + .container == "vector" + filteredPysparkCustomLogConfigDriverSpark: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-custom-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "spark" + filteredPysparkCustomLogConfigDriverRequirements: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-custom-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "requirements" + filteredPysparkCustomLogConfigDriverVector: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-custom-log-config" && + ends_with(string!(.pod), "-driver") && + .container == "vector" + filteredPysparkCustomLogConfigExecutorSpark: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-custom-log-config" && + ends_with(string!(.pod), "-exec-1") && + .container == "spark" + filteredPysparkCustomLogConfigExecutorRequirements: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-custom-log-config" && + ends_with(string!(.pod), "-exec-1") && + .container == "requirements" + filteredPysparkCustomLogConfigExecutorVector: + type: filter + inputs: [vector] + condition: >- + .cluster == "pyspark-custom-log-config" && + ends_with(string!(.pod), "-exec-1") && + .container == "vector" + sinks: + out: + inputs: [filtered*] +{% if 
lookup('env', 'VECTOR_AGGREGATOR') %} + type: vector + address: {{ lookup('env', 'VECTOR_AGGREGATOR') }} + buffer: + when_full: drop_newest +{% else %} + type: blackhole +{% endif %} diff --git a/tests/templates/kuttl/logging/test_log_aggregation.py b/tests/templates/kuttl/logging/test_log_aggregation.py new file mode 100755 index 00000000..34310ede --- /dev/null +++ b/tests/templates/kuttl/logging/test_log_aggregation.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +import requests + + +def check_processed_events(): + response = requests.post( + 'http://spark-vector-aggregator:8686/graphql', + json={ + 'query': """ + { + transforms(first:100) { + nodes { + componentId + metrics { + processedEventsTotal { + processedEventsTotal + } + } + } + } + } + """ + } + ) + + assert response.status_code == 200, \ + 'Cannot access the API of the vector aggregator.' + + result = response.json() + + transforms = result['data']['transforms']['nodes'] + for transform in transforms: + processedEvents = transform['metrics']['processedEventsTotal']['processedEventsTotal'] + componentId = transform['componentId'] + assert processedEvents > 0, \ + f'No events were processed in "{componentId}".' + + +if __name__ == '__main__': + check_processed_events() + print('Test successful!') diff --git a/tests/templates/kuttl/pyspark-ny-public-s3-image/01-assert.yaml.j2 b/tests/templates/kuttl/pyspark-ny-public-s3-image/01-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/pyspark-ny-public-s3-image/01-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/pyspark-ny-public-s3-image/01-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/pyspark-ny-public-s3-image/01-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/pyspark-ny-public-s3-image/01-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/pyspark-ny-public-s3-image/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/pyspark-ny-public-s3-image/10-deploy-spark-app.yaml.j2 index 9d58e862..003c62c0 100644 --- a/tests/templates/kuttl/pyspark-ny-public-s3-image/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/pyspark-ny-public-s3-image/10-deploy-spark-app.yaml.j2 @@ -7,6 +7,9 @@ spec: version: "1.0" # everything under /jobs will be copied to /stackable/spark/jobs image: "docker.stackable.tech/stackable/ny-tlc-report:{{ test_scenario['values']['ny-tlc-report'] }}" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} sparkImage: "docker.stackable.tech/stackable/pyspark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }}" sparkImagePullPolicy: IfNotPresent mode: cluster @@ -21,5 +24,13 @@ spec: host: test-minio port: 9000 accessStyle: Path + job: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + driver: + logging: + enableVectorAgent: {{ lookup('env', 
'VECTOR_AGGREGATOR') | length > 0 }} executor: instances: 3 + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/templates/kuttl/pyspark-ny-public-s3/01-assert.yaml.j2 b/tests/templates/kuttl/pyspark-ny-public-s3/01-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/pyspark-ny-public-s3/01-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/pyspark-ny-public-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/pyspark-ny-public-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/pyspark-ny-public-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/pyspark-ny-public-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/pyspark-ny-public-s3/10-deploy-spark-app.yaml.j2 index 1e18491a..e10c2c7a 100644 --- a/tests/templates/kuttl/pyspark-ny-public-s3/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/pyspark-ny-public-s3/10-deploy-spark-app.yaml.j2 @@ -5,6 +5,9 @@ metadata: name: pyspark-ny-public-s3 spec: version: "1.0" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} # everything under /jobs will be copied to /stackable/spark/jobs sparkImage: "docker.stackable.tech/stackable/pyspark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }}" sparkImagePullPolicy: IfNotPresent @@ -20,5 +23,13 @@ spec: host: test-minio port: 9000 accessStyle: Path + job: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + driver: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} executor: instances: 3 + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/templates/kuttl/resources/01-assert.yaml.j2 b/tests/templates/kuttl/resources/01-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/resources/01-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/resources/01-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/resources/01-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/resources/01-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/resources/10-assert.yaml b/tests/templates/kuttl/resources/10-assert.yaml.j2 similarity 
index 77% rename from tests/templates/kuttl/resources/10-assert.yaml rename to tests/templates/kuttl/resources/10-assert.yaml.j2 index 52829e83..c6dc93b2 100644 --- a/tests/templates/kuttl/resources/10-assert.yaml +++ b/tests/templates/kuttl/resources/10-assert.yaml.j2 @@ -19,6 +19,9 @@ spec: requests: cpu: 100m memory: 1Gi +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} --- apiVersion: v1 kind: Pod @@ -26,7 +29,10 @@ metadata: name: resources-crd-driver spec: containers: - - name: spark-driver +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} + - name: spark resources: limits: cpu: "2" @@ -41,7 +47,10 @@ metadata: name: resources-crd-exec-1 spec: containers: - - name: spark-executor +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} + - name: spark resources: limits: cpu: "3" diff --git a/tests/templates/kuttl/resources/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/resources/10-deploy-spark-app.yaml.j2 index f966a10d..0fd09d16 100644 --- a/tests/templates/kuttl/resources/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/resources/10-deploy-spark-app.yaml.j2 @@ -5,6 +5,9 @@ metadata: name: resources-crd spec: version: "1.0" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} sparkImage: "docker.stackable.tech/stackable/pyspark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }}" mode: cluster mainApplicationFile: local:///stackable/spark/examples/src/main/python/streaming/hdfs_wordcount.py @@ -15,6 +18,8 @@ spec: spark.kubernetes.driver.pod.name: "resources-crd-driver" spark.kubernetes.executor.podNamePrefix: "resources-crd" job: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} resources: cpu: min: "100m" @@ -22,6 +27,8 @@ spec: memory: limit: "1Gi" driver: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} resources: cpu: min: "1" @@ -30,6 +37,8 @@ spec: limit: "1Gi" executor: instances: 1 + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} resources: cpu: min: "1500m" diff --git a/tests/templates/kuttl/resources/12-assert.yaml b/tests/templates/kuttl/resources/12-assert.yaml.j2 similarity index 76% rename from tests/templates/kuttl/resources/12-assert.yaml rename to tests/templates/kuttl/resources/12-assert.yaml.j2 index 05e38e74..5fe8994e 100644 --- a/tests/templates/kuttl/resources/12-assert.yaml +++ b/tests/templates/kuttl/resources/12-assert.yaml.j2 @@ -19,6 +19,9 @@ spec: requests: cpu: 500m memory: 1Gi +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} --- apiVersion: v1 kind: Pod @@ -26,7 +29,10 @@ metadata: name: resources-sparkconf-driver spec: containers: - - name: spark-driver +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} + - name: spark resources: limits: cpu: "3" @@ -41,7 +47,10 @@ metadata: name: resources-sparkconf-exec-1 spec: containers: - - name: spark-executor +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} + - name: spark resources: limits: cpu: "3" diff --git a/tests/templates/kuttl/resources/12-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/resources/12-deploy-spark-app.yaml.j2 index 167b290f..64caa234 100644 --- a/tests/templates/kuttl/resources/12-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/resources/12-deploy-spark-app.yaml.j2 @@ -5,6 
+5,9 @@ metadata: name: resources-sparkconf spec: version: "1.0" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} sparkImage: "docker.stackable.tech/stackable/pyspark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }}" mode: cluster mainApplicationFile: local:///stackable/spark/examples/src/main/python/streaming/hdfs_wordcount.py @@ -25,4 +28,12 @@ spec: spark.executor.memory: "2g" spark.executor.memoryOverheadFactor: "0.4" spark.executor.instances: "1" - + job: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + driver: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + executor: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/templates/kuttl/resources/14-assert.yaml b/tests/templates/kuttl/resources/14-assert.yaml.j2 similarity index 75% rename from tests/templates/kuttl/resources/14-assert.yaml rename to tests/templates/kuttl/resources/14-assert.yaml.j2 index e4ff8b32..ba35f29c 100644 --- a/tests/templates/kuttl/resources/14-assert.yaml +++ b/tests/templates/kuttl/resources/14-assert.yaml.j2 @@ -19,6 +19,9 @@ spec: requests: cpu: 500m memory: 1Gi +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} --- apiVersion: v1 kind: Pod @@ -26,7 +29,10 @@ metadata: name: resources-defaults-driver spec: containers: - - name: spark-driver +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} + - name: spark resources: limits: cpu: "2" @@ -41,7 +47,10 @@ metadata: name: resources-defaults-exec-1 spec: containers: - - name: spark-executor +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} + - name: spark resources: limits: cpu: "4" @@ -56,7 +65,10 @@ metadata: name: resources-defaults-exec-2 spec: containers: - - name: spark-executor +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} + - name: spark resources: limits: cpu: "4" diff --git a/tests/templates/kuttl/resources/14-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/resources/14-deploy-spark-app.yaml.j2 index ceac8802..f70eeea4 100644 --- a/tests/templates/kuttl/resources/14-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/resources/14-deploy-spark-app.yaml.j2 @@ -5,6 +5,9 @@ metadata: name: resources-defaults spec: version: "1.0" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} sparkImage: "docker.stackable.tech/stackable/pyspark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }}" mode: cluster mainApplicationFile: local:///stackable/spark/examples/src/main/python/streaming/hdfs_wordcount.py @@ -14,4 +17,12 @@ spec: spark.kubernetes.submission.waitAppCompletion: "false" spark.kubernetes.driver.pod.name: "resources-defaults-driver" spark.kubernetes.executor.podNamePrefix: "resources-defaults" - + job: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + driver: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + executor: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/templates/kuttl/spark-examples/01-assert.yaml.j2 b/tests/templates/kuttl/spark-examples/01-assert.yaml.j2 new file mode 
100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/spark-examples/01-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/spark-examples/01-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/spark-examples/01-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/spark-examples/01-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/spark-examples/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-examples/10-deploy-spark-app.yaml.j2 index 388421c2..a3d97806 100644 --- a/tests/templates/kuttl/spark-examples/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-examples/10-deploy-spark-app.yaml.j2 @@ -5,10 +5,21 @@ metadata: name: spark-examples spec: version: "1.0" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} sparkImage: "docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }}" sparkImagePullPolicy: IfNotPresent mode: cluster mainClass: org.apache.spark.examples.SparkALS mainApplicationFile: "local:///stackable/spark/examples/jars/spark-examples_2.12-{{ test_scenario['values']['examples'] }}.jar" + job: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + driver: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} executor: instances: 1 + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/templates/kuttl/spark-history-server/01-assert.yaml.j2 b/tests/templates/kuttl/spark-history-server/01-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/01-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/spark-history-server/01-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/spark-history-server/01-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/01-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/spark-history-server/06-assert.yaml b/tests/templates/kuttl/spark-history-server/06-assert.yaml index af9c593a..eec0d4a3 100644 --- a/tests/templates/kuttl/spark-history-server/06-assert.yaml +++ b/tests/templates/kuttl/spark-history-server/06-assert.yaml @@ -6,6 +6,6 @@ timeout: 900 apiVersion: apps/v1 kind: StatefulSet 
metadata: - name: spark-history + name: spark-history-node-cleaner status: readyReplicas: 1 diff --git a/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 b/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 index cfcd6c16..13baa1bc 100644 --- a/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 +++ b/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 @@ -7,6 +7,9 @@ spec: image: productVersion: "{{ test_scenario['values']['spark'].split('-stackable')[0] }}" stackableVersion: "{{ test_scenario['values']['spark'].split('-stackable')[1] }}" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} logFileDirectory: s3: prefix: eventlogs/ @@ -15,6 +18,9 @@ spec: # For possible properties see: https://spark.apache.org/docs/latest/monitoring.html#spark-history-server-configuration-options #sparkConf: nodes: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} roleGroups: cleaner: replicas: 1 diff --git a/tests/templates/kuttl/spark-history-server/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-history-server/10-deploy-spark-app.yaml.j2 index 9d3f50c4..e144baf2 100644 --- a/tests/templates/kuttl/spark-history-server/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-history-server/10-deploy-spark-app.yaml.j2 @@ -5,6 +5,9 @@ metadata: name: spark-pi-s3-1 spec: version: "1.0" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} sparkImage: "docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }}" sparkImagePullPolicy: IfNotPresent mode: cluster @@ -17,5 +20,13 @@ spec: prefix: eventlogs/ bucket: reference: spark-history-s3-bucket + job: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + driver: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} executor: instances: 1 + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/templates/kuttl/spark-history-server/12-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-history-server/12-deploy-spark-app.yaml.j2 index 92ddcd6d..2f3e2871 100644 --- a/tests/templates/kuttl/spark-history-server/12-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-history-server/12-deploy-spark-app.yaml.j2 @@ -5,6 +5,9 @@ metadata: name: spark-pi-s3-2 spec: version: "1.0" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} sparkImage: "docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }}" sparkImagePullPolicy: IfNotPresent mode: cluster @@ -17,5 +20,13 @@ spec: prefix: eventlogs/ bucket: reference: spark-history-s3-bucket + job: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + driver: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} executor: instances: 1 + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/templates/kuttl/spark-ny-public-s3/01-assert.yaml.j2 b/tests/templates/kuttl/spark-ny-public-s3/01-assert.yaml.j2 new file 
mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/spark-ny-public-s3/01-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/spark-ny-public-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/spark-ny-public-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/spark-ny-public-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 index 95cbb8cc..47e03495 100644 --- a/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 @@ -13,6 +13,9 @@ metadata: name: spark-ny-cm spec: version: "1.0" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} sparkImage: "docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }}" sparkImagePullPolicy: IfNotPresent mode: cluster @@ -29,12 +32,19 @@ spec: host: test-minio port: 9000 accessStyle: Path + job: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} driver: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} volumeMounts: - name: cm-job-arguments mountPath: /arguments executor: instances: 3 + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} volumeMounts: - name: cm-job-arguments mountPath: /arguments diff --git a/tests/templates/kuttl/spark-pi-private-s3/01-assert.yaml.j2 b/tests/templates/kuttl/spark-pi-private-s3/01-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/spark-pi-private-s3/01-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/spark-pi-private-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/spark-pi-private-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/spark-pi-private-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 index dc9d4b31..807e2123 100644 --- a/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 +++ 
b/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 @@ -5,6 +5,9 @@ metadata: name: spark-pi-private-s3 spec: version: "1.0" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} sparkImage: docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }} sparkImagePullPolicy: IfNotPresent mode: cluster @@ -17,5 +20,13 @@ spec: accessStyle: Path credentials: secretClass: spark-pi-private-s3-credentials-class + job: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + driver: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} executor: instances: 1 + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/templates/kuttl/spark-pi-public-s3/01-assert.yaml.j2 b/tests/templates/kuttl/spark-pi-public-s3/01-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/spark-pi-public-s3/01-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/spark-pi-public-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/spark-pi-public-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/spark-pi-public-s3/01-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 index 8b8b11ed..a16012ff 100644 --- a/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 @@ -5,6 +5,9 @@ metadata: name: spark-pi-public-s3 spec: version: "1.0" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} sparkImage: docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark'].split('-stackable')[1] }} sparkImagePullPolicy: IfNotPresent mode: cluster @@ -15,5 +18,13 @@ spec: host: test-minio port: 9000 accessStyle: Path + job: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + driver: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} executor: instances: 1 + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 3a2c7267..e43789ea 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -10,7 +10,7 @@ dimensions: - "false" - name: spark values: - - 3.3.0-stackable0.3.0 + - 3.3.0-stackable23.4.0-rc2 - name: ny-tlc-report values: - 0.1.0 @@ -65,3 +65,10 @@ tests: - stackable - openshift - examples + - name: logging + dimensions: + - spark + - stackable + - 
ny-tlc-report + - openshift + - examples
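For reference, a minimal SparkApplication using the logging fields introduced by this patch might look like the sketch below. The resource name, image tag, examples jar version, the my-spark-log-config ConfigMap and the aggregator discovery ConfigMap name are illustrative placeholders modelled on the test manifests above, not values required by the operator.

---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: spark-als-with-logging  # placeholder name
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.3.0-stackable23.4.0-rc2  # placeholder tag
  mode: cluster
  mainClass: org.apache.spark.examples.SparkALS
  mainApplicationFile: local:///stackable/spark/examples/jars/spark-examples_2.12-3.3.0.jar  # placeholder version
  # Discovery ConfigMap pointing at a Vector aggregator, as deployed in
  # 01-install-spark-vector-aggregator.yaml above.
  vectorAggregatorConfigMapName: spark-vector-aggregator-discovery
  job:
    logging:
      enableVectorAgent: true
      containers:
        spark-submit:
          console:
            level: INFO
          file:
            level: INFO
  driver:
    logging:
      enableVectorAgent: true
      containers:
        spark:
          # Optionally point to a custom log4j2.properties, as in
          # 06-deploy-custom-log-config-spark-app.yaml.j2 above.
          custom:
            configMap: my-spark-log-config  # placeholder ConfigMap
  executor:
    instances: 1
    logging:
      enableVectorAgent: true

With enableVectorAgent set, the operator adds a vector container next to the spark-submit, driver and executor containers and ships their log files to the aggregator referenced by the discovery ConfigMap.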