
[Merged by Bors] - added readme for running the examples #71

Closed · wants to merge 5 commits
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Changed

- Updated examples ([#71])

[#71]: https://github.com/stackabletech/spark-k8s-operator/pull/71

## [0.1.0] - 2022-05-05

### Added
64 changes: 64 additions & 0 deletions examples/README-examples.md
@@ -0,0 +1,64 @@
# Examples

## Overview

This note outlines the steps needed to run these examples on a local Kubernetes cluster.

## Cluster

Create a new local cluster, e.g. with [Kind](https://kind.sigs.k8s.io/docs/user/quick-start/) and the [stackablectl tool](https://github.com/stackabletech/stackablectl). The commands below remove any existing kind clusters and then install the `spark-k8s` operator, creating a kind cluster named `stackable-data-platform` in the process:

````text
kind delete clusters --all
stackablectl operator install spark-k8s -k
````
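
To confirm that the cluster was created, `kind get clusters` should list `stackable-data-platform`:

````text
kind get clusters
````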

Build the `ny-tlc-report` image from the Dockerfile in this repository (`apps/docker/Dockerfile`) and then load it into the cluster.
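
A minimal build command, as a sketch (assuming it is run from the repository root and that the tag matches the image name used by the examples):

````text
docker build -t docker.stackable.tech/stackable/ny-tlc-report:0.1.0 -f apps/docker/Dockerfile .
````

Then load the image into the kind cluster: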

````text
kind load docker-image docker.stackable.tech/stackable/ny-tlc-report:0.1.0 --name stackable-data-platform
````

## Set up the `PersistentVolumeClaim`

The PVC needs to contain a few dependencies that Spark requires to access S3, namely the `hadoop-aws` and `aws-java-sdk-bundle` jars referenced by the examples below:

````text
kubectl apply -f kind/kind-pvc.yaml
````
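
To verify that the claim has been created (and, depending on the storage class, bound):

````text
kubectl get persistentvolumeclaims
````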

## Set up the `minio` object store

Use a local object store to avoid external dependencies:

````text
helm install test-minio \
--set mode=standalone \
--set replicas=1 \
--set persistence.enabled=false \
--set buckets[0].name=my-bucket,buckets[0].policy=public \
--set resources.requests.memory=1Gi \
--repo https://charts.min.io/ minio
````

````text
kubectl apply -f kind/minio.yaml
````
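
The second manifest is expected to provide the `minio-mc-0` client pod used in the next step; check that the MinIO pods are running before continuing:

````text
kubectl get pods
````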

The examples need several resources to be present in this store. They can be uploaded like this:

````text
kubectl exec minio-mc-0 -- sh -c 'mc alias set test-minio http://test-minio:9000/'
kubectl cp examples/ny-tlc-report-1.1.0.jar minio-mc-0:/tmp
kubectl cp examples/ny-tlc-report.py minio-mc-0:/tmp
kubectl cp examples/yellow_tripdata_2021-07.csv minio-mc-0:/tmp
kubectl exec minio-mc-0 -- mc cp /tmp/ny-tlc-report-1.1.0.jar test-minio/my-bucket
kubectl exec minio-mc-0 -- mc cp /tmp/ny-tlc-report.py test-minio/my-bucket
kubectl exec minio-mc-0 -- mc cp /tmp/yellow_tripdata_2021-07.csv test-minio/my-bucket
````
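
To verify the uploads, the bucket contents can be listed with the same alias:

````text
kubectl exec minio-mc-0 -- mc ls test-minio/my-bucket
````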

We now have a local S3 implementation and a bucket populated with the resources the examples need. An example can then be run like this:

````text
kubectl apply -f examples/ny-tlc-report-configmap.yaml
````
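
Once the `SparkApplication` has been applied, its progress can be followed via the driver pod; the pod name below is a placeholder, as the actual name is generated by the operator:

````text
kubectl get pods
kubectl logs -f <name-of-driver-pod>
````
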
Binary file added examples/ny-tlc-report-1.1.0.jar
Binary file not shown.
20 changes: 14 additions & 6 deletions examples/ny-tlc-report-configmap.yaml
@@ -5,18 +5,18 @@ metadata:
name: cm-job-arguments
data:
job-args.txt: |
s3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv
s3a://my-bucket/yellow_tripdata_2021-07.csv
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
name: ny-tlc-report-configmap
name: spark-ny-cm
namespace: default
spec:
version: "1.0"
sparkImage: docker.stackable.tech/stackable/spark-k8s:3.2.1-hadoop3.2-stackable0.4.0
mode: cluster
mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny-tlc-report-1.1.0.jar
mainApplicationFile: s3a://my-bucket/ny-tlc-report-1.1.0.jar
mainClass: tech.stackable.demo.spark.NYTLCReport
volumes:
- name: job-deps
@@ -27,10 +27,18 @@ spec:
name: cm-job-arguments
args:
- "--input /arguments/job-args.txt"
s3bucket:
inline:
bucketName: my-bucket
connection:
inline:
host: test-minio
port: 9000
sparkConf:
"spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
"spark.driver.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
"spark.executor.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
spark.hadoop.fs.s3a.path.style.access: "true"
spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
driver:
cores: 1
coreLimit: "1200m"
20 changes: 14 additions & 6 deletions examples/ny-tlc-report-external-dependencies.yaml
@@ -2,22 +2,30 @@
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
name: ny-tlc-report-external-dependencies
name: spark-ny-ext
namespace: default
spec:
version: "1.0"
sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.2.1-hadoop3.2-python39-stackable0.1.0
mode: cluster
mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny_tlc_report.py
mainApplicationFile: s3a://my-bucket/ny-tlc-report.py
args:
- "--input 's3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'"
- "--input 's3a://my-bucket/yellow_tripdata_2021-07.csv'"
deps:
requirements:
- tabulate==0.8.9
s3bucket:
inline:
bucketName: my-bucket
connection:
inline:
host: test-minio
port: 9000
sparkConf:
"spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
"spark.driver.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
"spark.executor.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
spark.hadoop.fs.s3a.path.style.access: "true"
spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
volumes:
- name: job-deps
persistentVolumeClaim:
19 changes: 14 additions & 5 deletions examples/ny-tlc-report-image.yaml
@@ -2,23 +2,32 @@
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
name: ny-tlc-report-image
name: spark-ny-image
namespace: default
spec:
version: "1.0"
# everything under /jobs will be copied to /stackable/spark/jobs
image: docker.stackable.tech/stackable/ny-tlc-report:0.1.0
sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.2.1-hadoop3.2-python39-stackable0.1.0
mode: cluster
mainApplicationFile: local:///stackable/spark/jobs/ny_tlc_report.py
args:
- "--input 's3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'"
- "--input 's3a://my-bucket/yellow_tripdata_2021-07.csv'"
deps:
requirements:
- tabulate==0.8.9
s3bucket:
inline:
bucketName: my-bucket
connection:
inline:
host: test-minio
port: 9000
sparkConf:
"spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
"spark.driver.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
"spark.executor.extraClassPath": "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
spark.hadoop.fs.s3a.path.style.access: "true"
spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
volumes:
- name: job-deps
persistentVolumeClaim:
36 changes: 0 additions & 36 deletions examples/ny-tlc-report-pvc.yaml

This file was deleted.

65 changes: 65 additions & 0 deletions examples/ny-tlc-report.py
@@ -0,0 +1,65 @@
"""
Creates a report with three indicators out of the NY TLC data set.

See: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

It accepts two command line arguments:
--input Path to the input data source. Can be a local path, an S3 object
or whatever else Spark supports. Additional dependencies might
need to be submitted along with the job.
--output Path to write the report as a CSV file.
"""
import argparse

from argparse import Namespace
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import dayofweek

import tabulate


def check_args() -> Namespace:
"""Parse the given CLI arguments"""
parser = argparse.ArgumentParser(description="NY taxi trip report")
parser.add_argument("--input", "-i", required=True, help="Input path for dataset")
parser.add_argument(
"--output", "-o", required=False, help="Output path for the report."
)
return parser.parse_args()


def build_report(spark: SparkSession, args: Namespace) -> DataFrame:
"""Compute the total number of passangers plus the average fare and distance per day of week"""

input_df = spark.read.options(header=True, inferSchema=True).csv(args.input)

return (
input_df.select(
dayofweek(input_df["tpep_pickup_datetime"]).alias("day_of_week"),
input_df["passenger_count"],
input_df["trip_distance"],
input_df["total_amount"],
)
.groupby("day_of_week")
.agg({"passenger_count": "sum", "trip_distance": "avg", "total_amount": "avg"})
.withColumnRenamed("avg(total_amount)", "avg_amount")
.withColumnRenamed("avg(trip_distance)", "avg_trip_distance")
.withColumnRenamed("sum(passenger_count)", "total_passengers")
.orderBy("day_of_week")
)


if __name__ == "__main__":
args = check_args()

spark = SparkSession.builder.appName("NY TLC Report").getOrCreate()

try:
report = build_report(spark, args)
print(tabulate.tabulate(report.collect()))
if args.output:
report.coalesce(1).write.mode("overwrite").options(header=True).csv(
args.output
)
finally:
spark.stop()