Commit bba639e

Merge pull request #92 from stackabletech/pyspark-kuttl-tests
PySpark tests
2 parents b65659e + c165e30 commit bba639e

14 files changed: +5285 −18 lines

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -13,10 +13,12 @@ All notable changes to this project will be documented in this file.
 - BREAKING: Use current S3 connection/bucket structs ([#86])
 - Add node selector to top-level job and specify node selection in PVC-relevant tests ([#90])
 - Update kuttl tests to use Spark 3.3.0 ([#91])
+- Bugfix for duplicate volume mounts in PySpark jobs ([#92])

 [#86]: https://github.com/stackabletech/spark-k8s-operator/pull/86
 [#90]: https://github.com/stackabletech/spark-k8s-operator/pull/90
 [#91]: https://github.com/stackabletech/spark-k8s-operator/pull/91
+[#92]: https://github.com/stackabletech/spark-k8s-operator/pull/92

 ## [0.2.0] - 2022-06-21

examples/ny-tlc-report-image.yaml

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ spec:
   # everything under /jobs will be copied to /stackable/spark/jobs
   image: docker.stackable.tech/stackable/ny-tlc-report:0.1.0
   sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.3.0-stackable0.1.0
-  sparkImagePullPolicy: Always
+  sparkImagePullPolicy: IfNotPresent
   mode: cluster
   mainApplicationFile: local:///stackable/spark/jobs/ny_tlc_report.py
   args:

rust/operator-binary/src/spark_k8s_controller.rs

Lines changed: 2 additions & 17 deletions
@@ -7,8 +7,8 @@ use stackable_operator::commons::s3::InlinedS3BucketSpec;
 use stackable_operator::commons::tls::{CaCert, TlsVerification};
 use stackable_operator::k8s_openapi::api::batch::v1::{Job, JobSpec};
 use stackable_operator::k8s_openapi::api::core::v1::{
-    ConfigMap, ConfigMapVolumeSource, Container, EmptyDirVolumeSource, EnvVar, Pod, PodSpec,
-    PodTemplateSpec, ServiceAccount, Volume, VolumeMount,
+    ConfigMap, ConfigMapVolumeSource, Container, EnvVar, Pod, PodSpec, PodTemplateSpec,
+    ServiceAccount, Volume, VolumeMount,
 };
 use stackable_operator::k8s_openapi::api::rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject};
 use stackable_operator::k8s_openapi::Resource;
@@ -327,13 +327,6 @@ fn spark_job(
         ..VolumeMount::default()
     }];
     volume_mounts.extend(spark_application.driver_volume_mounts(s3bucket));
-    if job_container.is_some() {
-        volume_mounts.push(VolumeMount {
-            name: VOLUME_MOUNT_NAME_JOB.into(),
-            mount_path: VOLUME_MOUNT_PATH_JOB.into(),
-            ..VolumeMount::default()
-        })
-    }

     let mut cb = ContainerBuilder::new("spark-submit");
     cb.image(spark_image)
@@ -362,14 +355,6 @@ fn spark_job(
     }];
     volumes.extend(spark_application.volumes(s3bucket));

-    if job_container.is_some() {
-        volumes.push(Volume {
-            name: String::from(VOLUME_MOUNT_NAME_JOB),
-            empty_dir: Some(EmptyDirVolumeSource::default()),
-            ..Volume::default()
-        })
-    }
-
     let pod = PodTemplateSpec {
         metadata: Some(
             ObjectMetaBuilder::new()
Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+metadata:
+  name: minio
+timeout: 900
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: test-minio
+  labels:
+    app: minio
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: minio-mc
+status:
+  readyReplicas: 1
+  replicas: 1
Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: minio-mc
+  labels:
+    app: minio-mc
+timeout: 240
+spec:
+  clusterIP: None
+  selector:
+    app: minio-mc
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: minio-mc
+  labels:
+    app: minio-mc
+timeout: 240
+spec:
+  replicas: 1
+  serviceName: "minio-mc"
+  selector:
+    matchLabels:
+      app: minio-mc
+  template:
+    metadata:
+      labels:
+        app: minio-mc
+    spec:
+      containers:
+        - name: minio-mc
+          image: bitnami/minio:2022-debian-10
+          stdin: true
+          tty: true
Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+commands:
+  - script: >-
+      helm install test-minio
+      --namespace $NAMESPACE
+      --set mode=standalone
+      --set replicas=1
+      --set persistence.enabled=false
+      --set buckets[0].name=my-bucket,buckets[0].policy=public
+      --set resources.requests.memory=1Gi
+      --repo https://charts.min.io/ minio
+    timeout: 240
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+commands:
+  - script: >-
+      kubectl exec -n $NAMESPACE minio-mc-0 --
+      sh -c 'mc alias set test-minio http://test-minio:9000/'
+  - script: kubectl cp -n $NAMESPACE ny_tlc_report.py minio-mc-0:/tmp
+  - script: kubectl cp -n $NAMESPACE yellow_tripdata_2021-07.csv minio-mc-0:/tmp
+  - script: >-
+      kubectl exec -n $NAMESPACE minio-mc-0 --
+      mc cp /tmp/ny_tlc_report.py test-minio/my-bucket
+  - script: >-
+      kubectl exec -n $NAMESPACE minio-mc-0 --
+      mc cp /tmp/yellow_tripdata_2021-07.csv test-minio/my-bucket
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+metadata:
+  name: pyspark-ny-deps-job
+timeout: 900
+---
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: pyspark-ny-deps-job
+status:
+  succeeded: 1
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: pyspark-ny-pvc
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 1Gi
+---
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: pyspark-ny-deps-job
+spec:
+  template:
+    spec:
+      nodeSelector:
+        node: "1"
+      restartPolicy: Never
+      volumes:
+        - name: job-deps
+          persistentVolumeClaim:
+            claimName: pyspark-ny-pvc
+      containers:
+        - name: aws-deps
+          image: docker.stackable.tech/stackable/tools:0.2.0-stackable0
+          env:
+            - name: DEST_DIR
+              value: "/dependencies/jars"
+            - name: AWS
+              value: "1.11.1026"
+            - name: HADOOP
+              value: "3.3.3"
+          command:
+            [
+              "bash",
+              "-x",
+              "-o",
+              "pipefail",
+              "-c",
+              "mkdir -p ${DEST_DIR} && curl -L https://search.maven.org/remotecontent?filepath=org/apache/hadoop/hadoop-aws/${HADOOP}/hadoop-aws-${HADOOP}.jar -o ${DEST_DIR}/hadoop-aws-${HADOOP}.jar && curl -L https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS}/aws-java-sdk-bundle-${AWS}.jar -o ${DEST_DIR}/aws-java-sdk-bundle-${AWS}.jar && chown -R stackable:stackable ${DEST_DIR} && chmod -R a=,u=rwX ${DEST_DIR}",
+            ]
+          volumeMounts:
+            - name: job-deps
+              mountPath: /dependencies
+          securityContext:
+            runAsUser: 0
Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+metadata:
+  name: pyspark-ny-public-s3
+timeout: 900
+---
+# The Job starting the whole process
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkApplication
+metadata:
+  name: pyspark-ny-public-s3
+status:
+  phase: Succeeded
Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+---
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkApplication
+metadata:
+  name: pyspark-ny-public-s3
+spec:
+  version: "1.0"
+  # everything under /jobs will be copied to /stackable/spark/jobs
+  sparkImage: docker.stackable.tech/stackable/pyspark-k8s:{{ test_scenario['values']['spark'] }}-stackable{{ test_scenario['values']['stackable'] }}
+  sparkImagePullPolicy: IfNotPresent
+  mode: cluster
+  mainApplicationFile: s3a://my-bucket/ny_tlc_report.py
+  args:
+    - "--input 's3a://my-bucket/yellow_tripdata_2021-07.csv'"
+  deps:
+    requirements:
+      - tabulate==0.8.9
+  s3bucket:
+    inline:
+      bucketName: my-bucket
+      connection:
+        inline:
+          host: test-minio
+          port: 9000
+          accessStyle: Path
+  sparkConf:
+    spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
+    spark.driver.extraClassPath: "/dependencies/jars/*"
+    spark.executor.extraClassPath: "/dependencies/jars/*"
+  volumes:
+    - name: job-deps
+      persistentVolumeClaim:
+        claimName: pyspark-ny-pvc
+  driver:
+    cores: 1
+    coreLimit: "1200m"
+    memory: "512m"
+    volumeMounts:
+      - name: job-deps
+        mountPath: /dependencies
+  executor:
+    cores: 1
+    instances: 3
+    memory: "512m"
+    volumeMounts:
+      - name: job-deps
+        mountPath: /dependencies
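
Note on the sparkConf block above: it is what lets the job read the public bucket anonymously, with path-style access against the in-cluster MinIO service and the hadoop-aws/aws-sdk jars from the PVC on the extra class path. For a quick out-of-cluster sanity check, the same settings can be mirrored in a throwaway PySpark session. This is only an illustrative sketch, not part of the commit; it assumes the S3A jars are already on the local class path and that the test-minio endpoint is reachable (for example via a port-forward):

# Illustrative only: mirror the SparkApplication's anonymous S3A settings locally.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("ny-tlc-s3a-debug")
    .config("spark.hadoop.fs.s3a.endpoint", "http://test-minio:9000")  # assumed endpoint
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    )
    .getOrCreate()
)

# The same CSV the kuttl step uploads to the public bucket.
df = spark.read.options(header=True, inferSchema=True).csv(
    "s3a://my-bucket/yellow_tripdata_2021-07.csv"
)
df.printSchema()
spark.stop()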
Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+"""
+Creates a report with three indicators out of the NY TLC data set.
+
+See: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
+
+It accepts two command line arguments:
+--input   Path to the input data source. Can be a local path, an S3 object
+          or whatever else Spark supports. Additional dependencies might
+          need to be submitted along with the job.
+--output  Path to write the report as a CSV file.
+"""
+import argparse
+
+from argparse import Namespace
+from pyspark.sql import SparkSession, DataFrame
+from pyspark.sql.functions import dayofweek
+
+import tabulate
+
+
+def check_args() -> Namespace:
+    """Parse the given CLI arguments"""
+    parser = argparse.ArgumentParser(description="NY taxi trip report")
+    parser.add_argument("--input", "-i", required=True, help="Input path for dataset")
+    parser.add_argument(
+        "--output", "-o", required=False, help="Output path for the report."
+    )
+    return parser.parse_args()
+
+
+def build_report(spark: SparkSession, args: Namespace) -> DataFrame:
+    """Compute the total number of passengers plus the average fare and distance per day of week"""
+
+    input_df = spark.read.options(header=True, inferSchema=True).csv(args.input)
+
+    return (
+        input_df.select(
+            dayofweek(input_df["tpep_pickup_datetime"]).alias("day_of_week"),
+            input_df["passenger_count"],
+            input_df["trip_distance"],
+            input_df["total_amount"],
+        )
+        .groupby("day_of_week")
+        .agg({"passenger_count": "sum", "trip_distance": "avg", "total_amount": "avg"})
+        .withColumnRenamed("avg(total_amount)", "avg_amount")
+        .withColumnRenamed("avg(trip_distance)", "avg_trip_distance")
+        .withColumnRenamed("sum(passenger_count)", "total_passengers")
+        .orderBy("day_of_week")
+    )
+
+
+if __name__ == "__main__":
+    args = check_args()
+
+    spark = SparkSession.builder.appName("NY TLC Report").getOrCreate()
+
+    try:
+        report = build_report(spark, args)
+        print(tabulate.tabulate(report.collect()))
+        if args.output:
+            report.coalesce(1).write.mode("overwrite").options(header=True).csv(
+                args.output
+            )
+    finally:
+        spark.stop()
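
Because build_report() only touches args.input, the report logic can be smoke-tested locally without the operator or MinIO. A minimal sketch follows; the one-row in-memory dataset and the /tmp path are made up for illustration, and it assumes ny_tlc_report.py is importable from the working directory:

# Hypothetical local smoke test for build_report(); not part of the commit.
from argparse import Namespace

from pyspark.sql import SparkSession

import ny_tlc_report  # assumes ny_tlc_report.py sits next to this script

spark = SparkSession.builder.master("local[1]").appName("report-smoke-test").getOrCreate()

# Write a one-row CSV with the columns the report selects.
spark.createDataFrame(
    [("2021-07-01 08:05:00", 2, 3.5, 14.0)],
    ["tpep_pickup_datetime", "passenger_count", "trip_distance", "total_amount"],
).write.mode("overwrite").options(header=True).csv("/tmp/tiny_tripdata")

report = ny_tlc_report.build_report(spark, Namespace(input="/tmp/tiny_tripdata", output=None))
report.show()  # one row per day_of_week with total_passengers, avg_trip_distance, avg_amount
spark.stop()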
