diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f144101..6c277b5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- Experimental support for Spark Connect ([#539]). + ### Changed - BREAKING: Replace stackable-operator `initialize_logging` with stackable-telemetry `Tracing` ([#547], [#554]). @@ -19,6 +23,7 @@ All notable changes to this project will be documented in this file. - Use `json` file extension for log files ([#553]). +[#539]: https://github.com/stackabletech/spark-k8s-operator/pull/539 [#547]: https://github.com/stackabletech/spark-k8s-operator/pull/547 [#551]: https://github.com/stackabletech/spark-k8s-operator/pull/551 [#553]: https://github.com/stackabletech/spark-k8s-operator/pull/553 diff --git a/crate-hashes.json b/crate-hashes.json index e4629311..330394f5 100644 --- a/crate-hashes.json +++ b/crate-hashes.json @@ -7,4 +7,4 @@ "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.91.1#stackable-versioned-macros@0.7.1": "11zqwlwvfigca7lfsdch1wqd3vl694hff1avf6rhiawpnassj2cw", "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.91.1#stackable-versioned@0.7.1": "11zqwlwvfigca7lfsdch1wqd3vl694hff1avf6rhiawpnassj2cw", "git+https://github.com/stackabletech/product-config.git?tag=0.7.0#product-config@0.7.0": "0gjsm80g6r75pm3824dcyiz4ysq1ka4c1if6k1mjm9cnd5ym0gny" -} \ No newline at end of file +} diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index cbdcb3f7..ab67da5b 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -368,7 +368,7 @@ spec: type: object type: array executor: - description: The executor role specifies the configuration that, together with the driver pod template, is used by Spark to create the executor pods. This is RoleGroup instead of plain CommonConfiguration because it needs to allows for the number of replicas. to be specified. + description: The executor role specifies the configuration that, together with the driver pod template, is used by Spark to create the executor pods. This is RoleGroup instead of plain CommonConfiguration because it needs to allow for the number of replicas. to be specified. nullable: true properties: cliOverrides: @@ -1845,3 +1845,584 @@ spec: served: true storage: true subresources: {} +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: sparkconnectservers.spark.stackable.tech + annotations: + helm.sh/resource-policy: keep +spec: + group: spark.stackable.tech + names: + categories: [] + kind: SparkConnectServer + plural: sparkconnectservers + shortNames: + - sparkconnect + singular: sparkconnectserver + scope: Namespaced + versions: + - additionalPrinterColumns: [] + name: v1alpha1 + schema: + openAPIV3Schema: + description: Auto-generated derived type for SparkConnectServerSpec via `CustomResource` + properties: + spec: + description: An Apache Spark Connect server component. This resource is managed by the Stackable operator for Apache Spark. Find more information on how to use it in the [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/connect-server). + properties: + args: + default: [] + description: User provided command line arguments appended to the server entry point. 
+ items: + type: string + type: array + clusterConfig: + default: + listenerClass: external-unstable + description: Global Spark Connect server configuration that applies to all roles. + properties: + listenerClass: + default: external-unstable + description: |- + This field controls which type of Service the Operator creates for this ConnectServer: + + * cluster-internal: Use a ClusterIP service + + * external-unstable: Use a NodePort service + + * external-stable: Use a LoadBalancer service + + This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. + enum: + - cluster-internal + - external-unstable + - external-stable + type: string + type: object + clusterOperation: + default: + reconciliationPaused: false + stopped: false + description: '[Cluster operations](https://docs.stackable.tech/home/nightly/concepts/operations/cluster_operations) properties, allow stopping the product instance as well as pausing reconciliation.' + properties: + reconciliationPaused: + default: false + description: Flag to stop cluster reconciliation by the operator. This means that all changes in the custom resource spec are ignored until this flag is set to false or removed. The operator will however still watch the deployed resources at the time and update the custom resource status field. If applied at the same time with `stopped`, `reconciliationPaused` will take precedence over `stopped` and stop the reconciliation immediately. + type: boolean + stopped: + default: false + description: Flag to stop the cluster. This means all deployed resources (e.g. Services, StatefulSets, ConfigMaps) are kept but all deployed Pods (e.g. replicas from a StatefulSet) are scaled to 0 and therefore stopped and removed. If applied at the same time with `reconciliationPaused`, the latter will pause reconciliation and `stopped` will take no effect until `reconciliationPaused` is set to false or removed. + type: boolean + type: object + executor: + description: Spark Connect executor properties. + nullable: true + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). 
+ properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + description: Log configuration per container. + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. + nullable: true + type: boolean + type: object + requestedSecretLifetime: + description: Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`. This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. + nullable: true + type: string + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. 
+ properties: + cpu: + default: + max: null + min: null + properties: + max: + description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + min: + description: The minimal amount of CPU cores that Pods need to run. Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' + nullable: true + type: string + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' + type: object + jvmArgumentOverrides: + default: + add: [] + remove: [] + removeRegex: [] + description: Allows overriding JVM arguments. Please read on the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on the usage. + properties: + add: + default: [] + description: JVM arguments to be added + items: + type: string + type: array + remove: + default: [] + description: JVM arguments to be removed by exact match + items: + type: string + type: array + removeRegex: + default: [] + description: JVM arguments matching any of this regexes will be removed + items: + type: string + type: array + type: object + podOverrides: + default: {} + description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. 
+ type: object + x-kubernetes-preserve-unknown-fields: true + type: object + image: + anyOf: + - required: + - custom + - productVersion + - required: + - productVersion + description: |- + Specify which image to use, the easiest way is to only configure the `productVersion`. You can also configure a custom image registry to pull from, as well as completely custom images. + + Consult the [Product image selection documentation](https://docs.stackable.tech/home/nightly/concepts/product_image_selection) for details. + properties: + custom: + description: Overwrite the docker image. Specify the full docker image name, e.g. `oci.stackable.tech/sdp/superset:1.4.1-stackable2.1.0` + type: string + productVersion: + description: Version of the product, e.g. `1.4.1`. + type: string + pullPolicy: + default: Always + description: '[Pull policy](https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy) used when pulling the image.' + enum: + - IfNotPresent + - Always + - Never + type: string + pullSecrets: + description: '[Image pull secrets](https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod) to pull images from a private registry.' + items: + description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace. + properties: + name: + description: 'Name of the referent. This field is effectively required, but due to backwards compatibility is allowed to be empty. Instances of this type with an empty value here are almost certainly wrong. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + required: + - name + type: object + nullable: true + type: array + repo: + description: Name of the docker repo, e.g. `oci.stackable.tech/sdp` + nullable: true + type: string + stackableVersion: + description: Stackable version of the product, e.g. `23.4`, `23.4.1` or `0.0.0-dev`. If not specified, the operator will use its own version, e.g. `23.4.1`. When using a nightly operator or a pr version, it will use the nightly `0.0.0-dev` image. + nullable: true + type: string + type: object + server: + description: A Spark Connect server definition. + nullable: true + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + description: Log configuration per container. + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. + nullable: true + type: boolean + type: object + requestedSecretLifetime: + description: Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`. This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. + nullable: true + type: string + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + min: + description: The minimal amount of CPU cores that Pods need to run. Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' + nullable: true + type: string + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. 
Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' + type: object + jvmArgumentOverrides: + default: + add: [] + remove: [] + removeRegex: [] + description: Allows overriding JVM arguments. Please read on the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on the usage. + properties: + add: + default: [] + description: JVM arguments to be added + items: + type: string + type: array + remove: + default: [] + description: JVM arguments to be removed by exact match + items: + type: string + type: array + removeRegex: + default: [] + description: JVM arguments matching any of this regexes will be removed + items: + type: string + type: array + type: object + podOverrides: + default: {} + description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + vectorAggregatorConfigMapName: + description: Name of the Vector aggregator discovery ConfigMap. It must contain the key `ADDRESS` with the address of the Vector aggregator. + nullable: true + type: string + required: + - image + type: object + status: + nullable: true + properties: + conditions: + default: [] + items: + properties: + lastTransitionTime: + description: Last time the condition transitioned from one status to another. + format: date-time + nullable: true + type: string + lastUpdateTime: + description: The last time this condition was updated. + format: date-time + nullable: true + type: string + message: + description: A human readable message indicating details about the transition. + nullable: true + type: string + reason: + description: The reason for the condition's last transition. + nullable: true + type: string + status: + description: Status of the condition, one of True, False, Unknown. + enum: + - 'True' + - 'False' + - Unknown + type: string + type: + description: Type of deployment condition. 
+ enum: + - Available + - Degraded + - Progressing + - ReconciliationPaused + - Stopped + type: string + required: + - status + - type + type: object + type: array + type: object + required: + - spec + title: SparkConnectServer + type: object + served: true + storage: true + subresources: + status: {} diff --git a/deploy/helm/spark-k8s-operator/templates/roles.yaml b/deploy/helm/spark-k8s-operator/templates/roles.yaml index df9bb848..8a7fd882 100644 --- a/deploy/helm/spark-k8s-operator/templates/roles.yaml +++ b/deploy/helm/spark-k8s-operator/templates/roles.yaml @@ -53,6 +53,7 @@ rules: - apps resources: - statefulsets + - deployments verbs: - create - delete @@ -102,6 +103,7 @@ rules: resources: - sparkapplications - sparkhistoryservers + - sparkconnectservers verbs: - get - list @@ -111,6 +113,7 @@ rules: - spark.stackable.tech resources: - sparkapplications/status + - sparkconnectservers/status verbs: - patch - apiGroups: diff --git a/docs/modules/spark-k8s/examples/example-spark-connect.yaml b/docs/modules/spark-k8s/examples/example-spark-connect.yaml new file mode 100644 index 00000000..c89cd2e1 --- /dev/null +++ b/docs/modules/spark-k8s/examples/example-spark-connect.yaml @@ -0,0 +1,44 @@ +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkConnectServer +metadata: + name: spark-connect # <1> +spec: + image: + productVersion: "3.5.5" # <2> + pullPolicy: IfNotPresent + args: + - "--package org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1" # <3> + server: + podOverrides: + spec: + containers: + - name: spark + env: + - name: DEMO_GREETING # <4> + value: "Hello" + jvmArgumentOverrides: + add: + - -Dmy.custom.jvm.arg=customValue # <5> + config: + logging: + enableVectorAgent: False + containers: + spark: + custom: + configMap: spark-connect-log-config # <6> + configOverrides: + spark-defaults.conf: + spark.driver.cores: "3" # <7> + executor: + configOverrides: + spark-defaults.conf: + spark.executor.memoryOverhead: "1m" # <8> + spark.executor.instances: "3" + config: + logging: + enableVectorAgent: False + containers: + spark: + custom: + configMap: spark-connect-log-config diff --git a/docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc b/docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc new file mode 100644 index 00000000..d4a0307d --- /dev/null +++ b/docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc @@ -0,0 +1,59 @@ += Spark Connect +:description: Set up a Spark Connect Server with Kubernetes as distributed execution engine with an external service to be used by clients +:page-aliases: spark_connect.adoc + +WARNING: Support for Apache Spark Connect is considered experimental and is subject to change in future releases. Spark Connect is a young technology and there are important questions to be answered yet, mostly related to security and multi-tenancy. + +Apache Spark Connect is a remote procedure call (RPC) server that allows clients to run Spark applications on a remote cluster. Clients can connect to the Spark Connect server using a variety of programming languages, editors and IDEs without needing to install Spark locally. + +The Stackable Spark operator can set up Spark Connect servers backed by Kubernetes as a distributed execution engine. + +== Deployment + +The example below demonstrates how to set up a Spark Connect server and apply some customizations. + +[source,yaml] +---- +include::example$example-spark-connect.yaml[] +---- + +<1> The name of the Spark Connect server. +<2> Version of the Spark Connect server. 
+<3> Additional package to install when starting the Spark Connect server and executors.
+<4> Environment variable to be created via `podOverrides`. Alternatively, the environment variable can be set in the `spec.server.envOverrides` section.
+<5> Additional argument to be passed to the JVM settings of the Spark Connect server. Do not use this to tweak heap settings; configure memory via `spec.server.config.resources` instead.
+<6> A custom log4j configuration file to be used by the Spark Connect server. The config map must have an entry called `log4j.properties`.
+<7> Customize the driver properties in the `server` role. The number of cores here is not related to Kubernetes cores!
+<8> Customize `spark.executor.\*` and `spark.kubernetes.executor.\*` properties in the `executor` role.
+
+== Metrics
+
+The server pod exposes Prometheus metrics at the following endpoints:
+
+* `/metrics/prometheus` for driver instances.
+* `/metrics/executors/prometheus` for executor instances.
+
+To customize the metrics configuration, use `spec.server.configOverrides` like this:
+
+[source,yaml]
+----
+spec:
+  server:
+    configOverrides:
+      metrics.properties:
+        applications.sink.prometheusServlet.path: "/metrics/applications/prometheus"
+----
+
+The example above adds a new endpoint for application metrics.
+
+== Notable Omissions
+
+The following features are not yet supported by the Stackable Spark operator:
+
+* Integration with the Spark History Server.
+* Authorization and authentication. Currently, anyone with access to the Spark Connect service can run jobs.
+* Volumes and volume mounts, which can only be added via pod overrides.
+* Job dependencies, which must be provisioned as custom images or via the `--packages` or `--jars` arguments.
+
+== Known Issues
+
+* Dynamically provisioning the Iceberg runtime leads to an `iceberg.SparkWrite$WriterFactory` `ClassNotFoundException` when clients attempt to use it.
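+
+== Connecting from a client
+
+The following is a minimal sketch of how a client might connect, assuming the Spark Connect Python client is installed locally (for example via `pip install "pyspark[connect]==3.5.5"`, matching the server version) and that the server from the example above is reachable under a Service named `spark-connect-server` on the gRPC port `15002`. Adjust the host name and namespace to whatever Service the operator actually created in your cluster.
+
+[source,python]
+----
+from pyspark.sql import SparkSession
+
+# Connect to the Spark Connect server instead of starting a local Spark JVM.
+spark = (
+    SparkSession.builder
+    .remote("sc://spark-connect-server.default.svc.cluster.local:15002")
+    .getOrCreate()
+)
+
+# This query is executed remotely on the connect server and its executors.
+spark.range(10).show()
+----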
diff --git a/docs/modules/spark-k8s/partials/nav.adoc b/docs/modules/spark-k8s/partials/nav.adoc index db281b43..e4ac8d3e 100644 --- a/docs/modules/spark-k8s/partials/nav.adoc +++ b/docs/modules/spark-k8s/partials/nav.adoc @@ -9,6 +9,7 @@ ** xref:spark-k8s:usage-guide/security.adoc[] ** xref:spark-k8s:usage-guide/logging.adoc[] ** xref:spark-k8s:usage-guide/history-server.adoc[] +** xref:spark-k8s:usage-guide/spark-connect.adoc[] ** xref:spark-k8s:usage-guide/examples.adoc[] ** xref:spark-k8s:usage-guide/overrides.adoc[] ** xref:spark-k8s:usage-guide/operations/index.adoc[] diff --git a/rust/operator-binary/src/connect/common.rs b/rust/operator-binary/src/connect/common.rs new file mode 100644 index 00000000..5f5facfb --- /dev/null +++ b/rust/operator-binary/src/connect/common.rs @@ -0,0 +1,152 @@ +use std::collections::{BTreeMap, HashMap}; + +use product_config::writer::to_java_properties_string; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + kvp::ObjectLabels, + role_utils::{JavaCommonConfig, JvmArgumentOverrides}, +}; +use strum::Display; + +use super::crd::CONNECT_EXECUTOR_ROLE_NAME; +use crate::{ + connect::crd::{ + CONNECT_CONTROLLER_NAME, CONNECT_SERVER_ROLE_NAME, DUMMY_SPARK_CONNECT_GROUP_NAME, + }, + crd::constants::{APP_NAME, OPERATOR_NAME}, +}; + +#[derive(Snafu, Debug)] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("failed to merge jvm argument overrides"))] + MergeJvmArgumentOverrides { + source: stackable_operator::role_utils::Error, + }, + + #[snafu(display("failed to serialize spark properties"))] + SparkProperties { + source: product_config::writer::PropertiesWriterError, + }, + + #[snafu(display("failed to serialize jvm security properties",))] + JvmSecurityProperties { + source: product_config::writer::PropertiesWriterError, + }, +} + +pub(crate) fn labels<'a, T>( + scs: &'a T, + app_version_label: &'a str, + role: &'a str, +) -> ObjectLabels<'a, T> { + ObjectLabels { + owner: scs, + app_name: APP_NAME, + app_version: app_version_label, + operator_name: OPERATOR_NAME, + controller_name: CONNECT_CONTROLLER_NAME, + role, + role_group: DUMMY_SPARK_CONNECT_GROUP_NAME, + } +} + +// The dead code annotation is to shut up complains about missing Executor instantiations +// These will come in the future. +#[allow(dead_code)] +#[derive(Clone, Debug, Display)] +#[strum(serialize_all = "lowercase")] +pub(crate) enum SparkConnectRole { + Server, + Executor, +} + +pub(crate) fn object_name(stacklet_name: &str, role: SparkConnectRole) -> String { + match role { + SparkConnectRole::Server => format!("{}-{}", stacklet_name, CONNECT_SERVER_ROLE_NAME), + SparkConnectRole::Executor => format!("{}-{}", stacklet_name, CONNECT_EXECUTOR_ROLE_NAME), + } +} + +// Returns the jvm arguments a user has provided merged with the operator props. +pub(crate) fn jvm_args( + jvm_args: &[String], + user_java_config: Option<&JavaCommonConfig>, +) -> Result { + if let Some(user_jvm_props) = user_java_config { + let operator_generated = JvmArgumentOverrides::new_with_only_additions(jvm_args.to_vec()); + let mut user_jvm_props_copy = user_jvm_props.jvm_argument_overrides.clone(); + user_jvm_props_copy + .try_merge(&operator_generated) + .context(MergeJvmArgumentOverridesSnafu)?; + Ok(user_jvm_props_copy + .effective_jvm_config_after_merging() + .join(" ")) + } else { + Ok(jvm_args.join(" ")) + } +} + +// Merges server and executor properties and renders the contents +// of the Spark properties file. 
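+// When the same key appears in both maps, the second map in the slice (the executor
+// properties) wins, because `extend` overwrites existing entries.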
+pub(crate) fn spark_properties( + props: &[BTreeMap>; 2], +) -> Result { + let mut result = BTreeMap::new(); + for p in props { + result.extend(p); + } + to_java_properties_string(result.into_iter()).context(SparkPropertiesSnafu) +} + +pub(crate) fn security_properties( + config_overrides: Option<&HashMap>, +) -> Result { + let mut result: BTreeMap> = [ + ( + "networkaddress.cache.ttl".to_string(), + Some("30".to_string()), + ), + ( + "networkaddress.cache.negative.ttl".to_string(), + Some("0".to_string()), + ), + ] + .into(); + + if let Some(user_config) = config_overrides { + result.extend( + user_config + .iter() + .map(|(k, v)| (k.clone(), Some(v.clone()))), + ); + } + + to_java_properties_string(result.iter()).context(JvmSecurityPropertiesSnafu) +} + +pub(crate) fn metrics_properties( + config_overrides: Option<&HashMap>, +) -> Result { + let mut result: BTreeMap> = [ + ( + "*.sink.prometheusServlet.class".to_string(), + Some("org.apache.spark.metrics.sink.PrometheusServlet".to_string()), + ), + ( + "*.sink.prometheusServlet.path".to_string(), + Some("/metrics/prometheus".to_string()), + ), + ] + .into(); + + if let Some(user_config) = config_overrides { + result.extend( + user_config + .iter() + .map(|(k, v)| (k.clone(), Some(v.clone()))), + ); + } + + to_java_properties_string(result.iter()).context(JvmSecurityPropertiesSnafu) +} diff --git a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs new file mode 100644 index 00000000..e1031a17 --- /dev/null +++ b/rust/operator-binary/src/connect/controller.rs @@ -0,0 +1,330 @@ +use std::sync::Arc; + +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, + commons::rbac::build_rbac_resources, + kube::{ + Resource, ResourceExt, + core::{DeserializeGuard, error_boundary}, + runtime::controller::Action, + }, + logging::controller::ReconcilerError, + status::condition::{ + compute_conditions, deployment::DeploymentConditionBuilder, + operations::ClusterOperationsConditionBuilder, + }, + time::Duration, +}; +use strum::{EnumDiscriminants, IntoStaticStr}; + +use super::crd::{CONNECT_CONTROLLER_NAME, v1alpha1}; +use crate::{ + Ctx, + connect::{common, crd::SparkConnectServerStatus, executor, server}, + crd::constants::{APP_NAME, OPERATOR_NAME, SPARK_IMAGE_BASE_NAME}, +}; + +#[derive(Snafu, Debug, EnumDiscriminants)] +#[strum_discriminants(derive(IntoStaticStr))] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("failed to serialize connect properties"))] + SerializeProperties { source: common::Error }, + + #[snafu(display("failed to build connect executor properties"))] + ExecutorProperties { source: executor::Error }, + + #[snafu(display("failed to build connect server properties"))] + ServerProperties { source: server::Error }, + + #[snafu(display("failed to build spark connect service"))] + BuildService { source: server::Error }, + + #[snafu(display("failed to build spark connect executor config map for {name}"))] + BuildExecutorConfigMap { + source: executor::Error, + name: String, + }, + + #[snafu(display("failed to build spark connect server config map for {name}"))] + BuildServerConfigMap { source: server::Error, name: String }, + + #[snafu(display("failed to build spark connect deployment"))] + BuildServerDeployment { source: server::Error }, + + #[snafu(display("failed to update status of spark connect server {name}"))] + ApplyStatus { + source: stackable_operator::client::Error, + name: String, 
+ }, + + #[snafu(display("spark connect object has no namespace"))] + ObjectHasNoNamespace, + + #[snafu(display("failed to update the connect server deployment"))] + ApplyDeployment { + source: stackable_operator::cluster_resources::Error, + }, + + #[snafu(display("failed to update connect executor config map for {name}"))] + ApplyExecutorConfigMap { + source: stackable_operator::cluster_resources::Error, + name: String, + }, + + #[snafu(display("failed to update connect server config map for {name}"))] + ApplyServerConfigMap { + source: stackable_operator::cluster_resources::Error, + name: String, + }, + + #[snafu(display("failed to update connect server service"))] + ApplyService { + source: stackable_operator::cluster_resources::Error, + }, + + #[snafu(display("failed to apply role ServiceAccount"))] + ApplyServiceAccount { + source: stackable_operator::cluster_resources::Error, + }, + + #[snafu(display("failed to apply global RoleBinding"))] + ApplyRoleBinding { + source: stackable_operator::cluster_resources::Error, + }, + + #[snafu(display("failed to create cluster resources"))] + CreateClusterResources { + source: stackable_operator::cluster_resources::Error, + }, + + #[snafu(display("failed to delete orphaned resources"))] + DeleteOrphanedResources { + source: stackable_operator::cluster_resources::Error, + }, + + #[snafu(display("failed to get required Labels"))] + GetRequiredLabels { + source: + stackable_operator::kvp::KeyValuePairError, + }, + + #[snafu(display("SparkConnectServer object is invalid"))] + InvalidSparkConnectServer { + source: error_boundary::InvalidObject, + }, + + #[snafu(display("failed to build RBAC resources"))] + BuildRbacResources { + source: stackable_operator::commons::rbac::Error, + }, + #[snafu(display("failed to build connect server configuration"))] + ServerConfig { source: crate::connect::crd::Error }, + + #[snafu(display("failed to build connect executor configuration"))] + ExecutorConfig { source: crate::connect::crd::Error }, + + #[snafu(display("failed to build connect executor pod template"))] + ExecutorPodTemplate { + source: crate::connect::executor::Error, + }, + + #[snafu(display("failed to serialize executor pod template"))] + ExecutorPodTemplateSerde { source: serde_yaml::Error }, +} + +type Result = std::result::Result; + +impl ReconcilerError for Error { + fn category(&self) -> &'static str { + ErrorDiscriminants::from(self).into() + } +} +/// Updates the status of the SparkApplication that started the pod. +pub async fn reconcile( + scs: Arc>, + ctx: Arc, +) -> Result { + tracing::info!("Starting reconcile connect server"); + + let scs = scs + .0 + .as_ref() + .map_err(error_boundary::InvalidObject::clone) + .context(InvalidSparkConnectServerSnafu)?; + + let server_config = scs.server_config().context(ServerConfigSnafu)?; + let executor_config = scs.executor_config().context(ExecutorConfigSnafu)?; + + let client = &ctx.client; + + let mut cluster_resources = ClusterResources::new( + APP_NAME, + OPERATOR_NAME, + CONNECT_CONTROLLER_NAME, + &scs.object_ref(&()), + ClusterResourceApplyStrategy::from(&scs.spec.cluster_operation), + ) + .context(CreateClusterResourcesSnafu)?; + + let resolved_product_image = scs + .spec + .image + .resolve(SPARK_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION); + + // Use a dedicated service account for connect server pods. 
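+    // The resulting ServiceAccount is applied to the cluster and also passed to
+    // `server_properties` further down.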
+ let (service_account, role_binding) = build_rbac_resources( + scs, + APP_NAME, + cluster_resources + .get_required_labels() + .context(GetRequiredLabelsSnafu)?, + ) + .context(BuildRbacResourcesSnafu)?; + + let service_account = cluster_resources + .add(client, service_account) + .await + .context(ApplyServiceAccountSnafu)?; + cluster_resources + .add(client, role_binding) + .await + .context(ApplyRoleBindingSnafu)?; + + // Expose connect server to the outside world + let service = server::build_service(scs, &resolved_product_image.app_version_label, None) + .context(BuildServiceSnafu)?; + cluster_resources + .add(client, service.clone()) + .await + .context(ApplyServiceSnafu)?; + + // Headless service used by executors connect back to the driver + let service = server::build_service( + scs, + &resolved_product_image.app_version_label, + Some("None".to_string()), + ) + .context(BuildServiceSnafu)?; + + cluster_resources + .add(client, service.clone()) + .await + .context(ApplyServiceSnafu)?; + + // ======================================== + // Server config map + + let spark_props = common::spark_properties(&[ + server::server_properties( + scs, + &server_config, + &service, + &service_account, + &resolved_product_image, + ) + .context(ServerPropertiesSnafu)?, + executor::executor_properties(scs, &executor_config, &resolved_product_image) + .context(ExecutorPropertiesSnafu)?, + ]) + .context(SerializePropertiesSnafu)?; + + // ======================================== + // Executor config map and pod template + let executor_config_map = + executor::executor_config_map(scs, &executor_config, &resolved_product_image).context( + BuildExecutorConfigMapSnafu { + name: scs.name_unchecked(), + }, + )?; + cluster_resources + .add(client, executor_config_map.clone()) + .await + .context(ApplyExecutorConfigMapSnafu { + name: scs.name_unchecked(), + })?; + + let executor_pod_template = serde_yaml::to_string( + &executor::executor_pod_template( + scs, + &executor_config, + &resolved_product_image, + &executor_config_map, + ) + .context(ExecutorPodTemplateSnafu)?, + ) + .context(ExecutorPodTemplateSerdeSnafu)?; + + // ======================================== + // Server config map + let server_config_map = server::server_config_map( + scs, + &server_config, + &resolved_product_image, + &spark_props, + &executor_pod_template, + ) + .context(BuildServerConfigMapSnafu { + name: scs.name_unchecked(), + })?; + cluster_resources + .add(client, server_config_map.clone()) + .await + .context(ApplyServerConfigMapSnafu { + name: scs.name_unchecked(), + })?; + + let args = server::command_args(&scs.spec.args); + let deployment = server::build_deployment( + scs, + &server_config, + &resolved_product_image, + &service_account, + &server_config_map, + args, + ) + .context(BuildServerDeploymentSnafu)?; + + let mut ss_cond_builder = DeploymentConditionBuilder::default(); + + ss_cond_builder.add( + cluster_resources + .add(client, deployment) + .await + .context(ApplyDeploymentSnafu)?, + ); + + cluster_resources + .delete_orphaned_resources(client) + .await + .context(DeleteOrphanedResourcesSnafu)?; + + // ======================================== + // Spark connect server status + let cluster_operation_cond_builder = + ClusterOperationsConditionBuilder::new(&scs.spec.cluster_operation); + + let status = SparkConnectServerStatus { + conditions: compute_conditions(scs, &[&ss_cond_builder, &cluster_operation_cond_builder]), + }; + client + .apply_patch_status(OPERATOR_NAME, scs, &status) + .await + 
.context(ApplyStatusSnafu { + name: scs.name_any(), + })?; + + Ok(Action::await_change()) +} + +pub fn error_policy( + _obj: Arc>, + error: &Error, + _ctx: Arc, +) -> Action { + match error { + Error::InvalidSparkConnectServer { .. } => Action::await_change(), + _ => Action::requeue(*Duration::from_secs(5)), + } +} diff --git a/rust/operator-binary/src/connect/crd.rs b/rust/operator-binary/src/connect/crd.rs new file mode 100644 index 00000000..a491e952 --- /dev/null +++ b/rust/operator-binary/src/connect/crd.rs @@ -0,0 +1,396 @@ +use const_format::concatcp; +use serde::{Deserialize, Serialize}; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + commons::{ + affinity::{StackableAffinity, StackableAffinityFragment, affinity_between_role_pods}, + cluster_operation::ClusterOperation, + product_image_selection::ProductImage, + resources::{ + CpuLimitsFragment, MemoryLimitsFragment, NoRuntimeLimits, NoRuntimeLimitsFragment, + Resources, ResourcesFragment, + }, + }, + config::{ + fragment::{self, Fragment, ValidationError}, + merge::Merge, + }, + k8s_openapi::{api::core::v1::PodAntiAffinity, apimachinery::pkg::api::resource::Quantity}, + kube::{CustomResource, ResourceExt}, + product_logging::{ + self, + spec::{ + ConfigMapLogConfig, ContainerLogConfig, ContainerLogConfigChoice, + CustomContainerLogConfig, Logging, + }, + }, + role_utils::{CommonConfiguration, JavaCommonConfig}, + schemars::{self, JsonSchema}, + status::condition::{ClusterCondition, HasStatusCondition}, + time::Duration, + versioned::versioned, +}; +use strum::{Display, EnumIter}; + +use super::common::SparkConnectRole; +use crate::crd::constants::APP_NAME; + +pub const CONNECT_CONTROLLER_NAME: &str = "connect"; +pub const CONNECT_FULL_CONTROLLER_NAME: &str = concatcp!( + CONNECT_CONTROLLER_NAME, + '.', + crate::crd::constants::OPERATOR_NAME +); +pub const CONNECT_SERVER_ROLE_NAME: &str = "server"; +pub const CONNECT_EXECUTOR_ROLE_NAME: &str = "executor"; +pub const CONNECT_GRPC_PORT: i32 = 15002; +pub const CONNECT_UI_PORT: i32 = 4040; + +pub const DUMMY_SPARK_CONNECT_GROUP_NAME: &str = "default"; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("fragment validation failure"))] + FragmentValidationFailure { source: ValidationError }, +} + +#[versioned(version(name = "v1alpha1"))] +pub mod versioned { + + /// An Apache Spark Connect server component. This resource is managed by the Stackable operator + /// for Apache Spark. Find more information on how to use it in the + /// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/connect-server). + #[versioned(k8s( + group = "spark.stackable.tech", + kind = "SparkConnectServer", + plural = "sparkconnectservers", + shortname = "sparkconnect", + status = "SparkConnectServerStatus", + namespaced, + crates( + kube_core = "stackable_operator::kube::core", + k8s_openapi = "stackable_operator::k8s_openapi", + schemars = "stackable_operator::schemars" + ) + ))] + #[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct SparkConnectServerSpec { + pub image: ProductImage, + + /// Global Spark Connect server configuration that applies to all roles. + #[serde(default)] + pub cluster_config: v1alpha1::SparkConnectServerClusterConfig, + + // no doc string - See ClusterOperation struct + #[serde(default)] + pub cluster_operation: ClusterOperation, + + /// User provided command line arguments appended to the server entry point. 
+ #[serde(default)] + pub args: Vec, + + /// Name of the Vector aggregator discovery ConfigMap. + /// It must contain the key `ADDRESS` with the address of the Vector aggregator. + #[serde(skip_serializing_if = "Option::is_none")] + pub vector_aggregator_config_map_name: Option, + + /// A Spark Connect server definition. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub server: Option>, + + /// Spark Connect executor properties. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub executor: Option>, + } + + #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct SparkConnectServerClusterConfig { + /// This field controls which type of Service the Operator creates for this ConnectServer: + /// + /// * cluster-internal: Use a ClusterIP service + /// + /// * external-unstable: Use a NodePort service + /// + /// * external-stable: Use a LoadBalancer service + /// + /// This is a temporary solution with the goal to keep yaml manifests forward compatible. + /// In the future, this setting will control which ListenerClass + /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. + #[serde(default)] + pub listener_class: CurrentlySupportedListenerClasses, + } + + #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] + #[fragment_attrs( + derive( + Clone, + Debug, + Default, + Deserialize, + Merge, + JsonSchema, + PartialEq, + Serialize + ), + serde(rename_all = "camelCase") + )] + pub struct ServerConfig { + #[fragment_attrs(serde(default))] + pub resources: Resources, + + #[fragment_attrs(serde(default))] + pub logging: Logging, + + /// Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`. + /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. + #[fragment_attrs(serde(default))] + pub requested_secret_lifetime: Option, + } + + #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] + #[fragment_attrs( + derive( + Clone, + Debug, + Default, + Deserialize, + Merge, + JsonSchema, + PartialEq, + Serialize + ), + serde(rename_all = "camelCase") + )] + pub struct ExecutorConfig { + #[fragment_attrs(serde(default))] + pub resources: Resources, + #[fragment_attrs(serde(default))] + pub logging: Logging, + #[fragment_attrs(serde(default))] + pub affinity: StackableAffinity, + + /// Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`. + /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. 
+ #[fragment_attrs(serde(default))] + pub requested_secret_lifetime: Option, + } +} + +// TODO: Temporary solution until listener-operator is finished +#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) enum CurrentlySupportedListenerClasses { + #[serde(rename = "cluster-internal")] + ClusterInternal, + #[default] + #[serde(rename = "external-unstable")] + ExternalUnstable, + #[serde(rename = "external-stable")] + ExternalStable, +} + +impl CurrentlySupportedListenerClasses { + pub fn k8s_service_type(&self) -> String { + match self { + CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(), + CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(), + CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(), + } + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] +#[fragment_attrs( + allow(clippy::derive_partial_eq_without_eq), + derive( + Clone, + Debug, + Default, + Deserialize, + Merge, + JsonSchema, + PartialEq, + Serialize + ), + serde(rename_all = "camelCase") +)] +pub(crate) struct ConnectStorageConfig {} + +#[derive( + Clone, + Debug, + Deserialize, + Display, + EnumIter, + Eq, + JsonSchema, + Ord, + PartialEq, + PartialOrd, + Serialize, +)] +#[serde(rename_all = "lowercase")] +#[strum(serialize_all = "lowercase")] +pub(crate) enum SparkConnectContainer { + Spark, + Vector, +} + +impl v1alpha1::ServerConfig { + // Auto TLS certificate lifetime + const DEFAULT_CONNECT_SECRET_LIFETIME: Duration = Duration::from_days_unchecked(1); + + fn default_config() -> v1alpha1::ServerConfigFragment { + v1alpha1::ServerConfigFragment { + resources: ResourcesFragment { + cpu: CpuLimitsFragment { + min: Some(Quantity("250m".to_owned())), + max: Some(Quantity("1".to_owned())), + }, + memory: MemoryLimitsFragment { + limit: Some(Quantity("1024Mi".to_owned())), + runtime_limits: NoRuntimeLimitsFragment {}, + }, + storage: ConnectStorageConfigFragment {}, + }, + logging: product_logging::spec::default_logging(), + requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME), + } + } + + pub fn log_config_map(&self) -> Option { + let container_log_config = self + .logging + .containers + .get(&SparkConnectContainer::Spark) + .cloned(); + + match container_log_config { + Some(ContainerLogConfig { + choice: + Some(ContainerLogConfigChoice::Custom(CustomContainerLogConfig { + custom: ConfigMapLogConfig { config_map }, + })), + }) => Some(config_map.clone()), + _ => None, + } + } +} + +// This is the equivalent to merged_config() in other ops +// only here we only need to merge operator defaults with +// user configuration. 
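+// The user-provided fragment takes precedence; the defaults only fill in unset fields,
+// and the merged result is then validated into a complete configuration.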
+impl v1alpha1::SparkConnectServer { + pub fn server_config(&self) -> Result { + let defaults = v1alpha1::ServerConfig::default_config(); + fragment::validate( + match self.spec.server.as_ref().map(|cc| cc.config.clone()) { + Some(fragment) => { + let mut fc = fragment.clone(); + fc.merge(&defaults); + fc + } + _ => defaults, + }, + ) + .context(FragmentValidationFailureSnafu) + } + + pub fn executor_config(&self) -> Result { + let defaults = v1alpha1::ExecutorConfig::default_config(&self.name_unchecked()); + fragment::validate( + match self.spec.executor.as_ref().map(|cc| cc.config.clone()) { + Some(fragment) => { + let mut fc = fragment.clone(); + fc.merge(&defaults); + fc + } + _ => defaults, + }, + ) + .context(FragmentValidationFailureSnafu) + } +} + +#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SparkConnectServerStatus { + #[serde(default)] + pub conditions: Vec, +} + +impl HasStatusCondition for v1alpha1::SparkConnectServer { + fn conditions(&self) -> Vec { + match &self.status { + Some(status) => status.conditions.clone(), + None => vec![], + } + } +} + +impl v1alpha1::ExecutorConfig { + // Auto TLS certificate lifetime + const DEFAULT_CONNECT_SECRET_LIFETIME: Duration = Duration::from_days_unchecked(1); + + fn default_config(cluster_name: &str) -> v1alpha1::ExecutorConfigFragment { + v1alpha1::ExecutorConfigFragment { + resources: ResourcesFragment { + cpu: CpuLimitsFragment { + min: Some(Quantity("1".to_owned())), + max: Some(Quantity("1".to_owned())), + }, + memory: MemoryLimitsFragment { + limit: Some(Quantity("1024M".to_owned())), + runtime_limits: NoRuntimeLimitsFragment {}, + }, + storage: ConnectStorageConfigFragment {}, + }, + logging: product_logging::spec::default_logging(), + affinity: v1alpha1::ExecutorConfig::affinity(cluster_name), + + requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME), + } + } + + pub fn log_config_map(&self) -> Option { + let container_log_config = self + .logging + .containers + .get(&SparkConnectContainer::Spark) + .cloned(); + + match container_log_config { + Some(ContainerLogConfig { + choice: + Some(ContainerLogConfigChoice::Custom(CustomContainerLogConfig { + custom: ConfigMapLogConfig { config_map }, + })), + }) => Some(config_map.clone()), + _ => None, + } + } + + fn affinity(cluster_name: &str) -> StackableAffinityFragment { + let affinity_between_role_pods = affinity_between_role_pods( + APP_NAME, + cluster_name, + &SparkConnectRole::Executor.to_string(), + 70, + ); + + StackableAffinityFragment { + pod_affinity: None, + pod_anti_affinity: Some(PodAntiAffinity { + preferred_during_scheduling_ignored_during_execution: Some(vec![ + affinity_between_role_pods, + ]), + required_during_scheduling_ignored_during_execution: None, + }), + node_affinity: None, + node_selector: None, + } + } +} diff --git a/rust/operator-binary/src/connect/executor.rs b/rust/operator-binary/src/connect/executor.rs new file mode 100644 index 00000000..6b213c8d --- /dev/null +++ b/rust/operator-binary/src/connect/executor.rs @@ -0,0 +1,373 @@ +use std::collections::{BTreeMap, HashMap}; + +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + builder::{ + self, + configmap::ConfigMapBuilder, + meta::ObjectMetaBuilder, + pod::{PodBuilder, container::ContainerBuilder, volume::VolumeBuilder}, + }, + commons::{ + product_image_selection::ResolvedProductImage, + resources::{CpuLimits, MemoryLimits, Resources}, + }, + k8s_openapi::{ + DeepMerge, + 
api::core::v1::{ConfigMap, EnvVar, PodTemplateSpec}, + }, + kube::{ResourceExt, runtime::reflector::ObjectRef}, + product_logging::framework::calculate_log_volume_size_limit, + role_utils::RoleGroupRef, +}; + +use super::{ + common::{SparkConnectRole, object_name}, + crd::{DUMMY_SPARK_CONNECT_GROUP_NAME, SparkConnectContainer}, +}; +use crate::{ + connect::{common, crd::v1alpha1}, + crd::constants::{ + JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, + METRICS_PROPERTIES_FILE, POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME, + VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, + VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, + }, + product_logging, +}; + +#[derive(Snafu, Debug)] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::builder::meta::Error, + }, + + #[snafu(display("failed to build metadata for spark connect executor pod template"))] + PodTemplateMetadataBuild { source: builder::meta::Error }, + + #[snafu(display("invalid connect container name"))] + InvalidContainerName { + source: builder::pod::container::Error, + }, + + #[snafu(display("failed to add volume"))] + AddVolume { source: builder::pod::Error }, + + #[snafu(display("failed to add volume mount"))] + AddVolumeMount { + source: builder::pod::container::Error, + }, + + #[snafu(display("failed build connect executor jvm args for {name}"))] + ExecutorJvmArgs { source: common::Error, name: String }, + + #[snafu(display("failed build connect executor security properties"))] + ExecutorJvmSecurityProperties { source: common::Error }, + + #[snafu(display("executor metrics properties for spark connect {name}",))] + MetricsProperties { source: common::Error, name: String }, + + #[snafu(display("failed build connect executor config map metadata"))] + ConfigMapMetadataBuild { source: builder::meta::Error }, + + #[snafu(display( + "failed to add the logging configuration to connect executor config map [{cm_name}]" + ))] + InvalidLoggingConfig { + source: product_logging::Error, + cm_name: String, + }, + + #[snafu(display("failed to build connect executor config map [{cm_name}]"))] + InvalidConfigMap { + source: builder::configmap::Error, + cm_name: String, + }, +} + +// The executor pod template can contain only a handful of properties. +// because spark overrides them. +// +// See https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template-properties +// for a list of properties that are overridden/changed by Spark. +// +// Most notable properties that cannot be set here are: +// - container resources +// +#[allow(clippy::result_large_err)] +pub fn executor_pod_template( + scs: &v1alpha1::SparkConnectServer, + config: &v1alpha1::ExecutorConfig, + resolved_product_image: &ResolvedProductImage, + config_map: &ConfigMap, +) -> Result { + let container_env = executor_env( + scs.spec + .executor + .as_ref() + .map(|s| s.env_overrides.clone()) + .as_ref(), + )?; + + let mut container = ContainerBuilder::new(&SparkConnectContainer::Spark.to_string()) + .context(InvalidContainerNameSnafu)?; + container + .add_env_vars(container_env) + .add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG) + .context(AddVolumeMountSnafu)? 
+ .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG) + .context(AddVolumeMountSnafu)?; + + let metadata = ObjectMetaBuilder::new() + .with_recommended_labels(common::labels( + scs, + &resolved_product_image.app_version_label, + &SparkConnectRole::Executor.to_string(), + )) + .context(PodTemplateMetadataBuildSnafu)? + .build(); + + let mut template = PodBuilder::new(); + template + .metadata(metadata) + .affinity(&config.affinity) + .add_volume( + VolumeBuilder::new(VOLUME_MOUNT_NAME_LOG) + .with_empty_dir( + None::, + Some(calculate_log_volume_size_limit(&[MAX_SPARK_LOG_FILES_SIZE])), + ) + .build(), + ) + .context(AddVolumeSnafu)? + .add_volume( + VolumeBuilder::new(VOLUME_MOUNT_NAME_CONFIG) + .with_config_map(config_map.name_unchecked()) + .build(), + ) + .context(AddVolumeSnafu)?; + + if let Some(cm_name) = config.log_config_map() { + container + .add_volume_mount(VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_LOG_CONFIG) + .context(AddVolumeMountSnafu)?; + + template + .add_volume( + VolumeBuilder::new(VOLUME_MOUNT_NAME_LOG_CONFIG) + .with_config_map(cm_name) + .build(), + ) + .context(AddVolumeSnafu)?; + } + + let mut result = template.add_container(container.build()).build_template(); + + // Merge user provided pod spec if any + if let Some(pod_overrides_spec) = scs.spec.executor.as_ref().map(|s| s.pod_overrides.clone()) { + result.merge_from(pod_overrides_spec); + } + + Ok(result) +} + +fn executor_env(env_overrides: Option<&HashMap>) -> Result, Error> { + let mut envs = BTreeMap::from([ + // Needed by the `containerdebug` running in the background of the connect container + // to log its tracing information to. + ( + "CONTAINERDEBUG_LOG_DIRECTORY".to_string(), + format!("{VOLUME_MOUNT_PATH_LOG}/containerdebug"), + ), + ]); + + // Add env overrides + if let Some(user_env) = env_overrides { + envs.extend(user_env.clone()); + } + + Ok(envs + .into_iter() + .map(|(name, value)| EnvVar { + name: name.to_owned(), + value: Some(value.to_owned()), + value_from: None, + }) + .collect()) +} + +pub(crate) fn executor_properties( + scs: &v1alpha1::SparkConnectServer, + config: &v1alpha1::ExecutorConfig, + resolved_product_image: &ResolvedProductImage, +) -> Result>, Error> { + let spark_image = resolved_product_image.image.clone(); + + let mut result: BTreeMap> = [ + ( + "spark.kubernetes.executor.container.image".to_string(), + Some(spark_image), + ), + ( + "spark.executor.defaultJavaOptions".to_string(), + Some(executor_jvm_args(scs, config)?), + ), + ( + "spark.kubernetes.executor.podTemplateFile".to_string(), + Some(format!("{VOLUME_MOUNT_PATH_CONFIG}/{POD_TEMPLATE_FILE}")), + ), + ( + "spark.kubernetes.executor.podTemplateContainerName".to_string(), + Some(SparkConnectContainer::Spark.to_string()), + ), + ] + .into(); + + // ======================================== + // Add executor resource properties + let Resources { + cpu: CpuLimits { min, max }, + memory: MemoryLimits { + limit, + runtime_limits: _, + }, + storage: _, + } = &config.resources; + result.insert( + "spark.kubernetes.executor.limit.cores".to_string(), + max.clone().map(|v| v.0), + ); + result.insert( + "spark.kubernetes.executor.request.cores".to_string(), + min.clone().map(|v| v.0), + ); + result.insert( + "spark.executor.memory".to_string(), + limit.clone().map(|v| v.0), + ); + // This ensures that the pod's memory limit is exactly the value + // in `config.resources.memory.limit`. + // By default, Spark computes an `executor.memoryOverhead` as 6-10% from the + // `executor.memory`. 
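To make the comment above concrete: the property set just below pins `spark.executor.memoryOverhead` to 0. Left unset, Spark would add its default overhead on top of `spark.executor.memory` (documented upstream as a fraction of the executor memory with a floor of 384 MiB for JVM jobs), so the pod would request more than `resources.memory.limit`. A small worked example of that arithmetic; the 10 % and 384 MiB figures are Spark's documented defaults, not values taken from this operator:

```rust
/// Memory the executor *pod* ends up requesting (approximate, in MiB).
/// `overhead_mib: None` models Spark's default behaviour.
fn pod_memory_mib(executor_memory_mib: u64, overhead_mib: Option<u64>) -> u64 {
    // Spark's documented default: executorMemory * 0.10, but at least 384 MiB.
    let default_overhead = (((executor_memory_mib as f64) * 0.10).ceil() as u64).max(384);
    executor_memory_mib + overhead_mib.unwrap_or(default_overhead)
}

fn main() {
    // With the default 1024M limit and no explicit overhead, the pod would ask for
    // 1024 + 384 = 1408 MiB, i.e. more than resources.memory.limit.
    assert_eq!(pod_memory_mib(1024, None), 1408);
    // Pinning spark.executor.memoryOverhead to 0 keeps the pod at the configured limit.
    assert_eq!(pod_memory_mib(1024, Some(0)), 1024);
}
```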
+ result.insert( + "spark.executor.memoryOverhead".to_string(), + Some("0".to_string()), + ); + + // ======================================== + // Add the user provided executor properties + + let config_overrides = scs + .spec + .executor + .as_ref() + .and_then(|s| s.config_overrides.get(SPARK_DEFAULTS_FILE_NAME)); + + if let Some(user_config) = config_overrides { + result.extend( + user_config + .iter() + .map(|(k, v)| (k.clone(), Some(v.clone()))), + ); + } + + Ok(result) +} + +fn executor_jvm_args( + scs: &v1alpha1::SparkConnectServer, + config: &v1alpha1::ExecutorConfig, +) -> Result { + let mut jvm_args = vec![format!( + "-Djava.security.properties={VOLUME_MOUNT_PATH_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}" + )]; + + if config.log_config_map().is_some() { + jvm_args.push(format!( + "-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}" + )); + } + + common::jvm_args( + &jvm_args, + scs.spec + .executor + .as_ref() + .map(|s| &s.product_specific_common_config), + ) + .context(ExecutorJvmArgsSnafu { + name: scs.name_any(), + }) +} + +// Assemble the configuration of the spark-connect executor. +// This config map contains the following entries: +// - security.properties : with jvm dns cache ttls +// - log4j2.properties : with logging configuration (if configured) +// +pub(crate) fn executor_config_map( + scs: &v1alpha1::SparkConnectServer, + config: &v1alpha1::ExecutorConfig, + resolved_product_image: &ResolvedProductImage, +) -> Result { + let cm_name = object_name(&scs.name_any(), SparkConnectRole::Executor); + let jvm_sec_props = common::security_properties( + scs.spec + .executor + .as_ref() + .and_then(|s| s.config_overrides.get(JVM_SECURITY_PROPERTIES_FILE)), + ) + .context(ExecutorJvmSecurityPropertiesSnafu)?; + + let metrics_props = common::metrics_properties( + scs.spec + .executor + .as_ref() + .and_then(|s| s.config_overrides.get(METRICS_PROPERTIES_FILE)), + ) + .context(MetricsPropertiesSnafu { + name: scs.name_unchecked(), + })?; + + let mut cm_builder = ConfigMapBuilder::new(); + + cm_builder + .metadata( + ObjectMetaBuilder::new() + .name_and_namespace(scs) + .name(&cm_name) + .ownerreference_from_resource(scs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(common::labels( + scs, + &resolved_product_image.app_version_label, + &SparkConnectRole::Executor.to_string(), + )) + .context(ConfigMapMetadataBuildSnafu)? 
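Because the user's `configOverrides` for `spark-defaults.conf` are folded in last with `extend`, they replace anything the operator inserted earlier; the integration test later in this diff relies on exactly that by overriding `spark.executor.memoryOverhead`. A tiny illustration of the map semantics:

```rust
use std::collections::BTreeMap;

fn main() {
    // Properties the operator sets first ...
    let mut props: BTreeMap<String, Option<String>> = BTreeMap::from([
        ("spark.executor.memory".to_owned(), Some("1024M".to_owned())),
        ("spark.executor.memoryOverhead".to_owned(), Some("0".to_owned())),
    ]);

    // ... then the user's spark-defaults.conf overrides are applied on top.
    let overrides = BTreeMap::from([(
        "spark.executor.memoryOverhead".to_owned(),
        Some("1m".to_owned()),
    )]);
    props.extend(overrides);

    // BTreeMap::extend replaces existing keys, so the user value wins.
    assert_eq!(props["spark.executor.memoryOverhead"].as_deref(), Some("1m"));
}
```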
+ .build(), + ) + .add_data(JVM_SECURITY_PROPERTIES_FILE, jvm_sec_props) + .add_data(METRICS_PROPERTIES_FILE, metrics_props); + + let role_group_ref = RoleGroupRef { + cluster: ObjectRef::from_obj(scs), + role: SparkConnectRole::Executor.to_string(), + role_group: DUMMY_SPARK_CONNECT_GROUP_NAME.to_string(), + }; + product_logging::extend_config_map( + &role_group_ref, + &config.logging, + SparkConnectContainer::Spark, + SparkConnectContainer::Vector, + &mut cm_builder, + ) + .context(InvalidLoggingConfigSnafu { + cm_name: cm_name.clone(), + })?; + + cm_builder + .build() + .context(InvalidConfigMapSnafu { cm_name }) +} diff --git a/rust/operator-binary/src/connect/mod.rs b/rust/operator-binary/src/connect/mod.rs new file mode 100644 index 00000000..b8144f1a --- /dev/null +++ b/rust/operator-binary/src/connect/mod.rs @@ -0,0 +1,5 @@ +mod common; +pub mod controller; +pub mod crd; +mod executor; +pub mod server; diff --git a/rust/operator-binary/src/connect/server.rs b/rust/operator-binary/src/connect/server.rs new file mode 100644 index 00000000..18944430 --- /dev/null +++ b/rust/operator-binary/src/connect/server.rs @@ -0,0 +1,586 @@ +use std::collections::{BTreeMap, HashMap}; + +use snafu::{OptionExt, ResultExt, Snafu}; +use stackable_operator::{ + builder::{ + self, + configmap::ConfigMapBuilder, + meta::ObjectMetaBuilder, + pod::{ + PodBuilder, container::ContainerBuilder, resources::ResourceRequirementsBuilder, + volume::VolumeBuilder, + }, + }, + commons::product_image_selection::ResolvedProductImage, + k8s_openapi::{ + DeepMerge, + api::{ + apps::v1::{Deployment, DeploymentSpec}, + core::v1::{ + ConfigMap, EnvVar, HTTPGetAction, PodSecurityContext, Probe, Service, + ServiceAccount, ServicePort, ServiceSpec, + }, + }, + apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, + }, + kube::{ResourceExt, runtime::reflector::ObjectRef}, + kvp::{Label, Labels}, + product_logging::framework::{LoggingError, calculate_log_volume_size_limit, vector_container}, + role_utils::RoleGroupRef, +}; + +use crate::{ + connect::{ + common::{self, SparkConnectRole, object_name}, + crd::{ + CONNECT_GRPC_PORT, CONNECT_UI_PORT, DUMMY_SPARK_CONNECT_GROUP_NAME, + SparkConnectContainer, v1alpha1, + }, + }, + crd::constants::{ + APP_NAME, JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, + METRICS_PROPERTIES_FILE, POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME, SPARK_UID, + VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, + VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, + }, + product_logging, +}; + +const GRPC: &str = "grpc"; +const HTTP: &str = "http"; + +#[derive(Snafu, Debug)] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))] + VectorAggregatorConfigMapMissing, + + #[snafu(display("spark connect object has no namespace"))] + ObjectHasNoNamespace, + + #[snafu(display("invalid config map {name}"))] + InvalidConfigMap { + source: builder::configmap::Error, + name: String, + }, + + #[snafu(display("invalid connect container name"))] + InvalidContainerName { + source: builder::pod::container::Error, + }, + + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { source: builder::meta::Error }, + + #[snafu(display("failed to add the logging configuration to the ConfigMap [{cm_name}]"))] + InvalidLoggingConfig { + source: product_logging::Error, + cm_name: 
String, + }, + + #[snafu(display("failed to configure logging"))] + ConfigureLogging { source: LoggingError }, + + #[snafu(display("server jvm security properties for spark connect {name}",))] + ServerJvmSecurityProperties { source: common::Error, name: String }, + + #[snafu(display("server metrics properties for spark connect {name}",))] + MetricsProperties { source: common::Error, name: String }, + + #[snafu(display("failed to build Labels"))] + LabelBuild { + source: stackable_operator::kvp::LabelError, + }, + + #[snafu(display("failed to build Metadata"))] + MetadataBuild { source: builder::meta::Error }, + + #[snafu(display("failed to add needed volume"))] + AddVolume { source: builder::pod::Error }, + + #[snafu(display("failed to add needed volumeMount"))] + AddVolumeMount { + source: builder::pod::container::Error, + }, + + #[snafu(display("failed build connect server jvm args for {name}"))] + ServerJvmArgs { source: common::Error, name: String }, +} + +// Assemble the configuration of the spark-connect server. +// This config map contains the following entries: +// - security.properties : with jvm dns cache ttls +// - spark-defaults.conf : with spark configuration properties +// - log4j2.properties : with logging configuration (if configured) +// - template.yaml : executor pod template +// - spark-env.sh : OMITTED because the environment variables are added directly +// to the container environment. +#[allow(clippy::result_large_err)] +pub(crate) fn server_config_map( + scs: &v1alpha1::SparkConnectServer, + config: &v1alpha1::ServerConfig, + resolved_product_image: &ResolvedProductImage, + spark_properties: &str, + executor_pod_template_spec: &str, +) -> Result { + let cm_name = object_name(&scs.name_any(), SparkConnectRole::Server); + let jvm_sec_props = common::security_properties( + scs.spec + .server + .as_ref() + .and_then(|s| s.config_overrides.get(JVM_SECURITY_PROPERTIES_FILE)), + ) + .context(ServerJvmSecurityPropertiesSnafu { + name: scs.name_unchecked(), + })?; + + let metrics_props = common::metrics_properties( + scs.spec + .server + .as_ref() + .and_then(|s| s.config_overrides.get(METRICS_PROPERTIES_FILE)), + ) + .context(MetricsPropertiesSnafu { + name: scs.name_unchecked(), + })?; + + let mut cm_builder = ConfigMapBuilder::new(); + + cm_builder + .metadata( + ObjectMetaBuilder::new() + .name_and_namespace(scs) + .name(&cm_name) + .ownerreference_from_resource(scs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(common::labels( + scs, + &resolved_product_image.app_version_label, + &SparkConnectRole::Server.to_string(), + )) + .context(MetadataBuildSnafu)? 
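For orientation, the data keys of the server config map assembled here end up looking roughly like the sketch below (values shortened to placeholders; `spark-env.sh` is deliberately absent because the environment variables are set directly on the container):

```rust
use std::collections::BTreeMap;

use k8s_openapi::api::core::v1::ConfigMap;

fn main() {
    let cm = ConfigMap {
        data: Some(BTreeMap::from([
            // rendered from server_properties() plus the user's configOverrides
            ("spark-defaults.conf".to_owned(), "spark.driver.host ...".to_owned()),
            // executor pod template produced by executor_pod_template()
            ("template.yaml".to_owned(), "apiVersion: v1 ...".to_owned()),
            // JVM DNS cache TTLs (placeholder content)
            ("security.properties".to_owned(), "networkaddress.cache.ttl=...".to_owned()),
            ("metrics.properties".to_owned(), "...".to_owned()),
            // only present when a custom log config map is configured
            ("log4j2.properties".to_owned(), "...".to_owned()),
        ])),
        ..ConfigMap::default()
    };
    println!("{:?}", cm.data.map(|d| d.into_keys().collect::<Vec<_>>()));
}
```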
+ .build(), + ) + .add_data(SPARK_DEFAULTS_FILE_NAME, spark_properties) + .add_data(POD_TEMPLATE_FILE, executor_pod_template_spec) + .add_data(JVM_SECURITY_PROPERTIES_FILE, jvm_sec_props) + .add_data(METRICS_PROPERTIES_FILE, metrics_props); + + let role_group_ref = RoleGroupRef { + cluster: ObjectRef::from_obj(scs), + role: SparkConnectRole::Server.to_string(), + role_group: DUMMY_SPARK_CONNECT_GROUP_NAME.to_string(), + }; + product_logging::extend_config_map( + &role_group_ref, + &config.logging, + SparkConnectContainer::Spark, + SparkConnectContainer::Vector, + &mut cm_builder, + ) + .context(InvalidLoggingConfigSnafu { + cm_name: cm_name.clone(), + })?; + + cm_builder + .build() + .context(InvalidConfigMapSnafu { name: cm_name }) +} + +#[allow(clippy::result_large_err)] +pub(crate) fn build_deployment( + scs: &v1alpha1::SparkConnectServer, + config: &v1alpha1::ServerConfig, + resolved_product_image: &ResolvedProductImage, + service_account: &ServiceAccount, + config_map: &ConfigMap, + args: Vec, +) -> Result { + let metadata = ObjectMetaBuilder::new() + .with_recommended_labels(common::labels( + scs, + &resolved_product_image.app_version_label, + &SparkConnectRole::Server.to_string(), + )) + .context(MetadataBuildSnafu)? + .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) + .build(); + + let mut pb = PodBuilder::new(); + + pb.service_account_name(service_account.name_unchecked()) + .metadata(metadata) + .image_pull_secrets_from_product_image(resolved_product_image) + .add_volume( + VolumeBuilder::new(VOLUME_MOUNT_NAME_CONFIG) + .with_config_map(config_map.name_any()) + .build(), + ) + .context(AddVolumeSnafu)? + .add_volume( + VolumeBuilder::new(VOLUME_MOUNT_NAME_LOG) + .with_empty_dir( + None::, + Some(calculate_log_volume_size_limit(&[MAX_SPARK_LOG_FILES_SIZE])), + ) + .build(), + ) + .context(AddVolumeSnafu)? + .security_context(PodSecurityContext { + run_as_user: Some(SPARK_UID), + run_as_group: Some(0), + fs_group: Some(1000), + ..PodSecurityContext::default() + }); + + let container_env = env(scs + .spec + .server + .as_ref() + .map(|s| s.env_overrides.clone()) + .as_ref())?; + + let mut container = ContainerBuilder::new(&SparkConnectContainer::Spark.to_string()) + .context(InvalidContainerNameSnafu)?; + container + .image_from_product_image(resolved_product_image) + .resources(config.resources.clone().into()) + .command(vec![ + "/bin/bash".to_string(), + "-x".to_string(), + "-euo".to_string(), + "pipefail".to_string(), + "-c".to_string(), + ]) + .args(args) + .add_container_port(GRPC, CONNECT_GRPC_PORT) + .add_container_port(HTTP, CONNECT_UI_PORT) + .add_env_vars(container_env) + .add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG) + .context(AddVolumeMountSnafu)? + .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG) + .context(AddVolumeMountSnafu)? 
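With the `bash -x -euo pipefail -c` entry point above, everything returned by `command_args` (defined further down) is passed as one shell string. The sketch below shows roughly what the server container ends up executing; the config path is a stand-in, since only `VOLUME_MOUNT_PATH_LOG` is visible among the constants in this excerpt:

```rust
fn main() {
    let log_dir = "/stackable/log"; // VOLUME_MOUNT_PATH_LOG from crd::constants
    let config_dir = "/stackable/config"; // stand-in for VOLUME_MOUNT_PATH_CONFIG
    // User-supplied `args` from the SparkConnectServer spec, e.g. the Iceberg package
    // used by the integration test later in this diff.
    let user_args =
        ["--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1".to_owned()];

    let mut command = vec![
        format!("containerdebug --output={log_dir}/containerdebug-state.json --loop &"),
        "/stackable/spark/sbin/start-connect-server.sh".to_owned(),
        "--deploy-mode client".to_owned(),
        "--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}"
            .to_owned(),
        format!("--properties-file {config_dir}/spark-defaults.conf"),
    ];
    command.extend_from_slice(&user_args);

    // The joined string is what `bash -c` receives.
    println!("{}", command.join(" "));
}
```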
+ .readiness_probe(probe()) + .liveness_probe(probe()); + + // Add custom log4j config map volumes if configured + if let Some(cm_name) = config.log_config_map() { + pb.add_volume( + VolumeBuilder::new(VOLUME_MOUNT_NAME_LOG_CONFIG) + .with_config_map(cm_name) + .build(), + ) + .context(AddVolumeSnafu)?; + + container + .add_volume_mount(VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_LOG_CONFIG) + .context(AddVolumeMountSnafu)?; + } + + pb.add_container(container.build()); + + if config.logging.enable_vector_agent { + match scs.spec.vector_aggregator_config_map_name.to_owned() { + Some(vector_aggregator_config_map_name) => { + pb.add_container( + vector_container( + resolved_product_image, + VOLUME_MOUNT_NAME_CONFIG, + VOLUME_MOUNT_NAME_LOG, + config + .logging + .containers + .get(&SparkConnectContainer::Vector), + ResourceRequirementsBuilder::new() + .with_cpu_request("250m") + .with_cpu_limit("500m") + .with_memory_request("128Mi") + .with_memory_limit("128Mi") + .build(), + &vector_aggregator_config_map_name, + ) + .context(ConfigureLoggingSnafu)?, + ); + } + None => { + VectorAggregatorConfigMapMissingSnafu.fail()?; + } + } + } + + // Merge user defined pod template if available + let mut pod_template = pb.build_template(); + if let Some(pod_overrides_spec) = scs.spec.server.as_ref().map(|s| s.pod_overrides.clone()) { + pod_template.merge_from(pod_overrides_spec); + } + + Ok(Deployment { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(scs) + .name(object_name(&scs.name_any(), SparkConnectRole::Server)) + .ownerreference_from_resource(scs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(common::labels( + scs, + &resolved_product_image.app_version_label, + &SparkConnectRole::Server.to_string(), + )) + .context(MetadataBuildSnafu)? + .build(), + spec: Some(DeploymentSpec { + template: pod_template, + replicas: Some(1), + selector: LabelSelector { + match_labels: Some( + Labels::role_group_selector( + scs, + APP_NAME, + &SparkConnectRole::Server.to_string(), + DUMMY_SPARK_CONNECT_GROUP_NAME, + ) + .context(LabelBuildSnafu)? + .into(), + ), + ..LabelSelector::default() + }, + ..DeploymentSpec::default() + }), + ..Deployment::default() + }) +} + +#[allow(clippy::result_large_err)] +pub(crate) fn build_service( + scs: &v1alpha1::SparkConnectServer, + app_version_label: &str, + service_cluster_ip: Option, +) -> Result { + let (service_name, service_type, publish_not_ready_addresses) = match service_cluster_ip.clone() + { + Some(_) => ( + // These are the properties of the headless driver service used for the internal + // communication with the executors as recommended by the Spark docs. + // + // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness + // probes. Without it, the driver runs into a deadlock beacuse the Pod cannot become + // "ready" until the Service is "ready" and vice versa. + object_name(&scs.name_any(), SparkConnectRole::Server), + "ClusterIP".to_string(), + Some(true), + ), + None => ( + format!( + "{}-{}", + object_name(&scs.name_any(), SparkConnectRole::Server), + SparkConnectRole::Server + ), + scs.spec.cluster_config.listener_class.k8s_service_type(), + Some(false), + ), + }; + + let selector = Labels::role_selector(scs, APP_NAME, &SparkConnectRole::Server.to_string()) + .context(LabelBuildSnafu)? 
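The match above yields two flavours of Service: if a cluster IP is passed in, the headless-style driver Service with `publishNotReadyAddresses: true` (sidestepping the readiness deadlock described in the comment), otherwise an exposed Service whose type comes from the listener class. A minimal sketch of the headless variant in plain k8s-openapi types; that the controller passes `"None"` as the cluster IP is an assumption not confirmed by this excerpt, and 15002 is only Spark Connect's upstream default for the gRPC port:

```rust
use k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec};

fn main() {
    let headless = Service {
        spec: Some(ServiceSpec {
            type_: Some("ClusterIP".to_owned()),
            // Assumption: the controller requests a headless service this way.
            cluster_ip: Some("None".to_owned()),
            // Lets executors resolve the driver before the readiness probe passes.
            publish_not_ready_addresses: Some(true),
            ports: Some(vec![ServicePort {
                name: Some("grpc".to_owned()),
                port: 15002, // upstream default; the operator uses CONNECT_GRPC_PORT
                ..ServicePort::default()
            }]),
            ..ServiceSpec::default()
        }),
        ..Service::default()
    };
    println!("{headless:?}");
}
```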
+ .into(); + + Ok(Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(scs) + .name(service_name) + .ownerreference_from_resource(scs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(common::labels( + scs, + app_version_label, + &SparkConnectRole::Server.to_string(), + )) + .context(MetadataBuildSnafu)? + .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) + .build(), + spec: Some(ServiceSpec { + type_: Some(service_type), + cluster_ip: service_cluster_ip, + ports: Some(vec![ + ServicePort { + name: Some(String::from(GRPC)), + port: CONNECT_GRPC_PORT, + ..ServicePort::default() + }, + ServicePort { + name: Some(String::from(HTTP)), + port: CONNECT_UI_PORT, + ..ServicePort::default() + }, + ]), + selector: Some(selector), + publish_not_ready_addresses, + ..ServiceSpec::default() + }), + status: None, + }) +} + +#[allow(clippy::result_large_err)] +pub(crate) fn command_args(user_args: &[String]) -> Vec { + let mut command = vec![ + // ---------- start containerdebug + format!( + "containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &" + ), + // ---------- start spark connect server + "/stackable/spark/sbin/start-connect-server.sh".to_string(), + "--deploy-mode client".to_string(), // 'cluster' mode not supported + "--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}" + .to_string(), + format!("--properties-file {VOLUME_MOUNT_PATH_CONFIG}/{SPARK_DEFAULTS_FILE_NAME}"), + ]; + + // User provided command line arguments + command.extend_from_slice(user_args); + + vec![command.join(" ")] +} + +#[allow(clippy::result_large_err)] +fn env(env_overrides: Option<&HashMap>) -> Result, Error> { + let mut envs = BTreeMap::from([ + // Needed by the `containerdebug` running in the background of the connect container + // to log its tracing information to. + ( + "CONTAINERDEBUG_LOG_DIRECTORY".to_string(), + format!("{VOLUME_MOUNT_PATH_LOG}/containerdebug"), + ), + // This env var prevents the connect server from detaching itself from the + // start script because this leads to the Pod terminating immediately. + ("SPARK_NO_DAEMONIZE".to_string(), "true".to_string()), + ]); + + // Add env overrides + if let Some(user_env) = env_overrides { + envs.extend(user_env.clone()); + } + + Ok(envs + .into_iter() + .map(|(name, value)| EnvVar { + name: name.to_owned(), + value: Some(value.to_owned()), + value_from: None, + }) + .collect()) +} + +// Returns the contents of the spark properties file. +// It merges operator properties with user properties. +#[allow(clippy::result_large_err)] +pub(crate) fn server_properties( + scs: &v1alpha1::SparkConnectServer, + config: &v1alpha1::ServerConfig, + driver_service: &Service, + service_account: &ServiceAccount, + resolved_product_image: &ResolvedProductImage, +) -> Result>, Error> { + let spark_image = resolved_product_image.image.clone(); + let spark_version = resolved_product_image.product_version.clone(); + let service_account_name = service_account.name_unchecked(); + let namespace = driver_service + .namespace() + .context(ObjectHasNoNamespaceSnafu)?; + + let config_overrides = scs + .spec + .server + .as_ref() + .and_then(|s| s.config_overrides.get(SPARK_DEFAULTS_FILE_NAME)); + + let mut result: BTreeMap> = [ + // This needs to match the name of the headless service for the executors to be able + // to connect back to the driver. 
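The property maps built here and in `executor_properties` carry `Option<String>` values so that unset entries can be dropped when the `spark-defaults.conf` text handed to `server_config_map` is rendered. That rendering happens in the controller, which is not part of this excerpt; a plausible minimal renderer could look like this:

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: render a property map in spark-defaults.conf format
/// ("key value" per line), skipping entries whose value is None.
fn to_spark_defaults(props: &BTreeMap<String, Option<String>>) -> String {
    props
        .iter()
        .filter_map(|(k, v)| v.as_ref().map(|v| format!("{k} {v}")))
        .collect::<Vec<_>>()
        .join("\n")
}

fn main() {
    let props = BTreeMap::from([
        (
            "spark.driver.host".to_owned(),
            Some("spark-connect-server".to_owned()),
        ),
        ("spark.kubernetes.executor.limit.cores".to_owned(), None),
    ]);
    // Only the set property is emitted.
    assert_eq!(
        to_spark_defaults(&props),
        "spark.driver.host spark-connect-server"
    );
}
```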
+ ( + "spark.driver.host".to_string(), + Some(driver_service.name_any()), + ), + ( + "spark.kubernetes.driver.container.image".to_string(), + Some(spark_image.clone()), + ), + ("spark.kubernetes.namespace".to_string(), Some(namespace)), + ( + "spark.kubernetes.authenticate.driver.serviceAccountName".to_string(), + Some(service_account_name), + ), + ( + "spark.kubernetes.driver.pod.name".to_string(), + Some("${env:HOSTNAME}".to_string()), + ), + ( + "spark.driver.defaultJavaOptions".to_string(), + Some(server_jvm_args(scs, config)?), + ), + ( + "spark.driver.extraClassPath".to_string(), + Some(format!("/stackable/spark/extra-jars/*:/stackable/spark/connect/spark-connect_2.12-{spark_version}.jar")), + ), + ( + "spark.metrics.conf".to_string(), + Some(format!("{VOLUME_MOUNT_PATH_CONFIG}/{METRICS_PROPERTIES_FILE}")), + ), + // This enables the "/metrics/executors/prometheus" endpoint on the server pod. + // The driver collects metrics from the executors and makes them available here. + // The "/metrics/prometheus" endpoint delievers the driver metrics. + ( + "spark.ui.prometheus.enabled".to_string(), + Some("true".to_string()), + ), + + ] + .into(); + + if let Some(user_config) = config_overrides { + result.extend( + user_config + .iter() + .map(|(k, v)| (k.clone(), Some(v.clone()))), + ); + } + Ok(result) +} + +fn server_jvm_args( + scs: &v1alpha1::SparkConnectServer, + config: &v1alpha1::ServerConfig, +) -> Result { + let mut jvm_args = vec![format!( + "-Djava.security.properties={VOLUME_MOUNT_PATH_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}" + )]; + + if config.log_config_map().is_some() { + jvm_args.push(format!( + "-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}" + )); + } + + common::jvm_args( + &jvm_args, + scs.spec + .server + .as_ref() + .map(|s| &s.product_specific_common_config), + ) + .context(ServerJvmArgsSnafu { + name: scs.name_any(), + }) +} + +fn probe() -> Probe { + Probe { + http_get: Some(HTTPGetAction { + port: IntOrString::Int(CONNECT_UI_PORT), + scheme: Some("HTTP".to_string()), + path: Some("/metrics/prometheus".to_string()), + ..Default::default() + }), + failure_threshold: Some(10), + ..Probe::default() + } +} diff --git a/rust/operator-binary/src/crd/constants.rs b/rust/operator-binary/src/crd/constants.rs index 9b7f3d63..f25eb254 100644 --- a/rust/operator-binary/src/crd/constants.rs +++ b/rust/operator-binary/src/crd/constants.rs @@ -33,6 +33,7 @@ pub const VOLUME_MOUNT_PATH_LOG: &str = "/stackable/log"; pub const LOG4J2_CONFIG_FILE: &str = "log4j2.properties"; pub const JVM_SECURITY_PROPERTIES_FILE: &str = "security.properties"; +pub const METRICS_PROPERTIES_FILE: &str = "metrics.properties"; pub const ACCESS_KEY_ID: &str = "accessKey"; pub const SECRET_ACCESS_KEY: &str = "secretKey"; diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index fa6249e3..340ff5fa 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -190,7 +190,7 @@ pub struct SparkApplicationSpec { /// The executor role specifies the configuration that, together with the driver pod template, is used by /// Spark to create the executor pods. - /// This is RoleGroup instead of plain CommonConfiguration because it needs to allows for the number of replicas. + /// This is RoleGroup instead of plain CommonConfiguration because it needs to allow for the number of replicas. /// to be specified. 
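Returning to `server_jvm_args` and `executor_jvm_args` above: the operator always injects the `java.security.properties` flag, adds the log4j2 flag when a custom log config map is set, and lets `common::jvm_args` (not part of this excerpt) fold in the role's `jvmArgumentOverrides`. Assuming `add` entries are appended verbatim, the `spark.driver.defaultJavaOptions` value for the integration test below, which adds `-Dmy.custom.jvm.arg=customValue`, would look roughly like this:

```rust
fn main() {
    // Operator-provided flags; the paths are stand-ins for the mounted config and
    // log-config directories.
    let mut jvm_args = vec![
        "-Djava.security.properties=/stackable/config/security.properties".to_owned(),
        "-Dlog4j.configurationFile=/stackable/log_config/log4j2.properties".to_owned(),
    ];

    // Flags from jvmArgumentOverrides.add, assumed here to be appended verbatim.
    jvm_args.push("-Dmy.custom.jvm.arg=customValue".to_owned());

    // The joined string becomes the value of spark.driver.defaultJavaOptions.
    println!("{}", jvm_args.join(" "));
}
```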
#[serde(default, skip_serializing_if = "Option::is_none")] pub executor: Option>, diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 643fd7cc..bc3910f2 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -1,14 +1,15 @@ use std::sync::Arc; use clap::Parser; -use futures::{StreamExt, pin_mut}; +use connect::crd::{CONNECT_FULL_CONTROLLER_NAME, SparkConnectServer}; +use futures::{StreamExt, pin_mut, select}; use history::history_controller; use product_config::ProductConfigManager; use stackable_operator::{ YamlSchema, cli::{Command, ProductOperatorRun}, k8s_openapi::api::{ - apps::v1::StatefulSet, + apps::v1::{Deployment, StatefulSet}, core::v1::{ConfigMap, Pod, Service}, }, kube::{ @@ -36,6 +37,7 @@ use crate::crd::{ }; mod config; +mod connect; mod crd; mod history; mod pod_driver_controller; @@ -71,6 +73,8 @@ async fn main() -> anyhow::Result<()> { .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?; SparkHistoryServer::merged_crd(SparkHistoryServer::V1Alpha1)? .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?; + SparkConnectServer::merged_crd(SparkConnectServer::V1Alpha1)? + .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?; } Command::Run(ProductOperatorRun { product_config, @@ -243,13 +247,82 @@ async fn main() -> anyhow::Result<()> { }, ); - pin_mut!(app_controller, pod_driver_controller, history_controller); - // kube-runtime's Controller will tokio::spawn each reconciliation, so this only concerns the internal watch machinery - futures::future::select( - futures::future::select(app_controller, pod_driver_controller), - history_controller, + // ============================== + // Create new object because Ctx cannot be cloned + let ctx = Ctx { + client: client.clone(), + product_config: product_config.load(&PRODUCT_CONFIG_PATHS)?, + }; + let connect_event_recorder = + Arc::new(Recorder::new(client.as_kube_client(), Reporter { + controller: CONNECT_FULL_CONTROLLER_NAME.to_string(), + instance: None, + })); + let connect_controller = Controller::new( + watch_namespace + .get_api::>( + &client, + ), + watcher::Config::default(), + ) + .owns( + watch_namespace + .get_api::>( + &client, + ), + watcher::Config::default(), + ) + .owns( + watch_namespace.get_api::>(&client), + watcher::Config::default(), + ) + .owns( + watch_namespace.get_api::>(&client), + watcher::Config::default(), ) - .await; + .owns( + watch_namespace.get_api::>(&client), + watcher::Config::default(), + ) + .shutdown_on_signal() + .run( + connect::controller::reconcile, + connect::controller::error_policy, + Arc::new(ctx), + ) + .instrument(info_span!("connect_controller")) + // We can let the reporting happen in the background + .for_each_concurrent( + 16, // concurrency limit + |result| { + // The event_recorder needs to be shared across all invocations, so that + // events are correctly aggregated + let connect_event_recorder = connect_event_recorder.clone(); + async move { + report_controller_reconciled( + &connect_event_recorder, + CONNECT_FULL_CONTROLLER_NAME, + &result, + ) + .await; + } + }, + ); + + // + pin_mut!( + app_controller, + pod_driver_controller, + history_controller, + connect_controller + ); + // kube-runtime's Controller will tokio::spawn each reconciliation, so this only concerns the internal watch machinery + select! 
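The `select!` invocation here (its arms follow immediately below) completes as soon as any one of the pinned controller futures finishes, which is what lets a single shut-down controller stream end the whole process. A tiny, self-contained illustration of that behaviour with the `futures` crate:

```rust
use futures::{executor::block_on, future::FutureExt, pin_mut, select};

fn main() {
    block_on(async {
        // Two long-lived "controllers": one finishes immediately, one never does.
        let fast = async { "app_controller finished" }.fuse();
        let slow = futures::future::pending::<&str>().fuse();
        pin_mut!(fast, slow);

        // select! resolves with the first future that completes.
        let winner = select! {
            r = fast => r,
            r = slow => r,
        };
        assert_eq!(winner, "app_controller finished");
    });
}
```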
{ + r1 = app_controller => r1, + r2 = pod_driver_controller => r2, + r3 = history_controller => r3, + r4 = connect_controller => r4, + }; } } Ok(()) diff --git a/tests/templates/kuttl/spark-connect/00-assert.yaml b/tests/templates/kuttl/spark-connect/00-assert.yaml new file mode 100644 index 00000000..5baf8caa --- /dev/null +++ b/tests/templates/kuttl/spark-connect/00-assert.yaml @@ -0,0 +1,9 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: integration-tests-sa diff --git a/tests/templates/kuttl/spark-connect/00-patch-ns.yaml.j2 b/tests/templates/kuttl/spark-connect/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/spark-connect/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/spark-connect/00-serviceaccount.yaml.j2 b/tests/templates/kuttl/spark-connect/00-serviceaccount.yaml.j2 new file mode 100644 index 00000000..9cbf0351 --- /dev/null +++ b/tests/templates/kuttl/spark-connect/00-serviceaccount.yaml.j2 @@ -0,0 +1,29 @@ +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +rules: +{% if test_scenario['values']['openshift'] == "true" %} + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + resourceNames: ["privileged"] + verbs: ["use"] +{% endif %} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: integration-tests-sa +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +subjects: + - kind: ServiceAccount + name: integration-tests-sa +roleRef: + kind: Role + name: use-integration-tests-scc + apiGroup: rbac.authorization.k8s.io diff --git a/tests/templates/kuttl/spark-connect/01-assert.yaml.j2 b/tests/templates/kuttl/spark-connect/01-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/spark-connect/01-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/spark-connect/01-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/spark-connect/01-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/spark-connect/01-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/spark-connect/10-assert.yaml b/tests/templates/kuttl/spark-connect/10-assert.yaml new file mode 100644 index 00000000..85072f7b --- /dev/null +++ b/tests/templates/kuttl/spark-connect/10-assert.yaml @@ -0,0 +1,23 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: spark-connect-server 
+status: + readyReplicas: 1 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 90 +commands: + # Test that spark connect executors are running. + # Sleep to prevent the following spark connect app from failing + # while the spark-connect server is busy setting up the executors. + - script: | + sleep 10 + EXECUTOR_COUNT=$(kubectl get pods -n $NAMESPACE --selector 'spark-app-name=spark-connect-server' --field-selector='status.phase=Running' -o NAME|wc -l) + test 1 -eq "$EXECUTOR_COUNT" diff --git a/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 b/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 new file mode 100644 index 00000000..3e6464b0 --- /dev/null +++ b/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 @@ -0,0 +1,77 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-connect-log-config +data: + log4j2.properties: |- + appenders = CONSOLE + + appender.CONSOLE.type = Console + appender.CONSOLE.name = CONSOLE + appender.CONSOLE.target = SYSTEM_ERR + appender.CONSOLE.layout.type = PatternLayout + appender.CONSOLE.layout.pattern = %d{ISO8601} %p [%t] %c - %m%n + appender.CONSOLE.filter.threshold.type = ThresholdFilter + appender.CONSOLE.filter.threshold.level = DEBUG + + rootLogger.level=INFO + rootLogger.appenderRefs = CONSOLE + rootLogger.appenderRef.CONSOLE.ref = CONSOLE +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkConnectServer +metadata: + name: spark-connect +spec: + image: +{% if test_scenario['values']['spark-connect'].find(",") > 0 %} + custom: "{{ test_scenario['values']['spark-connect'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['spark-connect'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['spark-connect'] }}" +{% endif %} + pullPolicy: IfNotPresent +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + args: + - --packages org.apache.iceberg:iceberg-spark-runtime-{{ ".".join(test_scenario['values']['spark-connect'].split('.')[:2]) }}_2.12:1.8.1 + server: + podOverrides: + spec: + containers: + - name: spark + env: + - name: DEMO_GREETING + value: "Hello from the overlords" + jvmArgumentOverrides: + add: + - -Dmy.custom.jvm.arg=customValue + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + containers: + spark: + custom: + configMap: spark-connect-log-config + configOverrides: + spark-defaults.conf: + spark.jars.ivy: /tmp/ivy2 + spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions + spark.sql.catalog.local: org.apache.iceberg.spark.SparkCatalog + spark.sql.catalog.local.type: hadoop + spark.sql.catalog.local.warehouse: /tmp/warehouse + spark.sql.defaultCatalog: local + executor: + configOverrides: + spark-defaults.conf: + spark.executor.instances: "1" + spark.executor.memoryOverhead: "1m" + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + containers: + spark: + custom: + configMap: spark-connect-log-config diff --git a/tests/templates/kuttl/spark-connect/20-assert.yaml b/tests/templates/kuttl/spark-connect/20-assert.yaml new file mode 100644 index 00000000..b73cfea5 --- /dev/null +++ b/tests/templates/kuttl/spark-connect/20-assert.yaml @@ -0,0 +1,13 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: simple-connect-app +timeout: 180 +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: simple-connect-app +status: 
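The kubectl one-liner in test step 10 above counts running executor pods via the `spark-app-name` label. For reference, the equivalent check written against kube-rs (the namespace, `tokio` and `anyhow` are assumptions of this sketch) would be:

```rust
use anyhow::Result;
use k8s_openapi::api::core::v1::Pod;
use kube::{Api, Client, api::ListParams};

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::try_default().await?;
    // Namespace is an assumption; kuttl runs each test in a generated namespace.
    let pods: Api<Pod> = Api::namespaced(client, "default");
    // Same selectors as the kubectl one-liner in the test.
    let lp = ListParams::default()
        .labels("spark-app-name=spark-connect-server")
        .fields("status.phase=Running");
    let executor_count = pods.list(&lp).await?.items.len();
    assert_eq!(executor_count, 1, "expected exactly one running executor");
    Ok(())
}
```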
+ succeeded: 1 diff --git a/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 b/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 new file mode 100644 index 00000000..99f97902 --- /dev/null +++ b/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 @@ -0,0 +1,33 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: simple-connect-app + labels: + stackable.tech/vendor: Stackable +spec: + template: + spec: + restartPolicy: OnFailure + activeDeadlineSeconds: 100 + containers: + - name: simple-connect-app +{% if test_scenario['values']['spark-connect-client'].find(",") > 0 %} + image: "{{ test_scenario['values']['spark-connect-client'].split(',')[1] }}" +{% else %} + image: oci.stackable.tech/sdp/spark-connect-client:{{ test_scenario['values']['spark-connect-client'] }}-stackable0.0.0-dev +{% endif %} + imagePullPolicy: IfNotPresent + command: + [ + "/usr/bin/python", + "/stackable/spark-connect-examples/python/simple-connect-app.py", + "sc://spark-connect-server", + ] + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 200m + memory: 128Mi diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 70db06c4..3ff1ebe6 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -14,6 +14,14 @@ dimensions: values: - 3.5.5 # - 3.5.5,oci.stackable.tech/sandbox/spark-k8s:3.5.5-stackable0.0.0-dev + - name: spark-connect + values: + - 3.5.5 + # - 3.5.5,oci.stackable.tech/sandbox/spark-k8s:3.5.5-stackable0.0.0-dev + - name: spark-connect-client + values: + - 3.5.5 + # - 3.5.5,oci.stackable.tech/sandbox/spark-k8s:3.5.5-stackable0.0.0-dev - name: hbase values: - 2.6.1 @@ -106,6 +114,12 @@ tests: - hdfs-latest - zookeeper-latest - openshift + - name: spark-connect + dimensions: + - spark-connect + - spark-connect-client + - openshift + suites: - name: nightly patch:
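The client Job above reaches the server through the plain service name because it runs in the same namespace; from elsewhere in the cluster, the Spark Connect URL needs the fully qualified service DNS name plus the gRPC port. A small sketch of building such a URL; the `sc://host:port` scheme is Spark Connect's documented format, 15002 is the upstream default port, and the operator's actual `CONNECT_GRPC_PORT` value is not shown in this diff:

```rust
/// Build a Spark Connect URL for a server exposed through a Kubernetes Service.
fn connect_url(service: &str, namespace: &str, grpc_port: u16) -> String {
    format!("sc://{service}.{namespace}.svc.cluster.local:{grpc_port}")
}

fn main() {
    // Same-namespace clients (like the test Job) can use the short form instead:
    // sc://spark-connect-server
    assert_eq!(
        connect_url("spark-connect-server", "default", 15002),
        "sc://spark-connect-server.default.svc.cluster.local:15002"
    );
}
```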