
Commit 519ec08

razvan and adwk67 authored
Deploy apps with dynamic dependencies. (#281)
* Create ivy2 volume when installing packages.
* wip
* Re-add pod overrides for the submit pod.
* Unify driver and executor configurations.
* Successful smoke tests.
* Rename config structs.
* smoke tests pass on kind.
* Refactor, cleanup and split configuration between driver and executor again.
* Update docs, examples and changelog.
* fix typo
* fix typo
* Move replicas under spec.executor.config in tests and examples.
* use spark.jars.packages instead of --packages
* Added test with pyspark and iceberg dependency.
* update docs
* Clean up tests.
* Clean up tests.
* Apply suggestions
* Remove old node selector struct
* Added module doc for roles.rs
* Use RoleGroup for executors to make replicas on the same level as executor configuration.
* Update tests with "replicas" directly under "executor".
* Update docs/examples with "replicas" directly under "executor".
* Update rust/crd/src/roles.rs

Co-authored-by: Andrew Kenworthy <[email protected]>

* Implement review feedback.
* Update CHANGELOG.md
* Fix test for 3.3 and update docs.

---------

Co-authored-by: Andrew Kenworthy <[email protected]>
1 parent 3802141 commit 519ec08

File tree: 9 files changed (+174, -11 lines)


CHANGELOG.md

Lines changed: 6 additions & 1 deletion
@@ -15,14 +15,19 @@ All notable changes to this project will be documented in this file.
 - `operator-rs` `0.44.0` -> `0.48.0` ([#267], [#275]).
 - Removed usages of SPARK_DAEMON_JAVA_OPTS since it's not a reliable way to pass extra JVM options ([#272]).
 - [BREAKING] use product image selection instead of version ([#275]).
-- BREAKING refactored application roles to use `CommonConfiguration` structures from the operator framework ([#277]).
+- [BREAKING] refactored application roles to use `CommonConfiguration` structures from the operator framework ([#277]).
+
+### Fixed
+
+- Dynamic loading of Maven packages ([#281]).
 
 [#267]: https://github.com/stackabletech/spark-k8s-operator/pull/267
 [#268]: https://github.com/stackabletech/spark-k8s-operator/pull/268
 [#269]: https://github.com/stackabletech/spark-k8s-operator/pull/269
 [#272]: https://github.com/stackabletech/spark-k8s-operator/pull/272
 [#275]: https://github.com/stackabletech/spark-k8s-operator/pull/275
 [#277]: https://github.com/stackabletech/spark-k8s-operator/pull/277
+[#281]: https://github.com/stackabletech/spark-k8s-operator/pull/281
 
 ## [23.7.0] - 2023-07-14

docs/modules/spark-k8s/pages/usage-guide/job-dependencies.adoc

Lines changed: 12 additions & 9 deletions
@@ -90,25 +90,28 @@ include::example$example-pvc.yaml[]
 
 === Spark native package coordinates and Python requirements
 
-IMPORTANT: With the platform release 23.4.1 (and all previous releases), dynamic provisioning of dependencies using the Spark `packages` field doesn't work. This is a known problem with Spark and is tracked https://github.com/stackabletech/spark-k8s-operator/issues/141[here].
-
 The last and most flexible way to provision dependencies is to use the built-in `spark-submit` support for Maven package coordinates.
 
-These can be specified by adding the following section to the `SparkApplication` manifest file:
+The snippet below showcases how to add Apache Iceberg support to a Spark (version 3.4.x) application.
 
 [source,yaml]
 ----
 spec:
+  sparkConf:
+    spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+    spark.sql.catalog.spark_catalog: org.apache.iceberg.spark.SparkSessionCatalog
+    spark.sql.catalog.spark_catalog.type: hive
+    spark.sql.catalog.local: org.apache.iceberg.spark.SparkCatalog
+    spark.sql.catalog.local.type: hadoop
+    spark.sql.catalog.local.warehouse: /tmp/warehouse
   deps:
-    repositories:
-      - https://repository.example.com/prod
     packages:
-      - com.example:some-package:1.0.0
-    excludePackages:
-      - com.example:other-package
+      - org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1
 ----
 
-These directly translate to the spark-submit parameters `--packages`, `--exclude-packages`, and `--repositories`.
+IMPORTANT: Currently it's not possible to provision dependencies that are loaded by the JVM's https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/ClassLoader.html#getSystemClassLoader()[system class loader]. Such dependencies include JDBC drivers. If you need access to JDBC sources from your Spark application, consider building your own custom Spark image.
+
+IMPORTANT: Spark version 3.3.x has a https://issues.apache.org/jira/browse/SPARK-35084[known bug] that prevents this mechanism from working.
 
 When submitting PySpark jobs, users can specify `pip` requirements that are installed before the driver and executor pods are created.

rust/crd/src/constants.rs

Lines changed: 3 additions & 0 deletions
@@ -2,6 +2,9 @@ use stackable_operator::memory::{BinaryMultiple, MemoryQuantity};
 
 pub const APP_NAME: &str = "spark-k8s";
 
+pub const VOLUME_MOUNT_NAME_IVY2: &str = "ivy2";
+pub const VOLUME_MOUNT_PATH_IVY2: &str = "/ivy2";
+
 pub const VOLUME_MOUNT_NAME_DRIVER_POD_TEMPLATES: &str = "driver-pod-template";
 pub const VOLUME_MOUNT_PATH_DRIVER_POD_TEMPLATES: &str = "/stackable/spark/driver-pod-templates";

rust/crd/src/lib.rs

Lines changed: 30 additions & 1 deletion
@@ -182,6 +182,14 @@ impl SparkApplication {
             .map(|req| req.join(" "))
     }
 
+    pub fn packages(&self) -> Vec<String> {
+        self.spec
+            .deps
+            .as_ref()
+            .and_then(|deps| deps.packages.clone())
+            .unwrap_or_default()
+    }
+
     pub fn volumes(
         &self,
         s3conn: &Option<S3ConnectionSpec>,
@@ -242,6 +250,13 @@ impl SparkApplication {
                 .build(),
         );
 
+        if !self.packages().is_empty() {
+            result.push(
+                VolumeBuilder::new(VOLUME_MOUNT_NAME_IVY2)
+                    .empty_dir(EmptyDirVolumeSource::default())
+                    .build(),
+            );
+        }
         if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, s3logdir) {
             result.push(
                 VolumeBuilder::new(STACKABLE_TRUST_STORE_NAME)
@@ -332,6 +347,13 @@ impl SparkApplication {
             ..VolumeMount::default()
         });
 
+        if !self.packages().is_empty() {
+            mounts.push(VolumeMount {
+                name: VOLUME_MOUNT_NAME_IVY2.into(),
+                mount_path: VOLUME_MOUNT_PATH_IVY2.into(),
+                ..VolumeMount::default()
+            });
+        }
         if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, s3logdir) {
             mounts.push(VolumeMount {
                 name: STACKABLE_TRUST_STORE_NAME.into(),
@@ -458,7 +480,10 @@ impl SparkApplication {
                 deps.repositories
                     .map(|r| format!("--repositories {}", r.join(","))),
             );
-            submit_cmd.extend(deps.packages.map(|p| format!("--packages {}", p.join(","))));
+            submit_cmd.extend(
+                deps.packages
+                    .map(|p| format!("--conf spark.jars.packages={}", p.join(","))),
+            );
         }
 
         // some command elements need to be initially stored in a map (to allow overwrites) and
@@ -498,6 +523,10 @@ impl SparkApplication {
             submit_conf.extend(log_dir.application_spark_config());
         }
 
+        if !self.packages().is_empty() {
+            submit_cmd.push(format!("--conf spark.jars.ivy={VOLUME_MOUNT_PATH_IVY2}"))
+        }
+
         // conf arguments: these should follow - and thus override - values set from resource limits above
         if let Some(spark_conf) = self.spec.spark_conf.clone() {
             submit_conf.extend(spark_conf);
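
Read together, these hunks mean that a `SparkApplication` with a non-empty `deps.packages` list now gets an `ivy2` emptyDir volume mounted at `/ivy2`, and its submit command is extended with `--conf spark.jars.packages=<comma-separated coordinates>` and `--conf spark.jars.ivy=/ivy2` instead of the old `--packages` flag. Below is a minimal, standalone Rust sketch of that argument construction; the `JobDependencies` struct and `submit_args` function are simplified stand-ins for illustration, not the operator's actual types.

[source,rust]
----
// Standalone sketch (not the operator's own types): how `deps.packages`
// ends up on the spark-submit command line after this change.

const VOLUME_MOUNT_PATH_IVY2: &str = "/ivy2";

/// Simplified stand-in for the CRD's dependency section.
struct JobDependencies {
    repositories: Option<Vec<String>>,
    packages: Option<Vec<String>>,
}

fn submit_args(deps: &JobDependencies) -> Vec<String> {
    let mut args = Vec::new();

    if let Some(repos) = &deps.repositories {
        args.push(format!("--repositories {}", repos.join(",")));
    }

    let packages = deps.packages.clone().unwrap_or_default();
    if !packages.is_empty() {
        // Packages are passed via spark.jars.packages rather than --packages,
        // and the Ivy cache is redirected to the writable emptyDir mount.
        args.push(format!("--conf spark.jars.packages={}", packages.join(",")));
        args.push(format!("--conf spark.jars.ivy={VOLUME_MOUNT_PATH_IVY2}"));
    }

    args
}

fn main() {
    let deps = JobDependencies {
        repositories: None,
        packages: Some(vec![
            "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1".to_owned(),
        ]),
    };
    for arg in submit_args(&deps) {
        println!("{arg}");
    }
}
----

For the Iceberg example used elsewhere in this change, the sketch prints the same two `--conf` flags the operator now injects.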
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: vector-aggregator-discovery
+{% endif %}
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: vector-aggregator-discovery
+data:
+  ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }}
+{% endif %}
Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+timeout: 300
+---
+# The Job starting the whole process
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkApplication
+metadata:
+  name: pyspark-iceberg
+status:
+{% if test_scenario['values']['spark'].startswith("3.3") %}
+  # Spark 3.3 is expected to fail because of this https://issues.apache.org/jira/browse/SPARK-35084
+  phase: Failed
+{% else %}
+  phase: Succeeded
+{% endif %}
Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
+---
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkApplication
+metadata:
+  name: pyspark-iceberg
+spec:
+  version: "1.0"
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+  vectorAggregatorConfigMapName: vector-aggregator-discovery
+{% endif %}
+  sparkImage:
+{% if test_scenario['values']['spark'].find(",") > 0 %}
+    custom: "{{ test_scenario['values']['spark'].split(',')[1] }}"
+    productVersion: "{{ test_scenario['values']['spark'].split(',')[0] }}"
+{% else %}
+    productVersion: "{{ test_scenario['values']['spark'] }}"
+{% endif %}
+    pullPolicy: IfNotPresent
+  mode: cluster
+  mainApplicationFile: "local:///stackable/spark/jobs/write-to-iceberg.py"
+  sparkConf:
+    spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+    spark.sql.catalog.spark_catalog: org.apache.iceberg.spark.SparkSessionCatalog
+    spark.sql.catalog.spark_catalog.type: hive
+    spark.sql.catalog.local: org.apache.iceberg.spark.SparkCatalog
+    spark.sql.catalog.local.type: hadoop
+    spark.sql.catalog.local.warehouse: /tmp/warehouse
+  job:
+    logging:
+      enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
+  driver:
+    config:
+      logging:
+        enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
+      volumeMounts:
+        - name: script
+          mountPath: /stackable/spark/jobs
+  executor:
+    replicas: 1
+    config:
+      logging:
+        enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
+      volumeMounts:
+        - name: script
+          mountPath: /stackable/spark/jobs
+  deps:
+    packages:
+      - org.apache.iceberg:iceberg-spark-runtime-{{ test_scenario['values']['spark'].rstrip('.0') }}_2.12:1.3.1
+  volumes:
+    - name: script
+      configMap:
+        name: write-to-iceberg
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: write-to-iceberg
+data:
+  write-to-iceberg.py: |
+    from pyspark.sql import SparkSession
+    from pyspark.sql.types import *
+
+    spark = SparkSession.builder.appName("write-to-iceberg").getOrCreate()
+
+    schema = StructType([
+      StructField("id", LongType(), True),
+      StructField("data", StringType(), True)
+    ])
+
+
+    # create table
+    df = spark.createDataFrame([], schema)
+    df.writeTo("local.db.table").create()
+
+    # append to table
+    data = [
+      (1,"one"),
+      (2,"two"),
+      (3,"three"),
+      (4,"four")
+    ]
+
+    df = spark.createDataFrame(data, schema)
+    df.writeTo("local.db.table").append()
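
Note how the test derives the Iceberg runtime coordinate from the Spark version under test: the artifact is published per Spark minor version and Scala binary version as `iceberg-spark-runtime-<spark minor>_<scala binary>`, so it must match the Spark image. The following standalone Rust sketch illustrates that composition; the helper name and the way the minor version is derived are assumptions of this illustration, not operator or test code.

[source,rust]
----
// Illustration only: compose the Maven coordinate of the Iceberg Spark runtime
// from a full Spark version, the Scala binary version and an Iceberg release.
fn iceberg_runtime_coordinate(
    spark_version: &str,
    scala_binary: &str,
    iceberg_version: &str,
) -> String {
    // "3.4.0" -> "3.4": the artifact is published per Spark minor version.
    let spark_minor = spark_version
        .splitn(3, '.')
        .take(2)
        .collect::<Vec<_>>()
        .join(".");
    format!("org.apache.iceberg:iceberg-spark-runtime-{spark_minor}_{scala_binary}:{iceberg_version}")
}

fn main() {
    // Matches the coordinate used in the docs example and in the test above.
    assert_eq!(
        iceberg_runtime_coordinate("3.4.0", "2.12", "1.3.1"),
        "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1"
    );
    println!("{}", iceberg_runtime_coordinate("3.4.0", "2.12", "1.3.1"));
}
----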

tests/test-definition.yaml

Lines changed: 3 additions & 0 deletions
@@ -67,6 +67,9 @@ tests:
       - spark
       - ny-tlc-report
       - openshift
+  - name: iceberg
+    dimensions:
+      - spark
 suites:
   - name: nightly
     patch:
