diff --git a/CHANGELOG.md b/CHANGELOG.md index 63f63c43..9a0c5aa4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,11 +4,19 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- Create and manage history servers ([#187]) + +[#187]: https://github.com/stackabletech/spark-k8s-operator/pull/187 + ### Changed - Updated stackable image versions ([#176]) - `operator-rs` `0.22.0` → `0.27.1` ([#178]) +- `operator-rs` `0.27.1` -> `0.30.2` ([#187]) - Don't run init container as root and avoid chmod and chowning ([#183]) +- [BREAKING] Implement fix for S3 reference inconsistency as described in the issue #162 ([#187]) [#176]: https://github.com/stackabletech/spark-k8s-operator/pull/176 [#178]: https://github.com/stackabletech/spark-k8s-operator/pull/178 @@ -43,7 +51,6 @@ All notable changes to this project will be documented in this file. - Update RBAC properties for OpenShift compatibility ([#126]). [#112]: https://github.com/stackabletech/spark-k8s-operator/pull/112 -[#114]: https://github.com/stackabletech/spark-k8s-operator/pull/114 [#126]: https://github.com/stackabletech/spark-k8s-operator/pull/126 ## [0.4.0] - 2022-08-03 diff --git a/Cargo.lock b/Cargo.lock index d0d89307..c25bb246 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1793,8 +1793,8 @@ dependencies = [ [[package]] name = "stackable-operator" -version = "0.27.1" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.27.1#c470ea5de96c0f4081e77fd7c8ce197ecebbd406" +version = "0.30.2" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.30.2#133db3918fb3af191a4203106a5056d77cc9579f" dependencies = [ "chrono", "clap", @@ -1827,8 +1827,8 @@ dependencies = [ [[package]] name = "stackable-operator-derive" -version = "0.27.1" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.27.1#c470ea5de96c0f4081e77fd7c8ce197ecebbd406" +version = "0.30.2" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.30.2#133db3918fb3af191a4203106a5056d77cc9579f" dependencies = [ "darling", "proc-macro2", diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index 51d6aba3..d8f3e9f7 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -616,6 +616,138 @@ spec: type: object type: object type: object + logFileDirectory: + nullable: true + oneOf: + - required: + - s3 + properties: + s3: + properties: + bucket: + description: Operators are expected to define fields for this type in order to work with S3 buckets. + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: S3 bucket specification containing only the bucket name and an inlined or referenced connection specification. + properties: + bucketName: + nullable: true + type: string + connection: + description: Operators are expected to define fields for this type in order to work with S3 connections. + nullable: true + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: S3 connection definition as CRD. + properties: + accessStyle: + description: Which access style to use. Defaults to virtual hosted-style as most of the data products out there. 
Have a look at the official documentation on + enum: + - Path + - VirtualHosted + nullable: true + type: string + credentials: + description: If the S3 uses authentication you have to specify you S3 credentials. In the most cases a SecretClass providing `accessKey` and `secretKey` is sufficient. + nullable: true + properties: + scope: + description: '[Scope](https://docs.stackable.tech/secret-operator/scope.html) of the [SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html)' + nullable: true + properties: + node: + default: false + type: boolean + pod: + default: false + type: boolean + services: + default: [] + items: + type: string + type: array + type: object + secretClass: + description: '[SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html) containing the LDAP bind credentials' + type: string + required: + - secretClass + type: object + host: + description: Hostname of the S3 server without any protocol or port + nullable: true + type: string + port: + description: Port the S3 server listens on. If not specified the products will determine the port to use. + format: uint16 + minimum: 0.0 + nullable: true + type: integer + tls: + description: If you want to use TLS when talking to S3 you can enable TLS encrypted communication with this setting. + nullable: true + properties: + verification: + description: The verification method used to verify the certificates of the server and/or the client + oneOf: + - required: + - none + - required: + - server + properties: + none: + description: Use TLS but don't verify certificates + type: object + server: + description: Use TLS and ca certificate to verify the server + properties: + caCert: + description: Ca cert to verify the server + oneOf: + - required: + - webPki + - required: + - secretClass + properties: + secretClass: + description: Name of the SecretClass which will provide the ca cert. Note that a SecretClass does not need to have a key but can also work with just a ca cert. So if you got provided with a ca cert but don't have access to the key you can still use this method. + type: string + webPki: + description: Use TLS and the ca certificates trusted by the common web browsers to verify the server. This can be useful when you e.g. use public AWS S3 or other public available services. + type: object + type: object + required: + - caCert + type: object + type: object + required: + - verification + type: object + type: object + reference: + type: string + type: object + type: object + reference: + type: string + type: object + prefix: + type: string + required: + - bucket + - prefix + type: object + type: object mainApplicationFile: nullable: true type: string @@ -625,8 +757,8 @@ spec: mode: nullable: true type: string - s3bucket: - description: Operators are expected to define fields for this type in order to work with S3 buckets. + s3connection: + description: Operators are expected to define fields for this type in order to work with S3 connections. nullable: true oneOf: - required: @@ -635,109 +767,90 @@ spec: - reference properties: inline: - description: S3 bucket specification containing only the bucket name and an inlined or referenced connection specification. + description: S3 connection definition as CRD. properties: - bucketName: + accessStyle: + description: Which access style to use. Defaults to virtual hosted-style as most of the data products out there. 
Have a look at the official documentation on + enum: + - Path + - VirtualHosted nullable: true type: string - connection: - description: Operators are expected to define fields for this type in order to work with S3 connections. + credentials: + description: If the S3 uses authentication you have to specify you S3 credentials. In the most cases a SecretClass providing `accessKey` and `secretKey` is sufficient. nullable: true - oneOf: - - required: - - inline - - required: - - reference properties: - inline: - description: S3 connection definition as CRD. + scope: + description: '[Scope](https://docs.stackable.tech/secret-operator/scope.html) of the [SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html)' + nullable: true properties: - accessStyle: - description: Which access style to use. Defaults to virtual hosted-style as most of the data products out there. Have a look at the official documentation on - enum: - - Path - - VirtualHosted - nullable: true - type: string - credentials: - description: If the S3 uses authentication you have to specify you S3 credentials. In the most cases a SecretClass providing `accessKey` and `secretKey` is sufficient. - nullable: true - properties: - scope: - description: '[Scope](https://docs.stackable.tech/secret-operator/scope.html) of the [SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html)' - nullable: true - properties: - node: - default: false - type: boolean - pod: - default: false - type: boolean - services: - default: [] - items: - type: string - type: array - type: object - secretClass: - description: '[SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html) containing the LDAP bind credentials' - type: string - required: - - secretClass + node: + default: false + type: boolean + pod: + default: false + type: boolean + services: + default: [] + items: + type: string + type: array + type: object + secretClass: + description: '[SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html) containing the LDAP bind credentials' + type: string + required: + - secretClass + type: object + host: + description: Hostname of the S3 server without any protocol or port + nullable: true + type: string + port: + description: Port the S3 server listens on. If not specified the products will determine the port to use. + format: uint16 + minimum: 0.0 + nullable: true + type: integer + tls: + description: If you want to use TLS when talking to S3 you can enable TLS encrypted communication with this setting. + nullable: true + properties: + verification: + description: The verification method used to verify the certificates of the server and/or the client + oneOf: + - required: + - none + - required: + - server + properties: + none: + description: Use TLS but don't verify certificates type: object - host: - description: Hostname of the S3 server without any protocol or port - nullable: true - type: string - port: - description: Port the S3 server listens on. If not specified the products will determine the port to use. - format: uint16 - minimum: 0.0 - nullable: true - type: integer - tls: - description: If you want to use TLS when talking to S3 you can enable TLS encrypted communication with this setting. 
- nullable: true + server: + description: Use TLS and ca certificate to verify the server properties: - verification: - description: The verification method used to verify the certificates of the server and/or the client + caCert: + description: Ca cert to verify the server oneOf: - required: - - none + - webPki - required: - - server + - secretClass properties: - none: - description: Use TLS but don't verify certificates - type: object - server: - description: Use TLS and ca certificate to verify the server - properties: - caCert: - description: Ca cert to verify the server - oneOf: - - required: - - webPki - - required: - - secretClass - properties: - secretClass: - description: Name of the SecretClass which will provide the ca cert. Note that a SecretClass does not need to have a key but can also work with just a ca cert. So if you got provided with a ca cert but don't have access to the key you can still use this method. - type: string - webPki: - description: Use TLS and the ca certificates trusted by the common web browsers to verify the server. This can be useful when you e.g. use public AWS S3 or other public available services. - type: object - type: object - required: - - caCert + secretClass: + description: Name of the SecretClass which will provide the ca cert. Note that a SecretClass does not need to have a key but can also work with just a ca cert. So if you got provided with a ca cert but don't have access to the key you can still use this method. + type: string + webPki: + description: Use TLS and the ca certificates trusted by the common web browsers to verify the server. This can be useful when you e.g. use public AWS S3 or other public available services. type: object type: object required: - - verification + - caCert type: object type: object - reference: - type: string + required: + - verification type: object type: object reference: @@ -1913,3 +2026,571 @@ spec: storage: true subresources: status: {} +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: sparkhistoryservers.spark.stackable.tech + annotations: + helm.sh/resource-policy: keep +spec: + group: spark.stackable.tech + names: + categories: [] + kind: SparkHistoryServer + plural: sparkhistoryservers + shortNames: + - shs + singular: sparkhistoryserver + scope: Namespaced + versions: + - additionalPrinterColumns: [] + name: v1alpha1 + schema: + openAPIV3Schema: + description: Auto-generated derived type for SparkHistoryServerSpec via `CustomResource` + properties: + spec: + properties: + image: + anyOf: + - required: + - custom + - productVersion + - required: + - productVersion + - stackableVersion + properties: + custom: + description: Overwrite the docker image. Specify the full docker image name, e.g. `docker.stackable.tech/stackable/superset:1.4.1-stackable2.1.0` + type: string + productVersion: + description: Version of the product, e.g. `1.4.1`. + type: string + pullPolicy: + default: IfNotPresent + description: '[Pull policy](https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy) used when pulling the Images' + enum: + - IfNotPresent + - Always + - Never + type: string + pullSecrets: + description: '[Image pull secrets](https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod) to pull images from a private registry' + items: + description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace. + properties: + name: + description: 'Name of the referent. 
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + type: object + nullable: true + type: array + repo: + description: Name of the docker repo, e.g. `docker.stackable.tech/stackable` + nullable: true + type: string + stackableVersion: + description: Stackable version of the product, e.g. 2.1.0 + type: string + type: object + logFileDirectory: + oneOf: + - required: + - s3 + properties: + s3: + properties: + bucket: + description: Operators are expected to define fields for this type in order to work with S3 buckets. + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: S3 bucket specification containing only the bucket name and an inlined or referenced connection specification. + properties: + bucketName: + nullable: true + type: string + connection: + description: Operators are expected to define fields for this type in order to work with S3 connections. + nullable: true + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: S3 connection definition as CRD. + properties: + accessStyle: + description: Which access style to use. Defaults to virtual hosted-style as most of the data products out there. Have a look at the official documentation on + enum: + - Path + - VirtualHosted + nullable: true + type: string + credentials: + description: If the S3 uses authentication you have to specify you S3 credentials. In the most cases a SecretClass providing `accessKey` and `secretKey` is sufficient. + nullable: true + properties: + scope: + description: '[Scope](https://docs.stackable.tech/secret-operator/scope.html) of the [SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html)' + nullable: true + properties: + node: + default: false + type: boolean + pod: + default: false + type: boolean + services: + default: [] + items: + type: string + type: array + type: object + secretClass: + description: '[SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html) containing the LDAP bind credentials' + type: string + required: + - secretClass + type: object + host: + description: Hostname of the S3 server without any protocol or port + nullable: true + type: string + port: + description: Port the S3 server listens on. If not specified the products will determine the port to use. + format: uint16 + minimum: 0.0 + nullable: true + type: integer + tls: + description: If you want to use TLS when talking to S3 you can enable TLS encrypted communication with this setting. + nullable: true + properties: + verification: + description: The verification method used to verify the certificates of the server and/or the client + oneOf: + - required: + - none + - required: + - server + properties: + none: + description: Use TLS but don't verify certificates + type: object + server: + description: Use TLS and ca certificate to verify the server + properties: + caCert: + description: Ca cert to verify the server + oneOf: + - required: + - webPki + - required: + - secretClass + properties: + secretClass: + description: Name of the SecretClass which will provide the ca cert. Note that a SecretClass does not need to have a key but can also work with just a ca cert. So if you got provided with a ca cert but don't have access to the key you can still use this method. + type: string + webPki: + description: Use TLS and the ca certificates trusted by the common web browsers to verify the server. This can be useful when you e.g. 
use public AWS S3 or other public available services. + type: object + type: object + required: + - caCert + type: object + type: object + required: + - verification + type: object + type: object + reference: + type: string + type: object + type: object + reference: + type: string + type: object + prefix: + type: string + required: + - bucket + - prefix + type: object + type: object + nodes: + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + cleaner: + nullable: true + type: boolean + resources: + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: {} + properties: + cpu: + default: + min: null + max: null + properties: + max: + description: |- + Quantity is a fixed-point representation of a number. It provides convenient marshaling/unmarshaling in JSON and YAML, in addition to String() and AsInt64() accessors. + + The serialization format is: + + ::= + (Note that may be empty, from the "" case in .) + ::= 0 | 1 | ... | 9 ::= | ::= | . | . | . ::= "+" | "-" ::= | ::= | | ::= Ki | Mi | Gi | Ti | Pi | Ei + (International System of units; See: http://physics.nist.gov/cuu/Units/binary.html) + ::= m | "" | k | M | G | T | P | E + (Note that 1024 = 1Ki but 1000 = 1k; I didn't choose the capitalization.) + ::= "e" | "E" + + No matter which of the three exponent forms is used, no quantity may represent a number greater than 2^63-1 in magnitude, nor may it have more than 3 decimal places. Numbers larger or more precise will be capped or rounded up. (E.g.: 0.1m will rounded up to 1m.) This may be extended in the future if we require larger or smaller quantities. + + When a Quantity is parsed from a string, it will remember the type of suffix it had, and will use the same type again when it is serialized. + + Before serializing, Quantity will be put in "canonical form". This means that Exponent/suffix will be adjusted up or down (with a corresponding increase or decrease in Mantissa) such that: + a. No precision is lost + b. No fractional digits will be emitted + c. The exponent (or suffix) is as large as possible. + The sign will be omitted unless the number is negative. + + Examples: + 1.5 will be serialized as "1500m" + 1.5Gi will be serialized as "1536Mi" + + Note that the quantity will NEVER be internally represented by a floating point number. That is the whole point of this exercise. + + Non-canonical values will still parse as long as they are well formed, but will be re-emitted in their canonical form. (So always use canonical form, or don't diff.) + + This format is intended to make it difficult to use these numbers without writing some sort of special handling code in the hopes that that will cause implementors to also use a fixed point implementation. + nullable: true + type: string + min: + description: |- + Quantity is a fixed-point representation of a number. It provides convenient marshaling/unmarshaling in JSON and YAML, in addition to String() and AsInt64() accessors. + + The serialization format is: + + ::= + (Note that may be empty, from the "" case in .) + ::= 0 | 1 | ... | 9 ::= | ::= | . | . | . ::= "+" | "-" ::= | ::= | | ::= Ki | Mi | Gi | Ti | Pi | Ei + (International System of units; See: http://physics.nist.gov/cuu/Units/binary.html) + ::= m | "" | k | M | G | T | P | E + (Note that 1024 = 1Ki but 1000 = 1k; I didn't choose the capitalization.) 
+ ::= "e" | "E" + + No matter which of the three exponent forms is used, no quantity may represent a number greater than 2^63-1 in magnitude, nor may it have more than 3 decimal places. Numbers larger or more precise will be capped or rounded up. (E.g.: 0.1m will rounded up to 1m.) This may be extended in the future if we require larger or smaller quantities. + + When a Quantity is parsed from a string, it will remember the type of suffix it had, and will use the same type again when it is serialized. + + Before serializing, Quantity will be put in "canonical form". This means that Exponent/suffix will be adjusted up or down (with a corresponding increase or decrease in Mantissa) such that: + a. No precision is lost + b. No fractional digits will be emitted + c. The exponent (or suffix) is as large as possible. + The sign will be omitted unless the number is negative. + + Examples: + 1.5 will be serialized as "1500m" + 1.5Gi will be serialized as "1536Mi" + + Note that the quantity will NEVER be internally represented by a floating point number. That is the whole point of this exercise. + + Non-canonical values will still parse as long as they are well formed, but will be re-emitted in their canonical form. (So always use canonical form, or don't diff.) + + This format is intended to make it difficult to use these numbers without writing some sort of special handling code in the hopes that that will cause implementors to also use a fixed point implementation. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: |- + Quantity is a fixed-point representation of a number. It provides convenient marshaling/unmarshaling in JSON and YAML, in addition to String() and AsInt64() accessors. + + The serialization format is: + + ::= + (Note that may be empty, from the "" case in .) + ::= 0 | 1 | ... | 9 ::= | ::= | . | . | . ::= "+" | "-" ::= | ::= | | ::= Ki | Mi | Gi | Ti | Pi | Ei + (International System of units; See: http://physics.nist.gov/cuu/Units/binary.html) + ::= m | "" | k | M | G | T | P | E + (Note that 1024 = 1Ki but 1000 = 1k; I didn't choose the capitalization.) + ::= "e" | "E" + + No matter which of the three exponent forms is used, no quantity may represent a number greater than 2^63-1 in magnitude, nor may it have more than 3 decimal places. Numbers larger or more precise will be capped or rounded up. (E.g.: 0.1m will rounded up to 1m.) This may be extended in the future if we require larger or smaller quantities. + + When a Quantity is parsed from a string, it will remember the type of suffix it had, and will use the same type again when it is serialized. + + Before serializing, Quantity will be put in "canonical form". This means that Exponent/suffix will be adjusted up or down (with a corresponding increase or decrease in Mantissa) such that: + a. No precision is lost + b. No fractional digits will be emitted + c. The exponent (or suffix) is as large as possible. + The sign will be omitted unless the number is negative. + + Examples: + 1.5 will be serialized as "1500m" + 1.5Gi will be serialized as "1536Mi" + + Note that the quantity will NEVER be internally represented by a floating point number. That is the whole point of this exercise. + + Non-canonical values will still parse as long as they are well formed, but will be re-emitted in their canonical form. (So always use canonical form, or don't diff.) 
+ + This format is intended to make it difficult to use these numbers without writing some sort of special handling code in the hopes that that will cause implementors to also use a fixed point implementation. + nullable: true + type: string + runtimeLimits: + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + type: object + envOverrides: + additionalProperties: + type: string + default: {} + type: object + roleGroups: + additionalProperties: + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + cleaner: + nullable: true + type: boolean + resources: + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: {} + properties: + cpu: + default: + min: null + max: null + properties: + max: + description: |- + Quantity is a fixed-point representation of a number. It provides convenient marshaling/unmarshaling in JSON and YAML, in addition to String() and AsInt64() accessors. + + The serialization format is: + + ::= + (Note that may be empty, from the "" case in .) + ::= 0 | 1 | ... | 9 ::= | ::= | . | . | . ::= "+" | "-" ::= | ::= | | ::= Ki | Mi | Gi | Ti | Pi | Ei + (International System of units; See: http://physics.nist.gov/cuu/Units/binary.html) + ::= m | "" | k | M | G | T | P | E + (Note that 1024 = 1Ki but 1000 = 1k; I didn't choose the capitalization.) + ::= "e" | "E" + + No matter which of the three exponent forms is used, no quantity may represent a number greater than 2^63-1 in magnitude, nor may it have more than 3 decimal places. Numbers larger or more precise will be capped or rounded up. (E.g.: 0.1m will rounded up to 1m.) This may be extended in the future if we require larger or smaller quantities. + + When a Quantity is parsed from a string, it will remember the type of suffix it had, and will use the same type again when it is serialized. + + Before serializing, Quantity will be put in "canonical form". This means that Exponent/suffix will be adjusted up or down (with a corresponding increase or decrease in Mantissa) such that: + a. No precision is lost + b. No fractional digits will be emitted + c. The exponent (or suffix) is as large as possible. + The sign will be omitted unless the number is negative. + + Examples: + 1.5 will be serialized as "1500m" + 1.5Gi will be serialized as "1536Mi" + + Note that the quantity will NEVER be internally represented by a floating point number. That is the whole point of this exercise. + + Non-canonical values will still parse as long as they are well formed, but will be re-emitted in their canonical form. (So always use canonical form, or don't diff.) + + This format is intended to make it difficult to use these numbers without writing some sort of special handling code in the hopes that that will cause implementors to also use a fixed point implementation. + nullable: true + type: string + min: + description: |- + Quantity is a fixed-point representation of a number. It provides convenient marshaling/unmarshaling in JSON and YAML, in addition to String() and AsInt64() accessors. + + The serialization format is: + + ::= + (Note that may be empty, from the "" case in .) + ::= 0 | 1 | ... | 9 ::= | ::= | . | . | . 
::= "+" | "-" ::= | ::= | | ::= Ki | Mi | Gi | Ti | Pi | Ei + (International System of units; See: http://physics.nist.gov/cuu/Units/binary.html) + ::= m | "" | k | M | G | T | P | E + (Note that 1024 = 1Ki but 1000 = 1k; I didn't choose the capitalization.) + ::= "e" | "E" + + No matter which of the three exponent forms is used, no quantity may represent a number greater than 2^63-1 in magnitude, nor may it have more than 3 decimal places. Numbers larger or more precise will be capped or rounded up. (E.g.: 0.1m will rounded up to 1m.) This may be extended in the future if we require larger or smaller quantities. + + When a Quantity is parsed from a string, it will remember the type of suffix it had, and will use the same type again when it is serialized. + + Before serializing, Quantity will be put in "canonical form". This means that Exponent/suffix will be adjusted up or down (with a corresponding increase or decrease in Mantissa) such that: + a. No precision is lost + b. No fractional digits will be emitted + c. The exponent (or suffix) is as large as possible. + The sign will be omitted unless the number is negative. + + Examples: + 1.5 will be serialized as "1500m" + 1.5Gi will be serialized as "1536Mi" + + Note that the quantity will NEVER be internally represented by a floating point number. That is the whole point of this exercise. + + Non-canonical values will still parse as long as they are well formed, but will be re-emitted in their canonical form. (So always use canonical form, or don't diff.) + + This format is intended to make it difficult to use these numbers without writing some sort of special handling code in the hopes that that will cause implementors to also use a fixed point implementation. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: |- + Quantity is a fixed-point representation of a number. It provides convenient marshaling/unmarshaling in JSON and YAML, in addition to String() and AsInt64() accessors. + + The serialization format is: + + ::= + (Note that may be empty, from the "" case in .) + ::= 0 | 1 | ... | 9 ::= | ::= | . | . | . ::= "+" | "-" ::= | ::= | | ::= Ki | Mi | Gi | Ti | Pi | Ei + (International System of units; See: http://physics.nist.gov/cuu/Units/binary.html) + ::= m | "" | k | M | G | T | P | E + (Note that 1024 = 1Ki but 1000 = 1k; I didn't choose the capitalization.) + ::= "e" | "E" + + No matter which of the three exponent forms is used, no quantity may represent a number greater than 2^63-1 in magnitude, nor may it have more than 3 decimal places. Numbers larger or more precise will be capped or rounded up. (E.g.: 0.1m will rounded up to 1m.) This may be extended in the future if we require larger or smaller quantities. + + When a Quantity is parsed from a string, it will remember the type of suffix it had, and will use the same type again when it is serialized. + + Before serializing, Quantity will be put in "canonical form". This means that Exponent/suffix will be adjusted up or down (with a corresponding increase or decrease in Mantissa) such that: + a. No precision is lost + b. No fractional digits will be emitted + c. The exponent (or suffix) is as large as possible. + The sign will be omitted unless the number is negative. + + Examples: + 1.5 will be serialized as "1500m" + 1.5Gi will be serialized as "1536Mi" + + Note that the quantity will NEVER be internally represented by a floating point number. That is the whole point of this exercise. 
+ + Non-canonical values will still parse as long as they are well formed, but will be re-emitted in their canonical form. (So always use canonical form, or don't diff.) + + This format is intended to make it difficult to use these numbers without writing some sort of special handling code in the hopes that that will cause implementors to also use a fixed point implementation. + nullable: true + type: string + runtimeLimits: + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + type: object + envOverrides: + additionalProperties: + type: string + default: {} + type: object + replicas: + format: uint16 + minimum: 0.0 + nullable: true + type: integer + selector: + description: A label selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty label selector matches all objects. A null label selector matches no objects. + nullable: true + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values. + properties: + key: + description: key is the label key that the selector applies to. + type: string + operator: + description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. 
+ type: object + type: object + type: object + type: object + required: + - roleGroups + type: object + sparkConf: + additionalProperties: + type: string + nullable: true + type: object + required: + - image + - logFileDirectory + - nodes + type: object + required: + - spec + title: SparkHistoryServer + type: object + served: true + storage: true + subresources: {} diff --git a/deploy/helm/spark-k8s-operator/templates/roles.yaml b/deploy/helm/spark-k8s-operator/templates/roles.yaml index 3c74456f..934b920e 100644 --- a/deploy/helm/spark-k8s-operator/templates/roles.yaml +++ b/deploy/helm/spark-k8s-operator/templates/roles.yaml @@ -84,6 +84,7 @@ rules: - spark.stackable.tech resources: - sparkapplications + - sparkhistoryservers verbs: - get - list diff --git a/deploy/helm/spark-k8s-operator/templates/spark-clusterrole.yaml b/deploy/helm/spark-k8s-operator/templates/spark-clusterrole.yaml index 4828006c..1d985aa0 100644 --- a/deploy/helm/spark-k8s-operator/templates/spark-clusterrole.yaml +++ b/deploy/helm/spark-k8s-operator/templates/spark-clusterrole.yaml @@ -52,6 +52,7 @@ rules: - "" resources: - configmaps + - persistentvolumeclaims - pods - secrets - serviceaccounts diff --git a/docs/modules/ROOT/examples/example-history-app.yaml b/docs/modules/ROOT/examples/example-history-app.yaml new file mode 100644 index 00000000..aa36dd4a --- /dev/null +++ b/docs/modules/ROOT/examples/example-history-app.yaml @@ -0,0 +1,34 @@ +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-pi-s3-1 +spec: + version: "1.0" + sparkImage: docker.stackable.tech/stackable/spark-k8s:3.3.0-stackable0.3.0 + sparkImagePullPolicy: IfNotPresent + mode: cluster + mainClass: org.apache.spark.examples.SparkPi + mainApplicationFile: s3a://my-bucket/spark-examples_2.12-3.3.0.jar + s3connection: # <1> + inline: + host: test-minio + port: 9000 + accessStyle: Path + credentials: + secretClass: s3-credentials-class # <2> + logFileDirectory: # <3> + s3: + prefix: eventlogs/ # <4> + bucket: + inline: + bucketName: spark-logs # <5> + connection: + inline: + host: test-minio + port: 9000 + accessStyle: Path + credentials: + secretClass: history-credentials-class # <6> + executor: + instances: 1 diff --git a/docs/modules/ROOT/examples/example-history-server.yaml b/docs/modules/ROOT/examples/example-history-server.yaml new file mode 100644 index 00000000..40ab94c7 --- /dev/null +++ b/docs/modules/ROOT/examples/example-history-server.yaml @@ -0,0 +1,29 @@ +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkHistoryServer +metadata: + name: spark-history +spec: + image: + productVersion: 3.3.0 + stackableVersion: 0.3.0 + logFileDirectory: # <1> + s3: + prefix: eventlogs/ # <2> + bucket: # <3> + inline: + bucketName: spark-logs + connection: + inline: + host: test-minio + port: 9000 + accessStyle: Path + credentials: + secretClass: history-credentials-class + sparkConf: # <4> + nodes: + roleGroups: + cleaner: + replicas: 1 # <5> + config: + cleaner: true # <6> diff --git a/docs/modules/ROOT/examples/example-sparkapp-s3-private.yaml b/docs/modules/ROOT/examples/example-sparkapp-s3-private.yaml index 2110c1d4..d1152bb3 100644 --- a/docs/modules/ROOT/examples/example-sparkapp-s3-private.yaml +++ b/docs/modules/ROOT/examples/example-sparkapp-s3-private.yaml @@ -9,16 +9,13 @@ spec: mode: cluster mainApplicationFile: s3a://my-bucket/spark-examples_2.12-3.3.0.jar # <1> mainClass: org.apache.spark.examples.SparkPi # <2> - s3bucket: # <3> + s3connection: # <3> inline: - bucketName: my-bucket - 
connection: - inline: - host: test-minio - port: 9000 - accessStyle: Path - credentials: # <4> - secretClass: s3-credentials-class + host: test-minio + port: 9000 + accessStyle: Path + credentials: # <4> + secretClass: s3-credentials-class sparkConf: # <5> spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" # <6> spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar" diff --git a/docs/modules/ROOT/images/history-server-ui.png b/docs/modules/ROOT/images/history-server-ui.png new file mode 100644 index 00000000..7f4b923d Binary files /dev/null and b/docs/modules/ROOT/images/history-server-ui.png differ diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index cca2c4a9..6cdac327 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -2,3 +2,4 @@ * xref:usage.adoc[] * xref:job_dependencies.adoc[] * xref:rbac.adoc[] +* xref:history_server.adoc[] diff --git a/docs/modules/ROOT/pages/history_server.adoc b/docs/modules/ROOT/pages/history_server.adoc new file mode 100644 index 00000000..32cf4788 --- /dev/null +++ b/docs/modules/ROOT/pages/history_server.adoc @@ -0,0 +1,66 @@ += Spark History Server + +== Overview + +The Stackable Spark-on-Kubernetes operator runs Apache Spark workloads in a Kubernetes cluster, whereby driver- and executor-pods are created for the duration of the job and then terminated. One or more Spark History Server instances can be deployed independently of `SparkApplication` jobs and used as an end-point for spark logging, so that job information can be viewed once the job pods are no longer available. + +== Deployment + +The example below demonstrates how to set up the history server running in one Pod with scheduled cleanups of the event logs. The event logs are loaded from an S3 bucket named `spark-logs` and the folder `eventlogs/`. The credentials for this bucket are provided by the secret class `s3-credentials-class`. For more details on how the Stackable Data Platform manages S3 resources see the xref:home:concepts:s3.adoc[S3 resources] page. + + +[source,yaml] +---- +include::example$example-history-server.yaml[] +---- + +<1> The location of the event logs. Must be a S3 bucket. Future implementations might add support for other shared filesystems such as HDFS. +<2> Folder within the S3 bucket where the log files are located. This folder is required and must exist before setting up the history server. +<3> The S3 bucket definition, here provided in-line. +<4> Additional history server configuration properties can be provided here as a map. For possible properties see: https://spark.apache.org/docs/latest/monitoring.html#spark-history-server-configuration-options +<5> This deployment has only one Pod. Multiple history servers can be started, all reading the same event logs by increasing the replica count. +<6> This history server will automatically clean up old log files by using default properties. You can change any of these by using the `sparkConf` map. + +NOTE: Only one role group can have scheduled cleanups enabled (`cleaner: true`) and this role group cannot have more than 1 replica. + +The secret with S3 credentials must contain at least the following two keys: + +* `accessKey` - the access key of a user with read and write access to the event log bucket. +* `secretKey` - the secret key of a user with read and write access to the event log bucket. + +Any other entries of the Secret are ignored by the operator. 
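+
+For reference, a minimal sketch of such a Secret is shown below. The Secret name and the label are assumptions made for illustration only: the label follows the `k8sSearch` backend convention of the secret-operator and must match however the `history-credentials-class` SecretClass referenced in the example above actually selects its Secrets.
+
+[source,yaml]
+----
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: history-credentials                                  # hypothetical name, not mandated by the operator
+  labels:
+    secrets.stackable.tech/class: history-credentials-class  # assumes a k8sSearch-backed SecretClass
+stringData:
+  accessKey: spark-history-user                              # placeholder: user with read/write access to the event log bucket
+  secretKey: spark-history-password                          # placeholder
+----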
+ +== Application configuration + + +The example below demonstrates how to configure Spark applications to write log events to an S3 bucket. + +[source,yaml] +---- +include::example$example-history-app.yaml[] +---- + +<1> Location of the data that is being processed by the application. +<2> Credentials used to access the data above. +<3> Instruct the operator to configure the application with logging enabled. +<4> Folder to store logs. This must match the prefix used by the history server. +<5> Bucket to store logs. This must match the bucket used by the history server. +<6> Credentials used to write event logs. These can, of course, differ from the credentials used to process data. + + + +== History Web UI + +To access the history server web UI, use one of the `NodePort` services created by the operator. For the example above, the operator created two services as shown: + +[source,bash] +---- +$ kubectl get svc +NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE +spark-history-node NodePort 10.96.222.233 18080:30136/TCP 52m +spark-history-node-cleaner NodePort 10.96.203.43 18080:32585/TCP 52m +---- + +By setting up port forwarding on 18080 the UI can be opened by pointing your browser to `http://localhost:18080`: + +image::history-server-ui.png[History Server Console] diff --git a/docs/modules/ROOT/pages/usage.adoc b/docs/modules/ROOT/pages/usage.adoc index 7e67b28d..1a170e2c 100644 --- a/docs/modules/ROOT/pages/usage.adoc +++ b/docs/modules/ROOT/pages/usage.adoc @@ -92,57 +92,50 @@ include::example$example-sparkapp-configmap.yaml[] You can specify S3 connection details directly inside the `SparkApplication` specification or by referring to an external `S3Bucket` custom resource. -To specify S3 connection details directly as part of the `SparkApplication` resource you add an inline bucket configuration as shown below. +To specify S3 connection details directly as part of the `SparkApplication` resource you add an inline connection configuration as shown below. [source,yaml] ---- -s3bucket: # <1> +s3connection: # <1> inline: - bucketName: my-bucket # <2> - connection: - inline: - host: test-minio # <3> - port: 9000 # <4> - accessStyle: Path - credentials: - secretClass: s3-credentials-class # <5> + host: test-minio # <2> + port: 9000 # <3> + accessStyle: Path + credentials: + secretClass: s3-credentials-class # <4> ---- -<1> Entry point for the bucket configuration. -<2> Bucket name. -<3> Bucket host. -<4> Optional bucket port. -<5> Name of the `Secret` object expected to contain the following keys: `ACCESS_KEY_ID` and `SECRET_ACCESS_KEY` +<1> Entry point for the S3 connection configuration. +<2> Connection host. +<3> Optional connection port. +<4> Name of the `Secret` object expected to contain the following keys: `ACCESS_KEY_ID` and `SECRET_ACCESS_KEY` -It is also possible to configure the bucket connection details as a separate Kubernetes resource and only refer to that object from the `SparkApplication` like this: +It is also possible to configure the connection details as a separate Kubernetes resource and only refer to that object from the `SparkApplication` like this: [source,yaml] ---- -s3bucket: - reference: my-bucket-resource # <1> +s3connection: + reference: s3-connection-resource # <1> ---- -<1> Name of the bucket resource with connection details. +<1> Name of the connection resource with connection details. 
-The resource named `my-bucket-resource` is then defined as shown below: +The resource named `s3-connection-resource` is then defined as shown below: [source,yaml] ---- --- apiVersion: s3.stackable.tech/v1alpha1 -kind: S3Bucket +kind: S3Connection metadata: - name: my-bucket-resource + name: s3-connection-resource spec: - bucketName: my-bucket-name - connection: - inline: - host: test-minio - port: 9000 - accessStyle: Path - credentials: - secretClass: minio-credentials-class + host: test-minio + port: 9000 + accessStyle: Path + credentials: + secretClass: minio-credentials-class ---- -This has the advantage that bucket configuration can be shared across `SparkApplication`s and reduces the cost of updating these details. +This has the advantage that one connection configuration can be shared across `SparkApplications` and reduces the cost of updating these details. == Resource Requests @@ -228,8 +221,8 @@ Below are listed the CRD fields that can be defined by the user: |`spec.args` |Arguments passed directly to the job artifact -|`spec.s3bucket` -|S3 bucket and connection specification. See the <> for more details. +|`spec.s3connection` +|S3 connection specification. See the <> for more details. |`spec.sparkConf` |A map of key/value strings that will be passed directly to `spark-submit` diff --git a/examples/ny-tlc-report-external-dependencies.yaml b/examples/ny-tlc-report-external-dependencies.yaml index 7aae0881..3f9ee16e 100644 --- a/examples/ny-tlc-report-external-dependencies.yaml +++ b/examples/ny-tlc-report-external-dependencies.yaml @@ -16,14 +16,11 @@ spec: deps: requirements: - tabulate==0.8.9 - s3bucket: + s3connection: inline: - bucketName: my-bucket - connection: - inline: - host: test-minio - port: 9000 - accessStyle: Path + host: test-minio + port: 9000 + accessStyle: Path sparkConf: spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider" spark.driver.extraClassPath: "/dependencies/jars/*" diff --git a/examples/ny-tlc-report-image.yaml b/examples/ny-tlc-report-image.yaml index db441310..db5d8e10 100644 --- a/examples/ny-tlc-report-image.yaml +++ b/examples/ny-tlc-report-image.yaml @@ -17,14 +17,11 @@ spec: deps: requirements: - tabulate==0.8.9 - s3bucket: + s3connection: inline: - bucketName: my-bucket - connection: - inline: - host: test-minio - port: 9000 - accessStyle: Path + host: test-minio + port: 9000 + accessStyle: Path sparkConf: spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider" executor: diff --git a/examples/ny-tlc-report.yaml b/examples/ny-tlc-report.yaml index ab13d52e..0d281e1c 100644 --- a/examples/ny-tlc-report.yaml +++ b/examples/ny-tlc-report.yaml @@ -23,14 +23,11 @@ spec: name: cm-job-arguments args: - "--input /arguments/job-args.txt" - s3bucket: + s3connection: inline: - bucketName: my-bucket - connection: - inline: - host: test-minio - port: 9000 - accessStyle: Path + host: test-minio + port: 9000 + accessStyle: Path sparkConf: spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider" driver: diff --git a/rust/crd/Cargo.toml b/rust/crd/Cargo.toml index 834c30a7..48f9378a 100644 --- a/rust/crd/Cargo.toml +++ b/rust/crd/Cargo.toml @@ -9,10 +9,10 @@ version = "0.7.0-nightly" publish = false [dependencies] -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag="0.27.1" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", 
tag="0.30.2" } semver = "1.0" -serde = { version = "1.0", features = ["derive"] } +serde = "1.0" serde_json = "1.0" serde_yaml = "0.8" snafu = "0.7" diff --git a/rust/crd/src/constants.rs b/rust/crd/src/constants.rs index 0fc2d5d4..b1f67887 100644 --- a/rust/crd/src/constants.rs +++ b/rust/crd/src/constants.rs @@ -17,10 +17,24 @@ pub const CONTAINER_NAME_DRIVER: &str = "spark-driver"; pub const CONTAINER_IMAGE_NAME_EXECUTOR: &str = "dummy-overwritten-by-command-line"; pub const CONTAINER_NAME_EXECUTOR: &str = "spark-executor"; -pub const ACCESS_KEY_ID: &str = "accessKeyId"; -pub const SECRET_ACCESS_KEY: &str = "secretAccessKey"; +pub const ACCESS_KEY_ID: &str = "accessKey"; +pub const SECRET_ACCESS_KEY: &str = "secretKey"; pub const S3_SECRET_DIR_NAME: &str = "/stackable/secrets"; pub const MIN_MEMORY_OVERHEAD: u32 = 384; pub const JVM_OVERHEAD_FACTOR: f32 = 0.1; pub const NON_JVM_OVERHEAD_FACTOR: f32 = 0.4; + +pub const OPERATOR_NAME: &str = "spark.stackable.tech"; +pub const CONTROLLER_NAME: &str = "sparkapplication"; +pub const POD_DRIVER_CONTROLLER_NAME: &str = "pod-driver"; +pub const HISTORY_CONTROLLER_NAME: &str = "history"; + +pub const HISTORY_ROLE_NAME: &str = "node"; + +pub const HISTORY_IMAGE_BASE_NAME: &str = "spark-k8s"; + +pub const HISTORY_CONFIG_FILE_NAME: &str = "spark-defaults.conf"; +pub const HISTORY_CONFIG_FILE_NAME_FULL: &str = "/stackable/spark/conf/spark-defaults.conf"; + +pub const SPARK_CLUSTER_ROLE: &str = "spark-k8s-clusterrole"; diff --git a/rust/crd/src/history.rs b/rust/crd/src/history.rs new file mode 100644 index 00000000..5c1de741 --- /dev/null +++ b/rust/crd/src/history.rs @@ -0,0 +1,250 @@ +use crate::constants::*; +use stackable_operator::commons::product_image_selection::{ProductImage, ResolvedProductImage}; +use stackable_operator::commons::resources::{ + CpuLimitsFragment, MemoryLimitsFragment, NoRuntimeLimitsFragment, +}; +use stackable_operator::commons::s3::S3BucketDef; +use stackable_operator::config::fragment::ValidationError; +use stackable_operator::k8s_openapi::apimachinery::pkg::api::resource::Quantity; +use stackable_operator::kube::runtime::reflector::ObjectRef; +use stackable_operator::product_config::types::PropertyNameKind; +use stackable_operator::product_config::ProductConfigManager; +use stackable_operator::product_config_utils::{ + transform_all_roles_to_config, validate_all_roles_and_groups_config, Configuration, + ValidatedRoleConfigByPropertyKind, +}; +use stackable_operator::role_utils::{Role, RoleGroupRef}; + +use std::collections::{BTreeMap, HashMap}; + +use serde::{Deserialize, Serialize}; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + commons::resources::{NoRuntimeLimits, Resources, ResourcesFragment}, + config::{fragment, fragment::Fragment, merge::Merge}, +}; +use stackable_operator::{ + kube::CustomResource, + schemars::{self, JsonSchema}, +}; +use strum::Display; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to transform configs"))] + ProductConfigTransform { + source: stackable_operator::product_config_utils::ConfigError, + }, + #[snafu(display("invalid product config"))] + InvalidProductConfig { + source: stackable_operator::error::Error, + }, + #[snafu(display("fragment validation failure"))] + FragmentValidationFailure { source: ValidationError }, +} + +#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)] +#[kube( + group = "spark.stackable.tech", + version = "v1alpha1", + kind = "SparkHistoryServer", + shortname = "shs", + namespaced, + crates( + 
kube_core = "stackable_operator::kube::core", + k8s_openapi = "stackable_operator::k8s_openapi", + schemars = "stackable_operator::schemars" + ) +)] +#[serde(rename_all = "camelCase")] +pub struct SparkHistoryServerSpec { + pub image: ProductImage, + pub log_file_directory: LogFileDirectorySpec, + #[serde(skip_serializing_if = "Option::is_none")] + pub spark_conf: Option>, + pub nodes: Role, +} + +impl SparkHistoryServer { + pub fn merged_config( + &self, + rolegroup_ref: &RoleGroupRef, + ) -> Result { + // Initialize the result with all default values as baseline + let conf_defaults = HistoryConfig::default_config(); + + let role = &self.spec.nodes; + + // Retrieve role resource config + let mut conf_role = role.config.config.to_owned(); + + // Retrieve rolegroup specific resource config + let mut conf_rolegroup = role + .role_groups + .get(&rolegroup_ref.role_group) + .map(|rg| rg.config.config.clone()) + .unwrap_or_default(); + + conf_role.merge(&conf_defaults); + conf_rolegroup.merge(&conf_role); + + fragment::validate(conf_defaults).context(FragmentValidationFailureSnafu) + } + + pub fn replicas(&self, rolegroup_ref: &RoleGroupRef) -> Option { + self.spec + .nodes + .role_groups + .get(&rolegroup_ref.role_group) + .and_then(|rg| rg.replicas) + .map(i32::from) + } + + pub fn cleaner_rolegroups(&self) -> Vec> { + let mut rgs = vec![]; + for (rg_name, rg_config) in &self.spec.nodes.role_groups { + if let Some(true) = rg_config.config.config.cleaner { + rgs.push(RoleGroupRef { + cluster: ObjectRef::from_obj(self), + role: HISTORY_ROLE_NAME.into(), + role_group: rg_name.into(), + }); + } + } + rgs + } + + pub fn validated_role_config( + &self, + resolved_product_image: &ResolvedProductImage, + product_config: &ProductConfigManager, + ) -> Result { + let roles_to_validate: HashMap< + String, + (Vec, Role), + > = vec![( + HISTORY_ROLE_NAME.to_string(), + ( + vec![PropertyNameKind::File(HISTORY_CONFIG_FILE_NAME.to_string())], + self.spec.nodes.clone(), + ), + )] + .into_iter() + .collect(); + + let role_config = transform_all_roles_to_config(self, roles_to_validate); + + validate_all_roles_and_groups_config( + &resolved_product_image.product_version, + &role_config.context(ProductConfigTransformSnafu)?, + product_config, + false, + false, + ) + .context(InvalidProductConfigSnafu) + } +} + +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize, Display)] +#[serde(rename_all = "camelCase")] +pub enum LogFileDirectorySpec { + #[strum(serialize = "s3")] + S3(S3LogFileDirectorySpec), +} + +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct S3LogFileDirectorySpec { + pub prefix: String, + pub bucket: S3BucketDef, +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] +#[fragment_attrs( + allow(clippy::derive_partial_eq_without_eq), + derive( + Clone, + Debug, + Default, + Deserialize, + Merge, + JsonSchema, + PartialEq, + Serialize + ), + serde(rename_all = "camelCase") +)] +pub struct HistoryStorageConfig {} + +#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] +#[fragment_attrs( + derive( + Clone, + Debug, + Default, + Deserialize, + Merge, + JsonSchema, + PartialEq, + Serialize + ), + serde(rename_all = "camelCase") +)] +pub struct HistoryConfig { + #[serde(skip_serializing_if = "Option::is_none")] + pub cleaner: Option, + #[fragment_attrs(serde(default))] + pub resources: Resources, +} + +impl HistoryConfig { + fn default_config() -> HistoryConfigFragment 
{ + HistoryConfigFragment { + cleaner: None, + resources: ResourcesFragment { + cpu: CpuLimitsFragment { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimitsFragment { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimitsFragment {}, + }, + storage: HistoryStorageConfigFragment {}, + }, + } + } +} + +impl Configuration for HistoryConfigFragment { + type Configurable = SparkHistoryServer; + + fn compute_env( + &self, + _resource: &Self::Configurable, + _role_name: &str, + ) -> stackable_operator::product_config_utils::ConfigResult>> + { + Ok(BTreeMap::new()) + } + + fn compute_cli( + &self, + _resource: &Self::Configurable, + _role_name: &str, + ) -> stackable_operator::product_config_utils::ConfigResult>> + { + Ok(BTreeMap::new()) + } + + fn compute_files( + &self, + _resource: &Self::Configurable, + _role_name: &str, + _file: &str, + ) -> stackable_operator::product_config_utils::ConfigResult>> + { + Ok(BTreeMap::new()) + } +} diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index 805620f4..370e5607 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -1,12 +1,14 @@ //! This module provides all required CRD definitions and additional helper methods. pub mod constants; +pub mod history; +pub mod s3logdir; use constants::*; +use history::LogFileDirectorySpec; +use s3logdir::S3LogDir; use stackable_operator::builder::VolumeBuilder; -use stackable_operator::commons::s3::{ - InlinedS3BucketSpec, S3AccessStyle, S3BucketDef, S3ConnectionSpec, -}; +use stackable_operator::commons::s3::{S3AccessStyle, S3ConnectionDef, S3ConnectionSpec}; use stackable_operator::k8s_openapi::api::core::v1::{ EmptyDirVolumeSource, EnvVar, LocalObjectReference, Volume, VolumeMount, }; @@ -34,9 +36,6 @@ use stackable_operator::{ }; use strum::{Display, EnumString}; -pub const OPERATOR_NAME: &str = "spark.stackable.tech"; -pub const CONTROLLER_NAME: &str = "sparkapplication"; - #[derive(Snafu, Debug)] pub enum Error { #[snafu(display("object has no namespace associated"))] @@ -113,7 +112,7 @@ impl SparkConfig { } } -#[derive(Clone, CustomResource, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)] +#[derive(Clone, CustomResource, Debug, Default, Deserialize, JsonSchema, Serialize)] #[kube( group = "spark.stackable.tech", version = "v1alpha1", @@ -160,13 +159,15 @@ pub struct SparkApplicationSpec { #[serde(default, skip_serializing_if = "Option::is_none")] pub deps: Option, #[serde(default, skip_serializing_if = "Option::is_none")] - pub s3bucket: Option, + pub s3connection: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub args: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub volumes: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub env: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub log_file_directory: Option, } #[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize, Display, EnumString)] @@ -234,7 +235,11 @@ impl SparkApplication { .map(|req| req.join(" ")) } - pub fn volumes(&self, s3bucket: &Option) -> Vec { + pub fn volumes( + &self, + s3conn: &Option, + s3logdir: &Option, + ) -> Vec { let mut result: Vec = self .spec .volumes @@ -260,21 +265,25 @@ impl SparkApplication { ); } - let s3_conn = s3bucket.as_ref().and_then(|i| i.connection.as_ref()); - if let Some(S3ConnectionSpec { - credentials: Some(credentials), + credentials: Some(secret_class_volume), .. 
- }) = s3_conn + }) = s3conn { - result.push(credentials.to_volume("s3-credentials")); + result.push(secret_class_volume.to_volume(secret_class_volume.secret_class.as_ref())); } + + if let Some(v) = s3logdir.as_ref().and_then(|o| o.credentials_volume()) { + result.push(v); + } + result } pub fn executor_volume_mounts( &self, - s3bucket: &Option, + s3conn: &Option, + s3logdir: &Option, ) -> Vec { let result: Vec = self .spec @@ -286,10 +295,14 @@ impl SparkApplication { .cloned() .collect(); - self.add_common_volume_mounts(result, s3bucket) + self.add_common_volume_mounts(result, s3conn, s3logdir) } - pub fn driver_volume_mounts(&self, s3bucket: &Option) -> Vec { + pub fn driver_volume_mounts( + &self, + s3conn: &Option, + s3logdir: &Option, + ) -> Vec { let result: Vec = self .spec .driver @@ -300,13 +313,14 @@ impl SparkApplication { .cloned() .collect(); - self.add_common_volume_mounts(result, s3bucket) + self.add_common_volume_mounts(result, s3conn, s3logdir) } fn add_common_volume_mounts( &self, mut mounts: Vec, - s3bucket: &Option, + s3conn: &Option, + s3logdir: &Option, ) -> Vec { if self.spec.image.is_some() { mounts.push(VolumeMount { @@ -322,19 +336,26 @@ impl SparkApplication { ..VolumeMount::default() }); } - let s3_conn = s3bucket.as_ref().and_then(|i| i.connection.as_ref()); if let Some(S3ConnectionSpec { - credentials: Some(_credentials), + credentials: Some(secret_class_volume), .. - }) = s3_conn + }) = s3conn { + let secret_class_name = secret_class_volume.secret_class.clone(); + let secret_dir = format!("{S3_SECRET_DIR_NAME}/{secret_class_name}"); + mounts.push(VolumeMount { - name: "s3-credentials".into(), - mount_path: S3_SECRET_DIR_NAME.into(), + name: secret_class_name, + mount_path: secret_dir, ..VolumeMount::default() }); } + + if let Some(vm) = s3logdir.as_ref().and_then(|o| o.credentials_volume_mount()) { + mounts.push(vm); + } + mounts } @@ -353,7 +374,8 @@ impl SparkApplication { pub fn build_command( &self, serviceaccount_name: &str, - s3bucket: &Option, + s3conn: &Option, + s3_log_dir: &Option, ) -> Result, Error> { // mandatory properties let mode = self.mode().context(ObjectHasNoDeployModeSnafu)?; @@ -379,11 +401,11 @@ impl SparkApplication { // See https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management // for possible S3 related properties - if let Some(endpoint) = s3bucket.as_ref().and_then(|s3| s3.endpoint()) { + if let Some(endpoint) = s3conn.as_ref().and_then(|conn| conn.endpoint()) { submit_cmd.push(format!("--conf spark.hadoop.fs.s3a.endpoint={}", endpoint)); } - if let Some(conn) = s3bucket.as_ref().and_then(|i| i.connection.as_ref()) { + if let Some(conn) = s3conn.as_ref() { match conn.access_style { Some(S3AccessStyle::Path) => { submit_cmd @@ -392,18 +414,20 @@ impl SparkApplication { Some(S3AccessStyle::VirtualHosted) => {} None => {} } - if conn.credentials.as_ref().is_some() { + if let Some(credentials) = &conn.credentials { + let secret_class_name = credentials.secret_class.clone(); + let secret_dir = format!("{S3_SECRET_DIR_NAME}/{secret_class_name}"); + // We don't use the credentials at all here but assume they are available submit_cmd.push(format!( - "--conf spark.hadoop.fs.s3a.access.key=$(cat {secret_dir}/{file_name})", - secret_dir = S3_SECRET_DIR_NAME, - file_name = ACCESS_KEY_ID + "--conf spark.hadoop.fs.s3a.access.key=$(cat {secret_dir}/{ACCESS_KEY_ID})" )); submit_cmd.push(format!( - "--conf spark.hadoop.fs.s3a.secret.key=$(cat {secret_dir}/{file_name})", - secret_dir = S3_SECRET_DIR_NAME, - file_name = 
SECRET_ACCESS_KEY + "--conf spark.hadoop.fs.s3a.secret.key=$(cat {secret_dir}/{SECRET_ACCESS_KEY})" )); + submit_cmd.push("--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider".to_string()); + } else { + submit_cmd.push("--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider".to_string()); } } @@ -504,6 +528,10 @@ impl SparkApplication { } } + if let Some(log_dir) = s3_log_dir { + submit_conf.extend(log_dir.application_spark_config()); + } + // conf arguments: these should follow - and thus override - values set from resource limits above if let Some(spark_conf) = self.spec.spark_conf.clone() { submit_conf.extend(spark_conf); diff --git a/rust/crd/src/s3logdir.rs b/rust/crd/src/s3logdir.rs new file mode 100644 index 00000000..3f3b029a --- /dev/null +++ b/rust/crd/src/s3logdir.rs @@ -0,0 +1,211 @@ +use crate::{ + constants::*, + history::{ + LogFileDirectorySpec::{self, S3}, + S3LogFileDirectorySpec, + }, +}; +use stackable_operator::{ + commons::{ + s3::{InlinedS3BucketSpec, S3AccessStyle}, + secret_class::SecretClassVolume, + tls::{CaCert, TlsVerification}, + }, + k8s_openapi::api::core::v1::{Volume, VolumeMount}, +}; +use std::collections::BTreeMap; + +use snafu::{OptionExt, ResultExt, Snafu}; +use strum::{EnumDiscriminants, IntoStaticStr}; + +#[derive(Snafu, Debug, EnumDiscriminants)] +#[strum_discriminants(derive(IntoStaticStr))] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("s3 bucket error"))] + S3Bucket { + source: stackable_operator::error::Error, + }, + #[snafu(display("missing bucket name for history logs"))] + BucketNameMissing, + #[snafu(display("tls non-verification not supported"))] + S3TlsNoVerificationNotSupported, + #[snafu(display("ca-cert verification not supported"))] + S3TlsCaVerificationNotSupported, +} + +pub struct S3LogDir { + pub bucket: InlinedS3BucketSpec, + pub prefix: String, +} + +impl S3LogDir { + pub async fn resolve( + log_file_dir: Option<&LogFileDirectorySpec>, + namespace: Option, + client: &stackable_operator::client::Client, + ) -> Result, Error> { + #[allow(irrefutable_let_patterns)] + let (s3bucket, prefix) = + if let Some(S3(S3LogFileDirectorySpec { bucket, prefix })) = log_file_dir { + ( + bucket + .resolve(client, namespace.unwrap().as_str()) + .await + .context(S3BucketSnafu) + .ok(), + prefix.clone(), + ) + } else { + // !!!!! + // Ugliness alert! + // No point in trying to resolve the connection anymore since there is no + // log_file_dir in the first place. + // This can casually happen for Spark applications that don't use a history server + // !!!!! + return Ok(None); + }; + + // Check that a bucket name has been defined + s3bucket + .as_ref() + .and_then(|i| i.bucket_name.as_ref()) + .context(BucketNameMissingSnafu)?; + + if let Some(conn) = s3bucket.as_ref().and_then(|i| i.connection.as_ref()) { + if let Some(tls) = &conn.tls { + match &tls.verification { + TlsVerification::None {} => return S3TlsNoVerificationNotSupportedSnafu.fail(), + TlsVerification::Server(server_verification) => { + match &server_verification.ca_cert { + CaCert::WebPki {} => {} + CaCert::SecretClass(_) => { + return S3TlsCaVerificationNotSupportedSnafu.fail() + } + } + } + } + } + } + + Ok(Some(S3LogDir { + bucket: s3bucket.unwrap(), + prefix, + })) + } + + /// Constructs the properties needed for loading event logs from S3. + /// These properties are later written in the `HISTORY_CONFIG_FILE_NAME_FULL` file. 
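+    /// Currently this covers `spark.history.fs.logDirectory`, the S3A endpoint and the path-style access flag.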
+ /// + /// The following properties related to credentials are not included: + /// * spark.hadoop.fs.s3a.aws.credentials.provider + /// * spark.hadoop.fs.s3a.access.key + /// * spark.hadoop.fs.s3a.secret.key + /// instead, the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set + /// on the container start command. + pub fn history_server_spark_config(&self) -> BTreeMap { + let mut result = BTreeMap::new(); + + result.insert("spark.history.fs.logDirectory".to_string(), self.url()); + + if let Some(endpoint) = self.bucket.endpoint() { + result.insert("spark.hadoop.fs.s3a.endpoint".to_string(), endpoint); + } + + if let Some(conn) = self.bucket.connection.as_ref() { + if let Some(S3AccessStyle::Path) = conn.access_style { + result.insert( + "spark.hadoop.fs.s3a.path.style.access".to_string(), + "true".to_string(), + ); + } + } + result + } + + pub fn application_spark_config(&self) -> BTreeMap { + let mut result = BTreeMap::new(); + result.insert("spark.eventLog.enabled".to_string(), "true".to_string()); + result.insert("spark.eventLog.dir".to_string(), self.url()); + + let bucket_name = self.bucket.bucket_name.as_ref().unwrap().clone(); + if let Some(endpoint) = self.bucket.endpoint() { + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.endpoint"), + endpoint, + ); + } + + if let Some(conn) = self.bucket.connection.as_ref() { + if let Some(S3AccessStyle::Path) = conn.access_style { + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.path.style.access"), + "true".to_string(), + ); + } + + if let Some(secret_dir) = self.credentials_mount_path() { + // We don't use the credentials at all here but assume they are available + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.access.key"), + format!("$(cat {secret_dir}/{ACCESS_KEY_ID})"), + ); + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.secret.key"), + format!("$(cat {secret_dir}/{SECRET_ACCESS_KEY})"), + ); + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.aws.credentials.provider"), + "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider".to_string(), + ); + } else { + result.insert( + format!("spark.hadoop.fs.s3a.bucket.{bucket_name}.aws.credentials.provider"), + "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider".to_string(), + ); + } + } + + result + } + + fn url(&self) -> String { + format!( + "s3a://{}/{}", + self.bucket.bucket_name.as_ref().unwrap().clone(), // this is guaranteed to exist at this point + self.prefix + ) + } + + pub fn credentials_volume(&self) -> Option { + self.credentials() + .map(|credentials| credentials.to_volume(credentials.secret_class.as_ref())) + } + + pub fn credentials_volume_mount(&self) -> Option { + self.credentials().map(|secret_class_volume| VolumeMount { + name: secret_class_volume.secret_class.clone(), + mount_path: format!( + "{}/{}", + S3_SECRET_DIR_NAME, secret_class_volume.secret_class + ), + ..VolumeMount::default() + }) + } + + pub fn credentials(&self) -> Option { + self.bucket + .connection + .as_ref() + .and_then(|conn| conn.credentials.clone()) + } + + pub fn credentials_mount_path(&self) -> Option { + self.credentials().map(|secret_class_volume| { + format!( + "{}/{}", + S3_SECRET_DIR_NAME, secret_class_volume.secret_class + ) + }) + } +} diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index a220ec63..6e823555 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -9,7 +9,7 @@ version = "0.7.0-nightly" 
publish = false [dependencies] -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag="0.27.1" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag="0.30.2" } stackable-spark-k8s-crd = { path = "../crd" } anyhow = "1.0" clap = "4.0" @@ -24,5 +24,5 @@ tracing-futures = { version = "0.2", features = ["futures-03"] } [build-dependencies] built = { version = "0.5", features = ["chrono", "git2"] } -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag="0.27.1" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag="0.30.2" } stackable-spark-k8s-crd = { path = "../crd" } diff --git a/rust/operator-binary/src/history_controller.rs b/rust/operator-binary/src/history_controller.rs new file mode 100644 index 00000000..2a0e343c --- /dev/null +++ b/rust/operator-binary/src/history_controller.rs @@ -0,0 +1,503 @@ +use stackable_operator::{ + builder::{ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder, VolumeBuilder}, + cluster_resources::ClusterResources, + commons::{ + product_image_selection::ResolvedProductImage, + resources::{NoRuntimeLimits, Resources}, + }, + k8s_openapi::{ + api::{ + apps::v1::{StatefulSet, StatefulSetSpec}, + core::v1::{ + ConfigMap, PodSecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec, + }, + rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject}, + }, + apimachinery::pkg::apis::meta::v1::LabelSelector, + }, + kube::{ + runtime::{controller::Action, reflector::ObjectRef}, + Resource, ResourceExt, + }, + labels::{role_group_selector_labels, role_selector_labels, ObjectLabels}, + product_config::ProductConfigManager, + role_utils::RoleGroupRef, +}; +use stackable_spark_k8s_crd::{ + constants::*, + history::{HistoryStorageConfig, SparkHistoryServer}, + s3logdir::S3LogDir, +}; +use std::time::Duration; +use std::{collections::BTreeMap, sync::Arc}; + +use snafu::{ResultExt, Snafu}; +use stackable_operator::logging::controller::ReconcilerError; +use strum::{EnumDiscriminants, IntoStaticStr}; + +pub struct Ctx { + pub client: stackable_operator::client::Client, + pub product_config: ProductConfigManager, +} + +#[derive(Snafu, Debug, EnumDiscriminants)] +#[strum_discriminants(derive(IntoStaticStr))] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("invalid config map {name}"))] + InvalidConfigMap { + source: stackable_operator::error::Error, + name: String, + }, + #[snafu(display("invalid history container name {name}"))] + InvalidContainerName { + source: stackable_operator::error::Error, + name: String, + }, + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::error::Error, + }, + #[snafu(display("failed to update the history server deployment"))] + ApplyDeployment { + source: stackable_operator::error::Error, + }, + #[snafu(display("failed to update history server config map"))] + ApplyConfigMap { + source: stackable_operator::error::Error, + }, + #[snafu(display("failed to update history server service"))] + ApplyService { + source: stackable_operator::error::Error, + }, + #[snafu(display("failed to apply role ServiceAccount"))] + ApplyServiceAccount { + source: stackable_operator::error::Error, + }, + #[snafu(display("failed to apply global RoleBinding"))] + ApplyRoleBinding { + source: stackable_operator::error::Error, + }, + #[snafu(display("product config validation 
failed"))] + ProductConfigValidation { + source: stackable_spark_k8s_crd::history::Error, + }, + #[snafu(display("failed to resolve and merge config for role and role group"))] + FailedToResolveConfig { + source: stackable_spark_k8s_crd::history::Error, + }, + #[snafu(display("number of cleaner rolegroups exceeds 1"))] + TooManyCleanerRoleGroups, + #[snafu(display("number of cleaner replicas exceeds 1"))] + TooManyCleanerReplicas, + #[snafu(display("failed to resolve the s3 log dir confirguration"))] + S3LogDir { + source: stackable_spark_k8s_crd::s3logdir::Error, + }, + #[snafu(display("failed to create cluster resources"))] + CreateClusterResources { + source: stackable_operator::error::Error, + }, + #[snafu(display("failed to delete orphaned resources"))] + DeleteOrphanedResources { + source: stackable_operator::error::Error, + }, +} + +type Result = std::result::Result; + +impl ReconcilerError for Error { + fn category(&self) -> &'static str { + ErrorDiscriminants::from(self).into() + } +} +/// Updates the status of the SparkApplication that started the pod. +pub async fn reconcile(shs: Arc, ctx: Arc) -> Result { + tracing::info!("Starting reconcile history server"); + + let client = &ctx.client; + + let mut cluster_resources = ClusterResources::new( + APP_NAME, + OPERATOR_NAME, + HISTORY_CONTROLLER_NAME, + &shs.object_ref(&()), + ) + .context(CreateClusterResourcesSnafu)?; + + let resolved_product_image = shs.spec.image.resolve(HISTORY_IMAGE_BASE_NAME); + let s3_log_dir = S3LogDir::resolve( + Some(&shs.spec.log_file_directory), + shs.metadata.namespace.clone(), + client, + ) + .await + .context(S3LogDirSnafu)?; + + // Use a dedicated service account for history server pods. + let (serviceaccount, rolebinding) = + build_history_role_serviceaccount(&shs, &resolved_product_image.app_version_label)?; + cluster_resources + .add(client, &serviceaccount) + .await + .context(ApplyServiceAccountSnafu)?; + cluster_resources + .add(client, &rolebinding) + .await + .context(ApplyRoleBindingSnafu)?; + + // The role_name is always HISTORY_ROLE_NAME + for (role_name, role_config) in shs + .validated_role_config(&resolved_product_image, &ctx.product_config) + .context(ProductConfigValidationSnafu)? 
+ .iter() + { + let service = build_service( + &shs, + &resolved_product_image.app_version_label, + role_name, + None, + )?; + cluster_resources + .add(client, &service) + .await + .context(ApplyServiceSnafu)?; + + for (rolegroup_name, _rolegroup_config) in role_config.iter() { + let rgr = RoleGroupRef { + cluster: ObjectRef::from_obj(&*shs), + role: role_name.into(), + role_group: rolegroup_name.into(), + }; + + let config = shs + .merged_config(&rgr) + .context(FailedToResolveConfigSnafu)?; + + let service = build_service( + &shs, + &resolved_product_image.app_version_label, + role_name, + Some(&rgr), + )?; + cluster_resources + .add(client, &service) + .await + .context(ApplyServiceSnafu)?; + + let config_map = build_config_map( + &shs, + &resolved_product_image.app_version_label, + &rgr, + s3_log_dir.as_ref().unwrap(), + )?; + cluster_resources + .add(client, &config_map) + .await + .context(ApplyConfigMapSnafu)?; + + let sts = build_stateful_set( + &shs, + &resolved_product_image, + &rgr, + s3_log_dir.as_ref().unwrap(), + &config.resources, + )?; + cluster_resources + .add(client, &sts) + .await + .context(ApplyDeploymentSnafu)?; + } + } + + cluster_resources + .delete_orphaned_resources(client) + .await + .context(DeleteOrphanedResourcesSnafu)?; + + Ok(Action::await_change()) +} + +pub fn error_policy(_obj: Arc, _error: &Error, _ctx: Arc) -> Action { + Action::requeue(Duration::from_secs(5)) +} + +fn build_config_map( + shs: &SparkHistoryServer, + app_version_label: &str, + rolegroupref: &RoleGroupRef, + s3_log_dir: &S3LogDir, +) -> Result { + let spark_config = spark_config(shs, s3_log_dir, rolegroupref)?; + + let result = ConfigMapBuilder::new() + .metadata( + ObjectMetaBuilder::new() + .name_and_namespace(shs) + .name(rolegroupref.object_name()) + .ownerreference_from_resource(shs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(labels(shs, app_version_label, &rolegroupref.role_group)) + .build(), + ) + .add_data(HISTORY_CONFIG_FILE_NAME, spark_config) + .build() + .context(InvalidConfigMapSnafu { + name: "spark-history-config".to_string(), + })?; + + Ok(result) +} + +fn build_stateful_set( + shs: &SparkHistoryServer, + resolved_product_image: &ResolvedProductImage, + rolegroupref: &RoleGroupRef, + s3_log_dir: &S3LogDir, + resources: &Resources, +) -> Result { + let container_name = "spark-history"; + let container = ContainerBuilder::new(container_name) + .context(InvalidContainerNameSnafu { + name: String::from(container_name), + })? + .image_from_product_image(resolved_product_image) + .resources(resources.clone().into()) + .command(vec!["/bin/bash".to_string()]) + .args(command_args(s3_log_dir)) + .add_container_port("http", 18080) + // This env var prevents the history server from detaching itself from the + // start script because this leads to the Pod terminating immediately. 
+ .add_env_var("SPARK_NO_DAEMONIZE", "true") + .add_volume_mounts(s3_log_dir.credentials_volume_mount().into_iter()) + .add_volume_mount("config", "/stackable/spark/conf") + .build(); + + let template = PodBuilder::new() + .add_container(container) + .image_pull_secrets_from_product_image(resolved_product_image) + .add_volume( + VolumeBuilder::new("config") + .with_config_map(rolegroupref.object_name()) + .build(), + ) + .add_volumes(s3_log_dir.credentials_volume().into_iter().collect()) + .metadata_builder(|m| { + m.with_recommended_labels(labels( + shs, + &resolved_product_image.app_version_label, + &rolegroupref.role_group, + )) + }) + .security_context(PodSecurityContext { + run_as_user: Some(1000), + run_as_group: Some(1000), + fs_group: Some(1000), + ..PodSecurityContext::default() + }) + .build_template(); + + Ok(StatefulSet { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(shs) + .ownerreference_from_resource(shs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(labels( + shs, + &resolved_product_image.app_version_label, + rolegroupref.role_group.as_ref(), + )) + .build(), + spec: Some(StatefulSetSpec { + template, + replicas: shs.replicas(rolegroupref), + selector: LabelSelector { + match_labels: Some(role_group_selector_labels( + shs, + APP_NAME, + &rolegroupref.role, + &rolegroupref.role_group, + )), + ..LabelSelector::default() + }, + ..StatefulSetSpec::default() + }), + ..StatefulSet::default() + }) +} + +fn build_service( + shs: &SparkHistoryServer, + app_version_label: &str, + role: &str, + group: Option<&RoleGroupRef>, +) -> Result { + let group_name = match group { + Some(rgr) => rgr.role_group.clone(), + None => "global".to_owned(), + }; + + let (service_name, service_type) = match group { + Some(rgr) => (rgr.object_name(), "ClusterIP".to_string()), + None => ( + format!("{}-{}", shs.metadata.name.as_ref().unwrap(), role), + "NodePort".to_string(), + ), + }; + + let selector = match group { + Some(rgr) => role_group_selector_labels(shs, APP_NAME, &rgr.role, &rgr.role_group), + None => role_selector_labels(shs, APP_NAME, role), + }; + + Ok(Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(shs) + .name(service_name) + .ownerreference_from_resource(shs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(labels(shs, app_version_label, &group_name)) + .build(), + spec: Some(ServiceSpec { + ports: Some(vec![ServicePort { + name: Some(String::from("http")), + port: 18080, + ..ServicePort::default() + }]), + selector: Some(selector), + type_: Some(service_type), + ..ServiceSpec::default() + }), + status: None, + }) +} + +fn build_history_role_serviceaccount( + shs: &SparkHistoryServer, + app_version_label: &str, +) -> Result<(ServiceAccount, RoleBinding)> { + let sa = ServiceAccount { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(shs) + .ownerreference_from_resource(shs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(labels(shs, app_version_label, HISTORY_CONTROLLER_NAME)) + .build(), + ..ServiceAccount::default() + }; + let binding = RoleBinding { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(shs) + .ownerreference_from_resource(shs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? 
+ .with_recommended_labels(labels(shs, app_version_label, HISTORY_CONTROLLER_NAME)) + .build(), + role_ref: RoleRef { + api_group: ::GROUP // need to fully qualify because of "Resource" name clash + .to_string(), + kind: ::KIND.to_string(), + name: SPARK_CLUSTER_ROLE.to_string(), + }, + subjects: Some(vec![Subject { + api_group: Some( + ::GROUP.to_string(), + ), + kind: ::KIND.to_string(), + name: sa.name_any(), + namespace: sa.namespace(), + }]), + }; + Ok((sa, binding)) +} + +fn spark_config( + shs: &SparkHistoryServer, + s3_log_dir: &S3LogDir, + rolegroupref: &RoleGroupRef, +) -> Result { + let mut log_dir_settings = s3_log_dir.history_server_spark_config(); + + // add cleaner spark settings if requested + log_dir_settings.extend(cleaner_config(shs, rolegroupref)?); + + // add user provided configuration. These can overwrite everything. + log_dir_settings.extend(shs.spec.spark_conf.clone().unwrap_or_default()); + + // stringify the spark configuration for the ConfigMap + Ok(log_dir_settings + .iter() + .map(|(k, v)| format!("{k} {v}")) + .collect::>() + .join("\n")) +} + +fn command_args(s3logdir: &S3LogDir) -> Vec { + let mut command = vec![]; + + if let Some(secret_dir) = s3logdir.credentials_mount_path() { + command.extend(vec![ + format!("export AWS_ACCESS_KEY_ID=$(cat {secret_dir}/{ACCESS_KEY_ID})"), + "&&".to_string(), + format!("export AWS_SECRET_ACCESS_KEY=$(cat {secret_dir}/{SECRET_ACCESS_KEY})"), + "&&".to_string(), + ]); + } + command.extend(vec![ + "/stackable/spark/sbin/start-history-server.sh".to_string(), + "--properties-file".to_string(), + HISTORY_CONFIG_FILE_NAME_FULL.to_string(), + ]); + + vec![String::from("-c"), command.join(" ")] +} + +fn labels<'a, T>( + shs: &'a T, + app_version_label: &'a str, + role_group: &'a str, +) -> ObjectLabels<'a, T> { + ObjectLabels { + owner: shs, + app_name: APP_NAME, + app_version: app_version_label, + operator_name: OPERATOR_NAME, + controller_name: HISTORY_CONTROLLER_NAME, + role: HISTORY_ROLE_NAME, + role_group, + } +} + +/// Return the Spark properties for the cleaner role group (if any). +/// There should be only one role group with "cleaner=true" and this +/// group should have a replica count of 0 or 1. 
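+/// When both conditions hold for the given role group, `spark.history.fs.cleaner.enabled` is set to "true".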
+fn cleaner_config( + shs: &SparkHistoryServer, + rolegroup_ref: &RoleGroupRef, +) -> Result, Error> { + let mut result = BTreeMap::new(); + + // all role groups with "cleaner=true" + let cleaner_rolegroups = shs.cleaner_rolegroups(); + + // should have max of one + if cleaner_rolegroups.len() > 1 { + return TooManyCleanerRoleGroupsSnafu.fail(); + } + + // check if cleaner is set for this rolegroup ref + if cleaner_rolegroups.len() == 1 && cleaner_rolegroups[0].role_group == rolegroup_ref.role_group + { + if let Some(replicas) = shs.replicas(rolegroup_ref) { + if replicas > 1 { + return TooManyCleanerReplicasSnafu.fail(); + } else { + result.insert( + "spark.history.fs.cleaner.enabled".to_string(), + "true".to_string(), + ); + } + } + } + + Ok(result) +} diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 055e13e9..3b1c670f 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -1,3 +1,4 @@ +mod history_controller; mod pod_driver_controller; mod spark_k8s_controller; @@ -6,14 +7,18 @@ use std::sync::Arc; use clap::Parser; use futures::StreamExt; use stackable_operator::cli::{Command, ProductOperatorRun}; -use stackable_operator::k8s_openapi::api::core::v1::ConfigMap; +use stackable_operator::k8s_openapi::api::apps::v1::StatefulSet; use stackable_operator::k8s_openapi::api::core::v1::Pod; +use stackable_operator::k8s_openapi::api::core::v1::{ConfigMap, Service}; use stackable_operator::kube::api::ListParams; use stackable_operator::kube::runtime::controller::Controller; use stackable_operator::logging::controller::report_controller_reconciled; use stackable_operator::CustomResourceExt; -use stackable_spark_k8s_crd::CONTROLLER_NAME; -use stackable_spark_k8s_crd::{SparkApplication, OPERATOR_NAME}; +use stackable_spark_k8s_crd::constants::{ + CONTROLLER_NAME, HISTORY_CONTROLLER_NAME, OPERATOR_NAME, POD_DRIVER_CONTROLLER_NAME, +}; +use stackable_spark_k8s_crd::history::SparkHistoryServer; +use stackable_spark_k8s_crd::SparkApplication; use tracing::info_span; use tracing_futures::Instrument; @@ -21,8 +26,6 @@ mod built_info { include!(concat!(env!("OUT_DIR"), "/built.rs")); } -use crate::pod_driver_controller::POD_DRIVER_CONTROLLER_NAME; - #[derive(Parser)] #[clap(about = built_info::PKG_DESCRIPTION, author = stackable_operator::cli::AUTHOR)] struct Opts { @@ -36,9 +39,10 @@ async fn main() -> anyhow::Result<()> { match opts.cmd { Command::Crd => { SparkApplication::print_yaml_schema()?; + SparkHistoryServer::print_yaml_schema()?; } Command::Run(ProductOperatorRun { - product_config: _, + product_config, watch_namespace, tracing_target, }) => { @@ -56,6 +60,11 @@ async fn main() -> anyhow::Result<()> { built_info::RUSTC_VERSION, ); + let product_config = product_config.load(&[ + "deploy/config-spec/properties.yaml", + "/etc/stackable/spark-k8s-operator/config-spec/properties.yaml", + ])?; + let client = stackable_operator::client::create_client(Some(OPERATOR_NAME.to_string())).await?; @@ -101,12 +110,53 @@ async fn main() -> anyhow::Result<()> { client: client.clone(), }), ) - .map(|res| report_controller_reconciled(&client, POD_DRIVER_CONTROLLER_NAME, &res)) + .map(|res| report_controller_reconciled(&client, &format!("{OPERATOR_NAME}.{POD_DRIVER_CONTROLLER_NAME}"), &res)) .instrument(info_span!("pod_driver_controller")); - futures::stream::select(app_controller, pod_driver_controller) - .collect::<()>() - .await; + let history_controller = Controller::new( + watch_namespace.get_api::(&client), + ListParams::default(), + ) + 
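+        // Changes to any of the owned resources below re-trigger reconciliation of the history server.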
.owns( + watch_namespace.get_api::(&client), + ListParams::default(), + ) + .owns( + watch_namespace.get_api::(&client), + ListParams::default(), + ) + .owns( + watch_namespace.get_api::(&client), + ListParams::default(), + ) + .owns( + watch_namespace.get_api::(&client), + ListParams::default(), + ) + .shutdown_on_signal() + .run( + history_controller::reconcile, + history_controller::error_policy, + Arc::new(history_controller::Ctx { + client: client.clone(), + product_config, + }), + ) + .map(|res| { + report_controller_reconciled( + &client, + &format!("{OPERATOR_NAME}.{HISTORY_CONTROLLER_NAME}"), + &res, + ) + }) + .instrument(info_span!("history_controller")); + + futures::stream::select( + futures::stream::select(app_controller, pod_driver_controller), + history_controller, + ) + .collect::<()>() + .await; } } Ok(()) diff --git a/rust/operator-binary/src/pod_driver_controller.rs b/rust/operator-binary/src/pod_driver_controller.rs index 20f870c3..9b420295 100644 --- a/rust/operator-binary/src/pod_driver_controller.rs +++ b/rust/operator-binary/src/pod_driver_controller.rs @@ -1,5 +1,7 @@ use stackable_operator::{k8s_openapi::api::core::v1::Pod, kube::runtime::controller::Action}; -use stackable_spark_k8s_crd::{SparkApplication, SparkApplicationStatus}; +use stackable_spark_k8s_crd::{ + constants::POD_DRIVER_CONTROLLER_NAME, SparkApplication, SparkApplicationStatus, +}; use std::sync::Arc; use std::time::Duration; @@ -7,8 +9,6 @@ use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::logging::controller::ReconcilerError; use strum::{EnumDiscriminants, IntoStaticStr}; -pub const POD_DRIVER_CONTROLLER_NAME: &str = "pod-driver"; - const LABEL_NAME_INSTANCE: &str = "app.kubernetes.io/instance"; pub struct Ctx { diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index f2aebea6..408c2721 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -1,7 +1,7 @@ use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::builder::{ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder}; -use stackable_operator::commons::s3::InlinedS3BucketSpec; +use stackable_operator::commons::s3::S3ConnectionSpec; use stackable_operator::commons::tls::{CaCert, TlsVerification}; use stackable_operator::k8s_openapi::api::batch::v1::{Job, JobSpec}; use stackable_operator::k8s_openapi::api::core::v1::{ @@ -12,13 +12,13 @@ use stackable_operator::k8s_openapi::api::rbac::v1::{ClusterRole, RoleBinding, R use stackable_operator::k8s_openapi::Resource; use stackable_operator::kube::runtime::controller::Action; use stackable_operator::logging::controller::ReconcilerError; +use stackable_spark_k8s_crd::constants::*; use stackable_spark_k8s_crd::SparkApplication; -use stackable_spark_k8s_crd::{constants::*, CONTROLLER_NAME}; use std::collections::BTreeMap; use std::{sync::Arc, time::Duration}; use strum::{EnumDiscriminants, IntoStaticStr}; -const SPARK_CLUSTER_ROLE: &str = "spark-k8s-clusterrole"; +use stackable_spark_k8s_crd::s3logdir::S3LogDir; pub struct Ctx { pub client: stackable_operator::client::Client, @@ -87,6 +87,10 @@ pub enum Error { source: stackable_operator::error::Error, container_name: String, }, + #[snafu(display("failed to resolve the s3 log dir configuration"))] + S3LogDir { + source: stackable_spark_k8s_crd::s3logdir::Error, + }, } type Result = std::result::Result; @@ -102,7 +106,7 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) let client = 
&ctx.client; - let s3bucket = match spark_application.spec.s3bucket.as_ref() { + let opt_s3conn = match spark_application.spec.s3connection.as_ref() { Some(s3bd) => s3bd .resolve( client, @@ -114,7 +118,7 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) _ => None, }; - if let Some(conn) = s3bucket.as_ref().and_then(|i| i.connection.as_ref()) { + if let Some(conn) = opt_s3conn.as_ref() { if let Some(tls) = &conn.tls { match &tls.verification { TlsVerification::None {} => return S3TlsNoVerificationNotSupportedSnafu.fail(), @@ -130,12 +134,13 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) } } - if let Some(conn) = s3bucket.as_ref().and_then(|i| i.connection.as_ref()) { - if conn.tls.as_ref().is_some() { - tracing::warn!("The resource indicates S3-access should use TLS: TLS-verification has not yet been implemented \ - but an HTTPS-endpoint will be used!"); - } - } + let s3logdir = S3LogDir::resolve( + spark_application.spec.log_file_directory.as_ref(), + spark_application.metadata.namespace.clone(), + client, + ) + .await + .context(S3LogDirSnafu)?; let (serviceaccount, rolebinding) = build_spark_role_serviceaccount(&spark_application)?; client @@ -198,7 +203,8 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) &spark_application, init_containers.as_ref(), &env_vars, - &s3bucket, + &opt_s3conn, + &s3logdir, )?; client .apply_patch( @@ -210,7 +216,11 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) .context(ApplyApplicationSnafu)?; let job_commands = spark_application - .build_command(serviceaccount.metadata.name.as_ref().unwrap(), &s3bucket) + .build_command( + serviceaccount.metadata.name.as_ref().unwrap(), + &opt_s3conn, + &s3logdir, + ) .context(BuildCommandSnafu)?; let job = spark_job( @@ -220,7 +230,8 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) &job_container, &env_vars, &job_commands, - &s3bucket, + &opt_s3conn, + &s3logdir, )?; client .apply_patch(CONTROLLER_NAME, &job, &job) @@ -298,16 +309,19 @@ fn pod_template_config_map( spark_application: &SparkApplication, init_containers: &[Container], env: &[EnvVar], - s3bucket: &Option, + s3conn: &Option, + s3logdir: &Option, ) -> Result { - let volumes = spark_application.volumes(s3bucket); + let volumes = spark_application.volumes(s3conn, s3logdir); let driver_template = pod_template( spark_application, CONTAINER_NAME_DRIVER, init_containers, volumes.as_ref(), - spark_application.driver_volume_mounts(s3bucket).as_ref(), + spark_application + .driver_volume_mounts(s3conn, s3logdir) + .as_ref(), env, spark_application.driver_node_selector(), )?; @@ -316,7 +330,9 @@ fn pod_template_config_map( CONTAINER_NAME_EXECUTOR, init_containers, volumes.as_ref(), - spark_application.executor_volume_mounts(s3bucket).as_ref(), + spark_application + .executor_volume_mounts(s3conn, s3logdir) + .as_ref(), env, spark_application.executor_node_selector(), )?; @@ -345,6 +361,7 @@ fn pod_template_config_map( .context(PodTemplateConfigMapSnafu) } +#[allow(clippy::too_many_arguments)] fn spark_job( spark_application: &SparkApplication, spark_image: &str, @@ -352,14 +369,15 @@ fn spark_job( job_container: &Option, env: &[EnvVar], job_commands: &[String], - s3bucket: &Option, + s3conn: &Option, + s3logdir: &Option, ) -> Result { let mut volume_mounts = vec![VolumeMount { name: VOLUME_MOUNT_NAME_POD_TEMPLATES.into(), mount_path: VOLUME_MOUNT_PATH_POD_TEMPLATES.into(), ..VolumeMount::default() }]; - volume_mounts.extend(spark_application.driver_volume_mounts(s3bucket)); + 
volume_mounts.extend(spark_application.driver_volume_mounts(s3conn, s3logdir)); let mut cb = ContainerBuilder::new("spark-submit").with_context(|_| IllegalContainerNameSnafu { @@ -394,7 +412,7 @@ fn spark_job( }), ..Volume::default() }]; - volumes.extend(spark_application.volumes(s3bucket)); + volumes.extend(spark_application.volumes(s3conn, s3logdir)); let pod = PodTemplateSpec { metadata: Some( diff --git a/tests/templates/kuttl/pyspark-ny-public-s3-image/01-prepare-bucket.yaml.j2 b/tests/templates/kuttl/pyspark-ny-public-s3-image/01-prepare-bucket.yaml.j2 index 9b84c9e9..8a90c2c5 100644 --- a/tests/templates/kuttl/pyspark-ny-public-s3-image/01-prepare-bucket.yaml.j2 +++ b/tests/templates/kuttl/pyspark-ny-public-s3-image/01-prepare-bucket.yaml.j2 @@ -2,6 +2,8 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: + # give minio enough time to start + - command: sleep 5 - command: kubectl cp -n $NAMESPACE yellow_tripdata_2021-07.csv minio-client:/tmp - command: kubectl exec -n $NAMESPACE minio-client -- sh -c 'mc alias set test-minio http://test-minio:9000 $$MINIO_SERVER_ACCESS_KEY $$MINIO_SERVER_SECRET_KEY' - command: kubectl exec -n $NAMESPACE minio-client -- mc mb test-minio/my-bucket diff --git a/tests/templates/kuttl/pyspark-ny-public-s3-image/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/pyspark-ny-public-s3-image/10-deploy-spark-app.yaml.j2 index 3d4528bb..3895b3d8 100644 --- a/tests/templates/kuttl/pyspark-ny-public-s3-image/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/pyspark-ny-public-s3-image/10-deploy-spark-app.yaml.j2 @@ -16,15 +16,10 @@ spec: deps: requirements: - tabulate==0.8.9 - s3bucket: + s3connection: inline: - bucketName: my-bucket - connection: - inline: - host: test-minio - port: 9000 - accessStyle: Path - sparkConf: - spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider" + host: test-minio + port: 9000 + accessStyle: Path executor: instances: 3 diff --git a/tests/templates/kuttl/pyspark-ny-public-s3/01-prepare-bucket.yaml.j2 b/tests/templates/kuttl/pyspark-ny-public-s3/01-prepare-bucket.yaml.j2 index aa38a10b..0750d450 100644 --- a/tests/templates/kuttl/pyspark-ny-public-s3/01-prepare-bucket.yaml.j2 +++ b/tests/templates/kuttl/pyspark-ny-public-s3/01-prepare-bucket.yaml.j2 @@ -2,6 +2,8 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: + # give minio enough time to start + - command: sleep 5 - command: kubectl cp -n $NAMESPACE ny_tlc_report.py minio-client:/tmp - command: kubectl cp -n $NAMESPACE yellow_tripdata_2021-07.csv minio-client:/tmp - command: kubectl exec -n $NAMESPACE minio-client -- sh -c 'mc alias set test-minio http://test-minio:9000 $$MINIO_SERVER_ACCESS_KEY $$MINIO_SERVER_SECRET_KEY' diff --git a/tests/templates/kuttl/pyspark-ny-public-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/pyspark-ny-public-s3/10-deploy-spark-app.yaml.j2 index 43880f56..5bc5d917 100644 --- a/tests/templates/kuttl/pyspark-ny-public-s3/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/pyspark-ny-public-s3/10-deploy-spark-app.yaml.j2 @@ -15,15 +15,10 @@ spec: deps: requirements: - tabulate==0.8.9 - s3bucket: + s3connection: inline: - bucketName: my-bucket - connection: - inline: - host: test-minio - port: 9000 - accessStyle: Path - sparkConf: - spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider" + host: test-minio + port: 9000 + accessStyle: Path executor: instances: 3 diff --git 
a/tests/templates/kuttl/spark-history-server/00-assert.yaml b/tests/templates/kuttl/spark-history-server/00-assert.yaml new file mode 100644 index 00000000..863f6070 --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/00-assert.yaml @@ -0,0 +1,25 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: test-minio +status: + readyReplicas: 1 +--- +apiVersion: v1 +kind: Pod +metadata: + name: minio-client + labels: + app: minio-client +status: + phase: Running +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: integration-tests-sa diff --git a/tests/templates/kuttl/spark-history-server/00-s3-connection.yaml b/tests/templates/kuttl/spark-history-server/00-s3-connection.yaml new file mode 100644 index 00000000..6a4d513e --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/00-s3-connection.yaml @@ -0,0 +1,31 @@ +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Connection +metadata: + name: spark-data-s3-connection +spec: + host: test-minio + port: 9000 + accessStyle: Path + credentials: + secretClass: s3-credentials-class +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Connection +metadata: + name: spark-history-s3-connection +spec: + host: eventlog-minio + port: 9000 + accessStyle: Path + credentials: + secretClass: history-credentials-class +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Bucket +metadata: + name: spark-history-s3-bucket +spec: + bucketName: spark-logs + connection: + reference: spark-history-s3-connection diff --git a/tests/templates/kuttl/spark-history-server/00-s3-secret.yaml b/tests/templates/kuttl/spark-history-server/00-s3-secret.yaml new file mode 100644 index 00000000..2e3675f7 --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/00-s3-secret.yaml @@ -0,0 +1,48 @@ +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-credentials + labels: + secrets.stackable.tech/class: s3-credentials-class +stringData: + accessKey: minioAccessKey + secretKey: minioSecretKey + # The following two entries are used by the Bitnami chart for MinIO to + # set up credentials for accessing buckets managed by the MinIO tenant. + root-user: minioAccessKey + root-password: minioSecretKey +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: s3-credentials-class +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: v1 +kind: Secret +metadata: + name: history-credentials + labels: + secrets.stackable.tech/class: history-credentials-class +stringData: + accessKey: eventLogAccessKey + secretKey: eventLogSecretKey + # The following two entries are used by the Bitnami chart for MinIO to + # set up credentials for accessing buckets managed by the MinIO tenant. 
+ root-user: eventLogAccessKey + root-password: eventLogSecretKey +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: history-credentials-class +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} diff --git a/tests/templates/kuttl/spark-history-server/00-serviceaccount.yaml.j2 b/tests/templates/kuttl/spark-history-server/00-serviceaccount.yaml.j2 new file mode 100644 index 00000000..9cbf0351 --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/00-serviceaccount.yaml.j2 @@ -0,0 +1,29 @@ +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +rules: +{% if test_scenario['values']['openshift'] == "true" %} + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + resourceNames: ["privileged"] + verbs: ["use"] +{% endif %} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: integration-tests-sa +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +subjects: + - kind: ServiceAccount + name: integration-tests-sa +roleRef: + kind: Role + name: use-integration-tests-scc + apiGroup: rbac.authorization.k8s.io diff --git a/tests/templates/kuttl/spark-history-server/00-setup-minio.yaml b/tests/templates/kuttl/spark-history-server/00-setup-minio.yaml new file mode 100644 index 00000000..c88715cc --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/00-setup-minio.yaml @@ -0,0 +1,84 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install test-minio + --namespace $NAMESPACE + --version 11.9.2 + -f helm-bitnami-minio-values.yaml + --repo https://charts.bitnami.com/bitnami minio + - script: >- + helm install eventlog-minio + --namespace $NAMESPACE + --version 11.9.2 + -f helm-bitnami-eventlog-minio-values.yaml + --repo https://charts.bitnami.com/bitnami minio +--- +apiVersion: v1 +kind: Pod +metadata: + name: minio-client + labels: + app: minio-client +spec: + restartPolicy: Never + containers: + - name: minio-client + image: docker.io/bitnami/minio-client:2022.8.11-debian-11-r3 + command: ["bash", "-c", "sleep infinity"] + stdin: true + tty: true + env: + - name: MINIO_SERVER_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-credentials + key: root-user + optional: false + - name: MINIO_SERVER_SECRET_KEY + valueFrom: + secretKeyRef: + name: minio-credentials + key: root-password + optional: false + - name: MINIO_SERVER_HOST + value: test-minio + - name: MINIO_SERVER_PORT_NUMBER + value: "9000" + - name: MINIO_SERVER_SCHEME + value: http +--- +apiVersion: v1 +kind: Pod +metadata: + name: eventlog-minio-client + labels: + app: eventlog-minio-client +spec: + restartPolicy: Never + containers: + - name: minio-client + image: docker.io/bitnami/minio-client:2022.8.11-debian-11-r3 + command: ["bash", "-c", "sleep infinity"] + stdin: true + tty: true + env: + - name: MINIO_SERVER_ACCESS_KEY + valueFrom: + secretKeyRef: + name: history-credentials + key: root-user + optional: false + - name: MINIO_SERVER_SECRET_KEY + valueFrom: + secretKeyRef: + name: history-credentials + key: root-password + optional: false + - name: MINIO_SERVER_HOST + value: eventlog-minio + - name: MINIO_SERVER_PORT_NUMBER + value: "9000" + - name: MINIO_SERVER_SCHEME + value: http diff --git a/tests/templates/kuttl/spark-history-server/01-prepare-bucket.yaml.j2 b/tests/templates/kuttl/spark-history-server/01-prepare-bucket.yaml.j2 new file mode 100644 index 00000000..e85687a1 --- /dev/null +++ 
b/tests/templates/kuttl/spark-history-server/01-prepare-bucket.yaml.j2 @@ -0,0 +1,13 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + # give minio enough time to start + - command: sleep 5 + - command: kubectl cp -n $NAMESPACE spark-examples_2.12-{{ test_scenario['values']['spark'] }}.jar minio-client:/tmp + - command: kubectl exec -n $NAMESPACE minio-client -- sh -c 'mc alias set test-minio http://test-minio:9000 $$MINIO_SERVER_ACCESS_KEY $$MINIO_SERVER_SECRET_KEY' + - command: kubectl exec -n $NAMESPACE minio-client -- mc mb test-minio/my-bucket + - command: kubectl exec -n $NAMESPACE eventlog-minio-client -- sh -c 'mc alias set eventlog-minio http://eventlog-minio:9000 $$MINIO_SERVER_ACCESS_KEY $$MINIO_SERVER_SECRET_KEY' + - command: kubectl exec -n $NAMESPACE eventlog-minio-client -- mc mb eventlog-minio/spark-logs/eventlogs + - script: >- + kubectl exec -n $NAMESPACE minio-client -- mc cp /tmp/spark-examples_2.12-{{ test_scenario['values']['spark'] }}.jar test-minio/my-bucket diff --git a/tests/templates/kuttl/spark-history-server/05-assert.yaml b/tests/templates/kuttl/spark-history-server/05-assert.yaml new file mode 100644 index 00000000..af9c593a --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/05-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: spark-history +status: + readyReplicas: 1 diff --git a/tests/templates/kuttl/spark-history-server/05-deploy-history-server.yaml.j2 b/tests/templates/kuttl/spark-history-server/05-deploy-history-server.yaml.j2 new file mode 100644 index 00000000..8efc1d1a --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/05-deploy-history-server.yaml.j2 @@ -0,0 +1,22 @@ +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkHistoryServer +metadata: + name: spark-history +spec: + image: + productVersion: "{{ test_scenario['values']['spark'] }}" + stackableVersion: "{{ test_scenario['values']['stackable'] }}" + logFileDirectory: + s3: + prefix: eventlogs/ + bucket: + reference: spark-history-s3-bucket + # For possible properties see: https://spark.apache.org/docs/latest/monitoring.html#spark-history-server-configuration-options + #sparkConf: + nodes: + roleGroups: + cleaner: + replicas: 1 + config: + cleaner: true diff --git a/tests/templates/kuttl/spark-history-server/10-assert.yaml b/tests/templates/kuttl/spark-history-server/10-assert.yaml new file mode 100644 index 00000000..38367193 --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/10-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-pi-s3-1 +status: + phase: Succeeded diff --git a/tests/templates/kuttl/spark-history-server/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-history-server/10-deploy-spark-app.yaml.j2 new file mode 100644 index 00000000..348554ed --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/10-deploy-spark-app.yaml.j2 @@ -0,0 +1,21 @@ +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-pi-s3-1 +spec: + version: "1.0" + sparkImage: docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'] }}-stackable{{ test_scenario['values']['stackable'] }} + sparkImagePullPolicy: IfNotPresent + mode: cluster + mainClass: org.apache.spark.examples.SparkPi + mainApplicationFile: 
s3a://my-bucket/spark-examples_2.12-{{ test_scenario['values']['spark'] }}.jar + s3connection: + reference: spark-data-s3-connection + logFileDirectory: + s3: + prefix: eventlogs/ + bucket: + reference: spark-history-s3-bucket + executor: + instances: 1 diff --git a/tests/templates/kuttl/spark-history-server/12-assert.yaml b/tests/templates/kuttl/spark-history-server/12-assert.yaml new file mode 100644 index 00000000..f257e6cd --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/12-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-pi-s3-2 +status: + phase: Succeeded diff --git a/tests/templates/kuttl/spark-history-server/12-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-history-server/12-deploy-spark-app.yaml.j2 new file mode 100644 index 00000000..19b12422 --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/12-deploy-spark-app.yaml.j2 @@ -0,0 +1,21 @@ +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-pi-s3-2 +spec: + version: "1.0" + sparkImage: docker.stackable.tech/stackable/spark-k8s:{{ test_scenario['values']['spark'] }}-stackable{{ test_scenario['values']['stackable'] }} + sparkImagePullPolicy: IfNotPresent + mode: cluster + mainClass: org.apache.spark.examples.SparkPi + mainApplicationFile: s3a://my-bucket/spark-examples_2.12-{{ test_scenario['values']['spark'] }}.jar + s3connection: + reference: spark-data-s3-connection + logFileDirectory: + s3: + prefix: eventlogs/ + bucket: + reference: spark-history-s3-bucket + executor: + instances: 1 diff --git a/tests/templates/kuttl/spark-history-server/20-assert.yaml b/tests/templates/kuttl/spark-history-server/20-assert.yaml new file mode 100644 index 00000000..661f45eb --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/20-assert.yaml @@ -0,0 +1,13 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: history-api-check +timeout: 180 +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: history-api-check +status: + succeeded: 1 diff --git a/tests/templates/kuttl/spark-history-server/20-test-logs.yaml b/tests/templates/kuttl/spark-history-server/20-test-logs.yaml new file mode 100644 index 00000000..f7d8986d --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/20-test-logs.yaml @@ -0,0 +1,20 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: history-api-check +spec: + template: + spec: + restartPolicy: OnFailure + activeDeadlineSeconds: 100 + containers: + - name: history-api-check + image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0 + command: + [ + "bash", + "-x", + "-c", + "test 2 == $(curl http://spark-history-node-cleaner:18080/api/v1/applications | jq length)", + ] diff --git a/tests/templates/kuttl/spark-history-server/helm-bitnami-eventlog-minio-values.yaml b/tests/templates/kuttl/spark-history-server/helm-bitnami-eventlog-minio-values.yaml new file mode 100644 index 00000000..bcb802a2 --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/helm-bitnami-eventlog-minio-values.yaml @@ -0,0 +1,23 @@ +--- +volumePermissions: + enabled: false + +podSecurityContext: + enabled: false + +containerSecurityContext: + enabled: false + +mode: standalone + +disableWebUI: true + +persistence: + enabled: false + +resources: + requests: + memory: 1Gi + +auth: + existingSecret: history-credentials diff --git 
a/tests/templates/kuttl/spark-history-server/helm-bitnami-minio-values.yaml b/tests/templates/kuttl/spark-history-server/helm-bitnami-minio-values.yaml new file mode 100644 index 00000000..c8891024 --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/helm-bitnami-minio-values.yaml @@ -0,0 +1,23 @@ +--- +volumePermissions: + enabled: false + +podSecurityContext: + enabled: false + +containerSecurityContext: + enabled: false + +mode: standalone + +disableWebUI: true + +persistence: + enabled: false + +resources: + requests: + memory: 1Gi + +auth: + existingSecret: minio-credentials diff --git a/tests/templates/kuttl/spark-history-server/spark-examples_2.12-3.3.0.jar b/tests/templates/kuttl/spark-history-server/spark-examples_2.12-3.3.0.jar new file mode 100644 index 00000000..c5c34c3a Binary files /dev/null and b/tests/templates/kuttl/spark-history-server/spark-examples_2.12-3.3.0.jar differ diff --git a/tests/templates/kuttl/spark-ny-public-s3/01-prepare-bucket.yaml.j2 b/tests/templates/kuttl/spark-ny-public-s3/01-prepare-bucket.yaml.j2 index 00d89bf2..a5fdd4bb 100644 --- a/tests/templates/kuttl/spark-ny-public-s3/01-prepare-bucket.yaml.j2 +++ b/tests/templates/kuttl/spark-ny-public-s3/01-prepare-bucket.yaml.j2 @@ -2,6 +2,8 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: + # give minio enough time to start + - command: sleep 5 - command: kubectl cp -n $NAMESPACE ny-tlc-report-1.1.0-{{ test_scenario['values']['spark'] }}.jar minio-client:/tmp - command: kubectl cp -n $NAMESPACE yellow_tripdata_2021-07.csv minio-client:/tmp - command: kubectl exec -n $NAMESPACE minio-client -- sh -c 'mc alias set test-minio http://test-minio:9000 $$MINIO_SERVER_ACCESS_KEY $$MINIO_SERVER_SECRET_KEY' diff --git a/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 index 66dd0678..aab301bc 100644 --- a/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 @@ -24,16 +24,11 @@ spec: name: cm-job-arguments args: - "--input /arguments/job-args.txt" - s3bucket: + s3connection: inline: - bucketName: my-bucket - connection: - inline: - host: test-minio - port: 9000 - accessStyle: Path - sparkConf: - spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider" + host: test-minio + port: 9000 + accessStyle: Path driver: volumeMounts: - name: cm-job-arguments diff --git a/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml b/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml index 0845c0a9..5c78faeb 100644 --- a/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml +++ b/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml @@ -7,8 +7,8 @@ metadata: secrets.stackable.tech/class: s3-credentials-class timeout: 240 stringData: - accessKeyId: minioAccessKey - secretAccessKey: minioSecretKey + accessKey: minioAccessKey + secretKey: minioSecretKey # The following two entries are used by the Bitnami chart for MinIO to # set up credentials for accessing buckets managed by the MinIO tenant. 
root-user: minioAccessKey diff --git a/tests/templates/kuttl/spark-pi-private-s3/01-prepare-bucket.yaml.j2 b/tests/templates/kuttl/spark-pi-private-s3/01-prepare-bucket.yaml.j2 index d917f328..8e217053 100644 --- a/tests/templates/kuttl/spark-pi-private-s3/01-prepare-bucket.yaml.j2 +++ b/tests/templates/kuttl/spark-pi-private-s3/01-prepare-bucket.yaml.j2 @@ -2,6 +2,8 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: + # give minio enough time to start + - command: sleep 5 - command: kubectl cp -n $NAMESPACE spark-examples_2.12-{{ test_scenario['values']['spark'] }}.jar minio-client:/tmp - command: kubectl exec -n $NAMESPACE minio-client -- sh -c 'mc alias set test-minio http://test-minio:9000 $$MINIO_SERVER_ACCESS_KEY $$MINIO_SERVER_SECRET_KEY' - command: kubectl exec -n $NAMESPACE minio-client -- mc mb test-minio/my-bucket diff --git a/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 index 762f217a..7f320c65 100644 --- a/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 @@ -10,17 +10,12 @@ spec: mode: cluster mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: s3a://my-bucket/spark-examples_2.12-{{ test_scenario['values']['spark'] }}.jar - s3bucket: + s3connection: inline: - bucketName: my-bucket - connection: - inline: - host: test-minio - port: 9000 - accessStyle: Path - credentials: - secretClass: s3-credentials-class - sparkConf: - spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" + host: test-minio + port: 9000 + accessStyle: Path + credentials: + secretClass: s3-credentials-class executor: instances: 1 diff --git a/tests/templates/kuttl/spark-pi-public-s3/01-prepare-bucket.yaml.j2 b/tests/templates/kuttl/spark-pi-public-s3/01-prepare-bucket.yaml.j2 index 6ac27d07..c5198ed5 100644 --- a/tests/templates/kuttl/spark-pi-public-s3/01-prepare-bucket.yaml.j2 +++ b/tests/templates/kuttl/spark-pi-public-s3/01-prepare-bucket.yaml.j2 @@ -2,6 +2,8 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: + # give minio enough time to start + - command: sleep 5 - command: kubectl cp -n $NAMESPACE spark-examples_2.12-{{ test_scenario['values']['spark'] }}.jar minio-client:/tmp - command: kubectl exec -n $NAMESPACE minio-client -- sh -c 'mc alias set test-minio http://test-minio:9000 $$MINIO_SERVER_ACCESS_KEY $$MINIO_SERVER_SECRET_KEY' - command: kubectl exec -n $NAMESPACE minio-client -- mc mb test-minio/my-bucket diff --git a/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 index aeef1180..3844ae70 100644 --- a/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 @@ -10,15 +10,10 @@ spec: mode: cluster mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: s3a://my-bucket/spark-examples_2.12-{{ test_scenario['values']['spark'] }}.jar - s3bucket: + s3connection: inline: - bucketName: my-bucket - connection: - inline: - host: test-minio - port: 9000 - accessStyle: Path - sparkConf: - spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider" + host: test-minio + port: 9000 + accessStyle: Path executor: instances: 1 diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 
805036e8..369925a8 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -18,25 +18,26 @@ dimensions: values: - 0.1.0 tests: - - name: spark-pi-private-s3 + - name: spark-history-server dimensions: - spark - stackable - openshift - - name: spark-pi-public-s3 + - name: spark-pi-private-s3 dimensions: - spark - stackable - openshift - - name: spark-ny-public-s3 + - name: spark-pi-public-s3 dimensions: - spark - stackable - openshift - - name: node-selector + - name: spark-ny-public-s3 dimensions: - spark - stackable + - openshift - name: spark-examples dimensions: - spark
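To make the ordering of the history server's Spark properties easier to follow, below is a minimal, std-only Rust sketch of the merge-and-render step performed by `spark_config` in `history_controller.rs`: log-directory settings come first, then the optional cleaner settings, then any user-supplied `sparkConf` entries, with later entries overriding earlier ones before everything is serialized into the properties file mounted from the ConfigMap. The function name `render_spark_config` and the sample values are illustrative only and do not appear in the patch.

use std::collections::BTreeMap;

/// Merge the three property sources in precedence order and render them
/// as "key value" lines, as expected by --properties-file.
fn render_spark_config(
    log_dir_settings: BTreeMap<String, String>,
    cleaner_settings: BTreeMap<String, String>,
    user_conf: BTreeMap<String, String>,
) -> String {
    let mut settings = log_dir_settings;
    settings.extend(cleaner_settings);
    settings.extend(user_conf); // user-provided sparkConf wins
    settings
        .iter()
        .map(|(k, v)| format!("{k} {v}"))
        .collect::<Vec<String>>()
        .join("\n")
}

fn main() {
    let mut log_dir = BTreeMap::new();
    log_dir.insert(
        "spark.history.fs.logDirectory".to_string(),
        "s3a://spark-logs/eventlogs/".to_string(),
    );
    let mut cleaner = BTreeMap::new();
    cleaner.insert(
        "spark.history.fs.cleaner.enabled".to_string(),
        "true".to_string(),
    );
    let user_conf = BTreeMap::new();
    println!("{}", render_spark_config(log_dir, cleaner, user_conf));
}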