
Commit 9930cd6

added readme for running the examples (#71)
# Description

Cleaned up and updated the examples so that an external S3 store is no longer a dependency (adapted from the kuttl tests, so that the examples can e.g. be used directly as a demo). Added a short readme outlining the necessary steps (kept separate from the usage docs, as it is specific to local use and has a few extra steps).
1 parent 331fd64 commit 9930cd6

10 files changed: +5212 −53 lines

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

+### Changed
+
+- Updated examples ([#71])
+
+[#71]: https://github.com/stackabletech/spark-k8s-operator/pull/71
+
## [0.1.0] - 2022-05-05

### Added

examples/README-examples.md

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
# Examples

## Overview

This note outlines the steps needed to run these examples on a local Kubernetes cluster.

## Cluster

Create a new local cluster (e.g. with [Kind](https://kind.sigs.k8s.io/docs/user/quick-start/) and the [stackablectl tool](https://github.com/stackabletech/stackablectl)). The `stackablectl` command below creates a Kind cluster named `stackable-data-platform` and installs the spark-k8s operator into it:
````text
kind delete clusters --all
stackablectl operator install spark-k8s -k
````
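Assuming the install went through cleanly, the operator's CRDs should now be registered. A quick, generic way to confirm this (a sketch, not a stackablectl-specific command):

````text
kubectl get crds | grep spark.stackable.tech
````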
Build the `ny-tlc-report` image from the Dockerfile in this repository (`apps/docker/Dockerfile`) and then load it into the cluster:

````text
kind load docker-image docker.stackable.tech/stackable/ny-tlc-report:0.1.0 --name stackable-data-platform
````
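The build step itself is not spelled out here; a minimal sketch, assuming the image is built from the repository root with the same tag expected by the load command above:

````text
docker build -t docker.stackable.tech/stackable/ny-tlc-report:0.1.0 -f apps/docker/Dockerfile .
````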
## Set up the `PersistentVolumeClaim`

The PVC should contain a few dependencies that Spark needs to access S3:

````text
kubectl apply -f kind/kind-pvc.yaml
````
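What `kind/kind-pvc.yaml` provisions is not reproduced here; judging from the class paths used in the example specs, it should end up holding `hadoop-aws-3.2.0.jar` and `aws-java-sdk-bundle-1.11.375.jar` under `/dependencies/jars`. A quick sanity check that the claim was created (a generic sketch):

````text
kubectl get pvc
````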
## Set up the `minio` object store

Use a local object store to avoid external dependencies:

````text
helm install test-minio \
--set mode=standalone \
--set replicas=1 \
--set persistence.enabled=false \
--set buckets[0].name=my-bucket,buckets[0].policy=public \
--set resources.requests.memory=1Gi \
--repo https://charts.min.io/ minio
````
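MinIO can take a moment to come up before the copy steps below will work; a readiness check, assuming the chart names the Deployment after the `test-minio` release (otherwise just watch `kubectl get pods`):

````text
kubectl rollout status deployment/test-minio
````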
````text
kubectl apply -f kind/minio.yaml
````
Several resources are needed in this store. These can be loaded like this:

````text
kubectl exec minio-mc-0 -- sh -c 'mc alias set test-minio http://test-minio:9000/'
kubectl cp examples/ny-tlc-report-1.1.0.jar minio-mc-0:/tmp
kubectl cp examples/ny-tlc-report.py minio-mc-0:/tmp
kubectl cp examples/yellow_tripdata_2021-07.csv minio-mc-0:/tmp
kubectl exec minio-mc-0 -- mc cp /tmp/ny-tlc-report-1.1.0.jar test-minio/my-bucket
kubectl exec minio-mc-0 -- mc cp /tmp/ny-tlc-report.py test-minio/my-bucket
kubectl exec minio-mc-0 -- mc cp /tmp/yellow_tripdata_2021-07.csv test-minio/my-bucket
````
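To verify that all three objects actually landed in the bucket, the same `mc` client pod can list them (a sketch):

````text
kubectl exec minio-mc-0 -- mc ls test-minio/my-bucket
````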
We now have a local S3 implementation, with the bucket populated with the resources the examples need. The examples can now be run like this:

````text
kubectl apply -f examples/ny-tlc-report-configmap.yaml
````
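Once the `SparkApplication` is applied, the operator starts a driver pod whose logs contain the printed report; progress can be followed with something like this (the driver pod name is generated and will differ):

````text
kubectl get pods
kubectl logs -f <name-of-the-driver-pod>
````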

examples/ny-tlc-report-1.1.0.jar

132 KB
Binary file not shown.

examples/ny-tlc-report-configmap.yaml

Lines changed: 14 additions & 6 deletions
@@ -5,18 +5,18 @@ metadata:
  name: cm-job-arguments
data:
  job-args.txt: |
-    s3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv
+    s3a://my-bucket/yellow_tripdata_2021-07.csv
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
-  name: ny-tlc-report-configmap
+  name: spark-ny-cm
  namespace: default
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.2.1-hadoop3.2-stackable0.4.0
  mode: cluster
-  mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny-tlc-report-1.1.0.jar
+  mainApplicationFile: s3a://my-bucket/ny-tlc-report-1.1.0.jar
  mainClass: tech.stackable.demo.spark.NYTLCReport
  volumes:
    - name: job-deps
@@ -27,10 +27,18 @@ spec:
        name: cm-job-arguments
  args:
    - "--input /arguments/job-args.txt"
+  s3bucket:
+    inline:
+      bucketName: my-bucket
+      connection:
+        inline:
+          host: test-minio
+          port: 9000
  sparkConf:
-    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
-    "spark.driver.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
-    "spark.executor.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
+    spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
+    spark.hadoop.fs.s3a.path.style.access: "true"
+    spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
+    spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
  driver:
    cores: 1
    coreLimit: "1200m"
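With the rename, the application created by this manifest shows up as `spark-ny-cm`; assuming the CRD's plural name is `sparkapplications`, it can be listed with:

````text
kubectl get sparkapplications
````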

examples/ny-tlc-report-external-dependencies.yaml

Lines changed: 14 additions & 6 deletions
@@ -2,22 +2,30 @@
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
-  name: ny-tlc-report-external-dependencies
+  name: spark-ny-ext
  namespace: default
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.2.1-hadoop3.2-python39-stackable0.1.0
  mode: cluster
-  mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny_tlc_report.py
+  mainApplicationFile: s3a://my-bucket/ny-tlc-report.py
  args:
-    - "--input 's3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'"
+    - "--input 's3a://my-bucket/yellow_tripdata_2021-07.csv'"
  deps:
    requirements:
      - tabulate==0.8.9
+  s3bucket:
+    inline:
+      bucketName: my-bucket
+      connection:
+        inline:
+          host: test-minio
+          port: 9000
  sparkConf:
-    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
-    "spark.driver.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
-    "spark.executor.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
+    spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
+    spark.hadoop.fs.s3a.path.style.access: "true"
+    spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
+    spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
  volumes:
    - name: job-deps
      persistentVolumeClaim:
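The readme above only applies the ConfigMap example explicitly; once the bucket is populated, this manifest can presumably be run the same way:

````text
kubectl apply -f examples/ny-tlc-report-external-dependencies.yaml
````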

examples/ny-tlc-report-image.yaml

Lines changed: 14 additions & 5 deletions
@@ -2,23 +2,32 @@
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
-  name: ny-tlc-report-image
+  name: spark-ny-image
  namespace: default
spec:
  version: "1.0"
+  # everything under /jobs will be copied to /stackable/spark/jobs
  image: docker.stackable.tech/stackable/ny-tlc-report:0.1.0
  sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.2.1-hadoop3.2-python39-stackable0.1.0
  mode: cluster
  mainApplicationFile: local:///stackable/spark/jobs/ny_tlc_report.py
  args:
-    - "--input 's3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'"
+    - "--input 's3a://my-bucket/yellow_tripdata_2021-07.csv'"
  deps:
    requirements:
      - tabulate==0.8.9
+  s3bucket:
+    inline:
+      bucketName: my-bucket
+      connection:
+        inline:
+          host: test-minio
+          port: 9000
  sparkConf:
-    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
-    "spark.driver.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
-    "spark.executor.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
+    spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
+    spark.hadoop.fs.s3a.path.style.access: "true"
+    spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
+    spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
  volumes:
    - name: job-deps
      persistentVolumeClaim:
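This variant additionally requires the locally built `ny-tlc-report:0.1.0` image to have been loaded into the Kind cluster (see the readme above); it should then be runnable in the same way:

````text
kubectl apply -f examples/ny-tlc-report-image.yaml
````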

examples/ny-tlc-report-pvc.yaml

Lines changed: 0 additions & 36 deletions
This file was deleted.

examples/ny-tlc-report.py

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
"""
Creates a report with three indicators out of the NY TLC data set.

See: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

It accepts two command line arguments:
--input   Path to the input data source. Can be a local path, an S3 object
          or whatever else Spark supports. Additional dependencies might
          need to be submitted along with the job.
--output  Path to write the report as a CSV file.
"""
import argparse

from argparse import Namespace
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import dayofweek

import tabulate


def check_args() -> Namespace:
    """Parse the given CLI arguments"""
    parser = argparse.ArgumentParser(description="NY taxi trip report")
    parser.add_argument("--input", "-i", required=True, help="Input path for dataset")
    parser.add_argument(
        "--output", "-o", required=False, help="Output path for the report."
    )
    return parser.parse_args()


def build_report(spark: SparkSession, args: Namespace) -> DataFrame:
    """Compute the total number of passengers plus the average fare and distance per day of week"""

    input_df = spark.read.options(header=True, inferSchema=True).csv(args.input)

    return (
        input_df.select(
            dayofweek(input_df["tpep_pickup_datetime"]).alias("day_of_week"),
            input_df["passenger_count"],
            input_df["trip_distance"],
            input_df["total_amount"],
        )
        .groupby("day_of_week")
        .agg({"passenger_count": "sum", "trip_distance": "avg", "total_amount": "avg"})
        .withColumnRenamed("avg(total_amount)", "avg_amount")
        .withColumnRenamed("avg(trip_distance)", "avg_trip_distance")
        .withColumnRenamed("sum(passenger_count)", "total_passengers")
        .orderBy("day_of_week")
    )


if __name__ == "__main__":
    args = check_args()

    spark = SparkSession.builder.appName("NY TLC Report").getOrCreate()

    try:
        report = build_report(spark, args)
        print(tabulate.tabulate(report.collect()))
        if args.output:
            report.coalesce(1).write.mode("overwrite").options(header=True).csv(
                args.output
            )
    finally:
        spark.stop()
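For a quick local smoke test outside the cluster (an assumption, not part of the examples: it needs a local Spark installation plus `pip install tabulate`, and the output path is just an illustration), the script can be run directly against the CSV shipped in this directory:

````text
spark-submit examples/ny-tlc-report.py \
  --input examples/yellow_tripdata_2021-07.csv \
  --output /tmp/ny-tlc-report
````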
