
Commit de594cd

Merge pull request #17 from stackabletech/ny-tlc-report
PySpark job with dependencies
2 parents 7f5b1ff + 7d02eb5 commit de594cd

File tree

15 files changed: +533 −149 lines


Cargo.lock

Lines changed: 2 additions & 2 deletions (generated file; diff not rendered)

apps/ny_tlc_report.py

Lines changed: 23 additions & 30 deletions
@@ -18,53 +18,46 @@
 
 def check_args() -> Namespace:
     """Parse the given CLI arguments"""
-    parser = argparse.ArgumentParser(
-        description="NY taxi trip report"
+    parser = argparse.ArgumentParser(description="NY taxi trip report")
+    parser.add_argument("--input", "-i", required=True, help="Input path for dataset")
+    parser.add_argument(
+        "--output", "-o", required=False, help="Output path for the report."
     )
-    parser.add_argument('--input', '-i', required=True,
-                        help='Input path for dataset')
-    parser.add_argument('--output', '-o', required=False,
-                        help='Output path for the report.')
     return parser.parse_args()


 def build_report(spark: SparkSession, args: Namespace) -> DataFrame:
-    """ Compute the total number of passangers plus the average fare and distance per day of week"""
+    """Compute the total number of passengers plus the average fare and distance per day of week"""

     input_df = spark.read.options(header=True, inferSchema=True).csv(args.input)

-    return input_df \
-        .select(
-            dayofweek(input_df['tpep_pickup_datetime']).alias("day_of_week"),
-            input_df['passenger_count'],
-            input_df['trip_distance'],
-            input_df['total_amount'],) \
-        .groupby('day_of_week').agg({
-            'passenger_count': 'sum',
-            'trip_distance': 'avg',
-            'total_amount': 'avg'}) \
-        .withColumnRenamed('avg(total_amount)', 'avg_amount') \
-        .withColumnRenamed('avg(trip_distance)', 'avg_trip_distance') \
-        .withColumnRenamed('sum(passenger_count)', 'total_passengers') \
-        .orderBy('day_of_week')
+    return (
+        input_df.select(
+            dayofweek(input_df["tpep_pickup_datetime"]).alias("day_of_week"),
+            input_df["passenger_count"],
+            input_df["trip_distance"],
+            input_df["total_amount"],
+        )
+        .groupby("day_of_week")
+        .agg({"passenger_count": "sum", "trip_distance": "avg", "total_amount": "avg"})
+        .withColumnRenamed("avg(total_amount)", "avg_amount")
+        .withColumnRenamed("avg(trip_distance)", "avg_trip_distance")
+        .withColumnRenamed("sum(passenger_count)", "total_passengers")
+        .orderBy("day_of_week")
+    )


 if __name__ == "__main__":
     args = check_args()

-    spark = SparkSession\
-        .builder\
-        .appName("NY TLC Report")\
-        .getOrCreate()
+    spark = SparkSession.builder.appName("NY TLC Report").getOrCreate()

     try:
         report = build_report(spark, args)
         report.show()
         if args.output:
-            report.coalesce(1) \
-                .write \
-                .mode('overwrite') \
-                .options(header=True) \
-                .csv(args.output)
+            report.coalesce(1).write.mode("overwrite").options(header=True).csv(
+                args.output
+            )
     finally:
         spark.stop()
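
The refactored job is easiest to sanity-check before submitting it through the operator. Below is a minimal local smoke test (not part of this commit) that fabricates three taxi rows and runs the same aggregation; it assumes pyspark is installed locally, and the sample values are invented:

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import dayofweek

spark = SparkSession.builder.master("local[1]").appName("ny-tlc-smoke").getOrCreate()

# Invented sample rows matching the NY TLC column names used by build_report().
rows = [
    (datetime(2021, 7, 5, 8, 30), 2, 3.1, 14.5),
    (datetime(2021, 7, 5, 9, 0), 1, 1.2, 7.8),
    (datetime(2021, 7, 6, 18, 45), 3, 5.6, 22.0),
]
df = spark.createDataFrame(
    rows, ["tpep_pickup_datetime", "passenger_count", "trip_distance", "total_amount"]
)

# Same aggregation as build_report(), applied to the in-memory frame.
report = (
    df.select(
        dayofweek(df["tpep_pickup_datetime"]).alias("day_of_week"),
        df["passenger_count"],
        df["trip_distance"],
        df["total_amount"],
    )
    .groupby("day_of_week")
    .agg({"passenger_count": "sum", "trip_distance": "avg", "total_amount": "avg"})
    .orderBy("day_of_week")
)
report.show()
spark.stop()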

deploy/crd/sparkapplication.crd.yaml

Lines changed: 37 additions & 0 deletions
@@ -23,6 +23,11 @@ spec:
       spec:
         description: SparkApplicationStatus CommandStatus
         properties:
+          args:
+            items:
+              type: string
+            nullable: true
+            type: array
           cliOverrides:
             additionalProperties:
               type: string
@@ -53,6 +58,30 @@ spec:
                 type: object
               default: {}
             type: object
+          deps:
+            nullable: true
+            properties:
+              excludePackages:
+                items:
+                  type: string
+                nullable: true
+                type: array
+              packages:
+                items:
+                  type: string
+                nullable: true
+                type: array
+              repositories:
+                items:
+                  type: string
+                nullable: true
+                type: array
+              requirements:
+                items:
+                  type: string
+                nullable: true
+                type: array
+            type: object
           driver:
             nullable: true
             properties:
@@ -102,6 +131,14 @@
           mode:
             nullable: true
             type: string
+          sparkConf:
+            additionalProperties:
+              type: string
+            nullable: true
+            type: object
+          sparkImage:
+            nullable: true
+            type: string
           stopped:
             nullable: true
             type: boolean
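
The four spec additions (args, deps, sparkConf, sparkImage) are exercised by the example manifests added later in this commit. As an illustration only (not part of the commit; assumes PyYAML is installed and the example path from this PR), the new fields can be checked like this:

import yaml  # PyYAML, assumed installed

with open("examples/ny-tlc-report-external-dependencies.yaml") as f:
    app = yaml.safe_load(f)

spec = app["spec"]
assert isinstance(spec["args"], list)                  # new: args
assert isinstance(spec["deps"]["requirements"], list)  # new: deps.requirements
assert isinstance(spec["sparkConf"], dict)             # new: sparkConf
assert isinstance(spec["sparkImage"], str)             # new: sparkImage
print("manifest exercises all four new CRD fields")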

deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 37 additions & 0 deletions
@@ -25,6 +25,11 @@ spec:
       spec:
         description: SparkApplicationStatus CommandStatus
         properties:
+          args:
+            items:
+              type: string
+            nullable: true
+            type: array
           cliOverrides:
             additionalProperties:
               type: string
@@ -55,6 +60,30 @@ spec:
                 type: object
               default: {}
             type: object
+          deps:
+            nullable: true
+            properties:
+              excludePackages:
+                items:
+                  type: string
+                nullable: true
+                type: array
+              packages:
+                items:
+                  type: string
+                nullable: true
+                type: array
+              repositories:
+                items:
+                  type: string
+                nullable: true
+                type: array
+              requirements:
+                items:
+                  type: string
+                nullable: true
+                type: array
+            type: object
           driver:
             nullable: true
             properties:
@@ -104,6 +133,14 @@
           mode:
             nullable: true
            type: string
+          sparkConf:
+            additionalProperties:
+              type: string
+            nullable: true
+            type: object
+          sparkImage:
+            nullable: true
+            type: string
           stopped:
             nullable: true
             type: boolean

deploy/manifests/crds.yaml

Lines changed: 37 additions & 0 deletions
@@ -27,6 +27,11 @@ spec:
       spec:
         description: SparkApplicationStatus CommandStatus
         properties:
+          args:
+            items:
+              type: string
+            nullable: true
+            type: array
           cliOverrides:
             additionalProperties:
               type: string
@@ -57,6 +62,30 @@ spec:
                 type: object
               default: {}
             type: object
+          deps:
+            nullable: true
+            properties:
+              excludePackages:
+                items:
+                  type: string
+                nullable: true
+                type: array
+              packages:
+                items:
+                  type: string
+                nullable: true
+                type: array
+              repositories:
+                items:
+                  type: string
+                nullable: true
+                type: array
+              requirements:
+                items:
+                  type: string
+                nullable: true
+                type: array
+            type: object
           driver:
             nullable: true
             properties:
@@ -106,6 +135,14 @@
           mode:
             nullable: true
            type: string
+          sparkConf:
+            additionalProperties:
+              type: string
+            nullable: true
+            type: object
+          sparkImage:
+            nullable: true
+            type: string
           stopped:
             nullable: true
             type: boolean

docs/modules/ROOT/pages/usage.adoc

Lines changed: 6 additions & 0 deletions
@@ -41,3 +41,9 @@ All override property values must be strings. The properties will be passed on w
 === Environment Variables
 
 Environment variables can be (over)written by adding them to `spark-env.sh` file as described above or by using the `envOverrides` property.
+
+=== Development
+
+export KUBERNETES_SERVICE_PORT_HTTPS=443
+export KUBERNETES_SERVICE_HOST=10.96.0.1
+kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:default
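
The two exported variables mirror what a pod sees inside the cluster; Spark's Kubernetes backend expects the API server address as a k8s:// master URL. A hypothetical helper (not in the commit) that derives it:

import os

# The same values exported in the Development section above.
host = os.environ["KUBERNETES_SERVICE_HOST"]        # e.g. 10.96.0.1
port = os.environ["KUBERNETES_SERVICE_PORT_HTTPS"]  # e.g. 443

# Spark on Kubernetes takes the API server as a k8s:// master URL.
master = f"k8s://https://{host}:{port}"
print(master)  # e.g. k8s://https://10.96.0.1:443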
examples/ny-tlc-report-external-dependencies.yaml

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+---
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkApplication
+metadata:
+  name: ny-tlc-report-external-dependencies
+  namespace: default
+spec:
+  version: "1.0"
+  image: docker.stackable.tech/stackable/ny-tlc-report:0.1.0
+  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.2.1-hadoop3.2-python39-aws1.11.375-stackable0.3.0
+  mode: cluster
+  mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny_tlc_report.py
+  args:
+    - "--input 's3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'"
+  deps:
+    requirements:
+      - tabulate==0.8.9
+  sparkConf:
+    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
+  driver:
+    cores: 1
+    coreLimit: "1200m"
+    memory: "512m"
+  executor:
+    cores: 1
+    instances: 3
+    memory: "512m"
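
deps.requirements installs PyPI packages for the driver and executors before the job runs. The rendered part of ny_tlc_report.py above does not show why tabulate==0.8.9 is listed, so the following is an assumption: the driver could use it to print the report as a plain-text table instead of report.show().

from tabulate import tabulate  # provided via deps.requirements

# `report` is the DataFrame returned by build_report(); it has at most
# seven rows (one per day of week), so collecting to the driver is cheap.
rows = [row.asDict() for row in report.collect()]
print(tabulate(rows, headers="keys", tablefmt="github"))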

examples/ny-tlc-report-image.yaml

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+---
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkApplication
+metadata:
+  name: ny-tlc-report-image
+  namespace: default
+spec:
+  version: "1.0"
+  image: docker.stackable.tech/stackable/ny-tlc-report:0.1.0
+  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.2.1-hadoop3.2-python39-aws1.11.375-stackable0.3.0
+  mode: cluster
+  mainApplicationFile: local:///stackable/spark/jobs/ny_tlc_report.py
+  args:
+    - "--input 's3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'"
+  deps:
+    requirements:
+      - tabulate==0.8.9
+  sparkConf:
+    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
+  driver:
+    cores: 1
+    coreLimit: "1200m"
+    memory: "512m"
+  executor:
+    cores: 1
+    instances: 3
+    memory: "512m"

examples/simple-spark-cluster.yaml

Lines changed: 0 additions & 20 deletions
This file was deleted.

examples/spark-examples-s3.yaml

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+---
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkApplication
+metadata:
+  name: spark-examples-s3
+spec:
+  # TODO : remove the image
+  image: docker.stackable.tech/stackable/ny-tlc-report:0.1.0
+  version: "1.0"
+  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.2.1-hadoop3.2-python39-aws1.11.375-stackable0.3.0
+  mode: cluster
+  mainClass: org.apache.spark.examples.SparkPi
+  mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/spark-examples_2.12-3.2.1.jar
+  sparkConf:
+    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
+  driver:
+    cores: 1
+    coreLimit: "1200m"
+    memory: "512m"
+  executor:
+    cores: 1
+    instances: 3
+    memory: "512m"
+  config:
+    enableMonitoring: true
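
This manifest submits the stock Scala SparkPi class from a jar on S3, using the same anonymous-credentials sparkConf as the other examples. For orientation, a minimal PySpark equivalent of that Monte Carlo estimate (a sketch, not shipped in this PR):

import random

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pyspark-pi").getOrCreate()
n = 1_000_000  # number of random points to sample

def inside(_) -> int:
    # Draw a point in the unit square; count it if it lands in the quarter circle.
    x, y = random.random(), random.random()
    return 1 if x * x + y * y <= 1.0 else 0

count = spark.sparkContext.parallelize(range(n)).map(inside).sum()
print(f"Pi is roughly {4.0 * count / n}")
spark.stop()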

rust/crd/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ repository = "https://github.com/stackabletech/spark-k8s-operator"
 version = "0.1.0-nightly"
 
 [dependencies]
-stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.12.0" }
+stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.13.0" }
 
 semver = "1.0"
 serde = { version = "1.0", features = ["derive"] }
