Executors don't resolve dependencies #245

3 changes: 3 additions & 0 deletions rust/crd/src/constants.rs
@@ -1,5 +1,8 @@
pub const APP_NAME: &str = "spark-k8s";

pub const VOLUME_MOUNT_NAME_IVY2: &str = "ivy2";
pub const VOLUME_MOUNT_PATH_IVY2: &str = "/ivy2";

pub const VOLUME_MOUNT_NAME_DRIVER_POD_TEMPLATES: &str = "driver-pod-template";
pub const VOLUME_MOUNT_PATH_DRIVER_POD_TEMPLATES: &str = "/stackable/spark/driver-pod-templates";

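The two new constants pair a volume name with its mount path for the shared Ivy cache. A minimal sketch of how such a pair is typically consumed, assuming the `k8s-openapi` crate's core/v1 types (the free functions here are illustrative, not the operator's API):

```rust
use k8s_openapi::api::core::v1::{EmptyDirVolumeSource, Volume, VolumeMount};

const VOLUME_MOUNT_NAME_IVY2: &str = "ivy2";
const VOLUME_MOUNT_PATH_IVY2: &str = "/ivy2";

// Declared once on the pod: an emptyDir that backs the Ivy cache.
fn ivy_volume() -> Volume {
    Volume {
        name: VOLUME_MOUNT_NAME_IVY2.to_string(),
        empty_dir: Some(EmptyDirVolumeSource::default()),
        ..Volume::default()
    }
}

// Mounted by name into every container that needs the cache.
fn ivy_volume_mount() -> VolumeMount {
    VolumeMount {
        name: VOLUME_MOUNT_NAME_IVY2.to_string(),
        mount_path: VOLUME_MOUNT_PATH_IVY2.to_string(),
        ..VolumeMount::default()
    }
}
```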
27 changes: 27 additions & 0 deletions rust/crd/src/lib.rs
@@ -274,6 +274,14 @@ impl SparkApplication {
            .map(|req| req.join(" "))
    }

    pub fn packages(&self) -> Vec<String> {
        self.spec
            .deps
            .as_ref()
            .and_then(|deps| deps.packages.clone())
            .unwrap_or_default()
    }

    pub fn volumes(
        &self,
        s3conn: &Option<S3ConnectionSpec>,
@@ -332,6 +340,14 @@ impl SparkApplication {
                .build(),
        );

        if !self.packages().is_empty() {
            result.push(
                VolumeBuilder::new(VOLUME_MOUNT_NAME_IVY2)
                    .empty_dir(EmptyDirVolumeSource::default())
                    .build(),
            );
        }

        result
    }

@@ -427,6 +443,13 @@ impl SparkApplication {
            ..VolumeMount::default()
        });

        if !self.packages().is_empty() {
            mounts.push(VolumeMount {
                name: VOLUME_MOUNT_NAME_IVY2.into(),
                mount_path: VOLUME_MOUNT_PATH_IVY2.into(),
                ..VolumeMount::default()
            });
        }
        mounts
    }

@@ -612,6 +635,10 @@ impl SparkApplication {
            submit_conf.extend(log_dir.application_spark_config());
        }

        if !self.packages().is_empty() {
            submit_cmd.push(format!("--conf spark.jars.ivy={VOLUME_MOUNT_PATH_IVY2}"));
        }

        // conf arguments: these should follow - and thus override - values set from resource limits above
        if let Some(spark_conf) = self.spec.spark_conf.clone() {
            submit_conf.extend(spark_conf);
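All three lib.rs hunks gate on the same predicate: `packages()` returning a non-empty list. A self-contained sketch of that Option-chaining and gating pattern, with `Deps` and `Spec` as simplified stand-ins for the operator's CRD types:

```rust
#[derive(Default)]
struct Deps {
    packages: Option<Vec<String>>,
}

#[derive(Default)]
struct Spec {
    deps: Option<Deps>,
}

impl Spec {
    // Mirrors SparkApplication::packages(): a missing `deps` or a missing
    // `packages` list both collapse to an empty Vec, so every call site
    // only has to test is_empty().
    fn packages(&self) -> Vec<String> {
        self.deps
            .as_ref()
            .and_then(|deps| deps.packages.clone())
            .unwrap_or_default()
    }
}

fn main() {
    let spec = Spec {
        deps: Some(Deps {
            packages: Some(vec!["org.postgresql:postgresql:42.6.0".to_string()]),
        }),
    };
    // The ivy2 volume, its mounts, and the spark.jars.ivy submit conf
    // are only added when this list is non-empty.
    assert!(!spec.packages().is_empty());
    assert!(Spec::default().packages().is_empty());
}
```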
17 changes: 17 additions & 0 deletions tests/templates/kuttl/postgresql/00-assert.yaml
@@ -0,0 +1,17 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 900
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: integration-tests-sa
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: spark-postgresql
status:
  readyReplicas: 1
  replicas: 1
11 changes: 11 additions & 0 deletions tests/templates/kuttl/postgresql/00-install-postgresql.yaml
@@ -0,0 +1,11 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
  - script: >-
      helm install spark-postgresql
      --namespace $NAMESPACE
      --version 12.1.5
      -f helm-bitnami-postgresql-values.yaml
      --repo https://charts.bitnami.com/bitnami postgresql
timeout: 600
29 changes: 29 additions & 0 deletions tests/templates/kuttl/postgresql/00-serviceaccount.yaml.j2
@@ -0,0 +1,29 @@
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: use-integration-tests-scc
rules:
{% if test_scenario['values']['openshift'] == "true" %}
  - apiGroups: ["security.openshift.io"]
    resources: ["securitycontextconstraints"]
    resourceNames: ["privileged"]
    verbs: ["use"]
{% endif %}
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: integration-tests-sa
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: use-integration-tests-scc
subjects:
  - kind: ServiceAccount
    name: integration-tests-sa
roleRef:
  kind: Role
  name: use-integration-tests-scc
  apiGroup: rbac.authorization.k8s.io
10 changes: 10 additions & 0 deletions tests/templates/kuttl/postgresql/01-assert.yaml.j2
@@ -0,0 +1,10 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: vector-aggregator-discovery
{% endif %}
@@ -0,0 +1,9 @@
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: vector-aggregator-discovery
data:
  ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }}
{% endif %}
12 changes: 12 additions & 0 deletions tests/templates/kuttl/postgresql/10-assert.yaml
@@ -0,0 +1,12 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 900
---
# The Job starting the whole process
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: spark-examples
status:
  phase: Succeeded
56 changes: 56 additions & 0 deletions tests/templates/kuttl/postgresql/10-deploy-spark-app.yaml.j2
@@ -0,0 +1,56 @@
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: spark-examples
spec:
  version: "1.0"
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
  vectorAggregatorConfigMapName: vector-aggregator-discovery
{% endif %}
  sparkImage: "docker.stackable.tech/stackable/pyspark-k8s:{{ test_scenario['values']['spark-latest'].split('-stackable')[0] }}-stackable{{ test_scenario['values']['spark-latest'].split('-stackable')[1] }}"
  sparkImagePullPolicy: IfNotPresent
  mode: cluster
  mainApplicationFile: "local:///stackable/spark/jobs/write-to-postgresql.py"
  job:
    logging:
      enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
  driver:
    logging:
      enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
    volumeMounts:
      - name: script
        mountPath: /stackable/spark/jobs
  executor:
    instances: 1
    logging:
      enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
    volumeMounts:
      - name: script
        mountPath: /stackable/spark/jobs
  deps:
    packages:
      - org.postgresql:postgresql:42.6.0
  volumes:
    - name: script
      configMap:
        name: write-to-postgresql
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: write-to-postgresql
data:
  write-to-postgresql.py: |
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *

    spark = SparkSession.builder.appName("write-to-postgresql").getOrCreate()

    df = spark.createDataFrame([1,2,3], IntegerType())

    # Specifying create table column data types on write
    df.write \
        .option("createTableColumnTypes", "value INTEGER") \
        .jdbc("jdbc:postgresql://spark-postgresql/spark", "sparktest",
              properties={"user": "spark", "password": "spark"})
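This test exercises the fix end-to-end: the PostgreSQL JDBC driver is requested via `deps.packages`, so both the driver and the executors must resolve it at runtime, and `spark.jars.ivy` now points the Ivy cache at the writable `/ivy2` emptyDir. A hedged sketch of the one submit flag this diff adds (how `--packages` itself reaches spark-submit is outside this diff):

```rust
const VOLUME_MOUNT_PATH_IVY2: &str = "/ivy2";

// Emit the Ivy-cache override only when packages were requested; the
// resulting command-line fragment is: --conf spark.jars.ivy=/ivy2
fn ivy_submit_conf(packages: &[String]) -> Option<String> {
    (!packages.is_empty()).then(|| format!("--conf spark.jars.ivy={VOLUME_MOUNT_PATH_IVY2}"))
}
```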
24 changes: 24 additions & 0 deletions tests/templates/kuttl/postgresql/helm-bitnami-postgresql-values.yaml.j2
@@ -0,0 +1,24 @@
---
volumePermissions:
  enabled: false
  securityContext:
    runAsUser: auto

primary:
  podSecurityContext:
{% if test_scenario['values']['openshift'] == 'true' %}
    enabled: false
{% else %}
    enabled: true
{% endif %}
  containerSecurityContext:
    enabled: false

shmVolume:
  chmod:
    enabled: false

auth:
  username: spark
  password: spark
  database: spark
11 changes: 11 additions & 0 deletions tests/test-definition.yaml
@@ -14,6 +14,12 @@ dimensions:
  - name: ny-tlc-report
    values:
      - 0.1.0
  - name: spark-latest
    values:
      - 3.4.0-stackable0.0.0-dev
  - name: postgresql
    values:
      - 42.6.0
tests:
  - name: spark-history-server
    dimensions:
@@ -53,3 +59,8 @@ tests:
      - spark
      - ny-tlc-report
      - openshift
  - name: postgresql
    dimensions:
      - spark-latest
      - postgresql
      - openshift