---
apiVersion: batch/v1
kind: Job
metadata:
  name: create-spark-anomaly-detection-job
spec:
  template:
    spec:
      initContainers:
        - name: wait-for-testdata
          image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0
          command: ["bash", "-c", "echo 'Waiting for job load-ny-taxi-data to finish' && kubectl wait --for=condition=complete --timeout=30m job/load-ny-taxi-data"]
      containers:
        - name: create-spark-anomaly-detection-job
          image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0
          command: ["bash", "-c", "echo 'Submitting Spark job' && kubectl apply -f /tmp/manifest/spark-ad-job.yaml"]
          volumeMounts:
            - name: manifest
              mountPath: /tmp/manifest
      volumes:
        - name: manifest
          configMap:
            name: create-spark-ad-job-manifest
      restartPolicy: OnFailure
  backoffLimit: 50
---
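# This ConfigMap holds the SparkApplication manifest that the Job above mounts
# at /tmp/manifest and applies with kubectl once the taxi data has been loaded.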
apiVersion: v1
kind: ConfigMap
metadata:
  name: create-spark-ad-job-manifest
data:
  spark-ad-job.yaml: |
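    # SparkApplication picked up by the Stackable Spark-on-Kubernetes operator.
    # It runs /spark-scripts/spark-ad.py (mounted from the cm-spark ConfigMap
    # below) in cluster mode and declares scikit-learn as a Python dependency.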
    ---
    apiVersion: spark.stackable.tech/v1alpha1
    kind: SparkApplication
    metadata:
      name: spark-ad
    spec:
      version: "1.0"
      sparkImage: docker.stackable.tech/demos/pyspark-k8s-with-kafka-and-iceberg:3.3.0-stackable0.2.0
      mode: cluster
      mainApplicationFile: local:///spark-scripts/spark-ad.py
      deps:
        requirements:
          - scikit-learn==0.24.2
      volumes:
        - name: cm-spark
          configMap:
            name: cm-spark
      job:
        resources:
          cpu:
            min: "100m"
            max: "500m"
          memory:
            limit: "1Gi"
      driver:
        resources:
          cpu:
            min: "2"
            max: "3"
          memory:
            limit: "2Gi"
        volumeMounts:
          - name: cm-spark
            mountPath: /spark-scripts
      executor:
        resources:
          cpu:
            min: "2"
            max: "3"
          memory:
            limit: "5Gi"
        volumeMounts:
          - name: cm-spark
            mountPath: /spark-scripts
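      # Spark runtime configuration: fire-and-forget submission, fixed pod names,
      # S3A access to the demo's S3 endpoint (minio-trino), and an Iceberg catalog
      # named "prediction" backed by the Hive metastore at hive-iceberg:9083.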
      sparkConf:
        spark.kubernetes.submission.waitAppCompletion: "false"
        spark.kubernetes.driver.pod.name: "spark-ad-driver"
        spark.kubernetes.executor.podNamePrefix: "spark-ad"
        spark.executor.instances: "6"
        spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        spark.hadoop.fs.s3a.endpoint: http://minio-trino:9000
        spark.hadoop.fs.s3a.path.style.access: "true"
        spark.hadoop.fs.s3a.access.key: demo
        spark.hadoop.fs.s3a.secret.key: demodemo
        spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
        spark.sql.catalog.prediction: org.apache.iceberg.spark.SparkCatalog
        spark.sql.catalog.prediction.type: hive
        spark.sql.catalog.prediction.uri: thrift://hive-iceberg:9083
---
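# This ConfigMap contains the PySpark script itself. It is mounted into the
# driver and executor pods at /spark-scripts via the cm-spark volume declared
# in the SparkApplication above.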
apiVersion: v1
kind: ConfigMap
metadata:
  name: cm-spark
data:
  spark-ad.py: |
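    # Anomaly detection on the NY taxi data:
    #   1. aggregate rides into half-hour buckets,
    #   2. fit a scikit-learn IsolationForest on the aggregated features (on the driver),
    #   3. score every bucket via a UDF using broadcast copies of the fitted model,
    #   4. append the results to the Iceberg table prediction.ad.iforest.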
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import dayofweek, to_date, to_timestamp, date_format, year, hour, minute, month, when, dayofmonth
    from pyspark.sql.functions import concat_ws, substring, concat, lpad, lit
    from pyspark.sql.functions import round, sum, count, avg
    from pyspark.sql.functions import lag
    from pyspark.sql.window import Window
    from pyspark.sql import functions, types
    from sklearn.ensemble import IsolationForest
    from sklearn.preprocessing import StandardScaler

    spark = SparkSession.builder.appName("ny-tlc-anomaly-detection").getOrCreate()
    spark.sql("CREATE SCHEMA IF NOT EXISTS prediction.ad LOCATION 's3a://prediction/anomaly-detection'")
    spark.sql("CREATE TABLE IF NOT EXISTS prediction.ad.iforest (pickup_ts TIMESTAMP, pickup_minute_group VARCHAR(4), pickup_hour INT, pickup_year INT, pickup_month INT, pickup_dayofmonth INT, pickup_dayofweek INT, norides INT, total_bill DOUBLE, avg_bill DOUBLE, norides_lag INT, pred INT) USING iceberg")

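    # Read the raw trip data from S3 and derive date/time features per ride.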
    input_df = spark.read.parquet("s3a://demo/ny-taxi-data/raw/")

    df = input_df.select(
        to_date(input_df.pickup_datetime).alias("day_date")
        , year(input_df.pickup_datetime).alias('year')
        , month(input_df.pickup_datetime).alias('month')
        , dayofmonth(input_df.pickup_datetime).alias("dayofmonth")
        , dayofweek(input_df.pickup_datetime).alias("dayofweek")
        , hour(input_df.pickup_datetime).alias("hour")
        , minute(input_df.pickup_datetime).alias("minute")
        , input_df.driver_pay
    )

    df = df.withColumn("minute_group", when(df.minute < 30, '00').otherwise('30'))
    df = df.withColumn("time_group", concat_ws(":", lpad(df.hour, 2, '0'), df.minute_group, lit('00')))
    df = df.withColumn("ts", concat_ws(" ", df.day_date, df.time_group))

    dfs = df.select(
        to_timestamp(df.ts, "yyyy-MM-dd HH:mm:ss").alias("date_group")
        , df.minute_group
        , df.year
        , df.hour
        , df.month
        , df.dayofmonth
        , df.dayofweek
        , df.driver_pay
    ).groupby("date_group", "minute_group", "hour", "year", "month", "dayofmonth", "dayofweek").agg(functions.count('driver_pay').alias('no_rides'), functions.round(functions.sum('driver_pay'), 2).alias('total_bill'), functions.round(functions.avg('driver_pay'), 2).alias('avg_bill')).orderBy("date_group")

    windowSpec = Window.partitionBy("hour").orderBy("date_group")

    dfs = dfs.withColumn("lag", lag("no_rides", 2).over(windowSpec))
    dfs = dfs.filter("lag IS NOT NULL")

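    # Fit the IsolationForest locally on the driver; collect() pulls the
    # aggregated feature rows out of Spark, so they must fit in driver memory.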
    scaler = StandardScaler()
    classifier = IsolationForest(contamination=0.005, n_estimators=200, max_samples=0.7, random_state=42, n_jobs=-1)

    df_model = dfs.select(dfs.minute_group, dfs.hour, dfs.year, dfs.month, dfs.dayofmonth, dfs.dayofweek, dfs.no_rides, dfs.total_bill, dfs.avg_bill, dfs.lag)

    x_train = scaler.fit_transform(df_model.collect())
    clf = classifier.fit(x_train)

    SCL = spark.sparkContext.broadcast(scaler)
    CLF = spark.sparkContext.broadcast(clf)

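    # Score each row with the broadcast scaler and model; the UDF returns the raw
    # IsolationForest label (-1 = anomaly, 1 = normal, 0 if scoring failed).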
    def predict_using_broadcasts(minute_group, hour, year, month, dayofmonth, dayofweek, no_rides, total_bill, avg_bill, lag):
        prediction = 0
        x_test = [[minute_group, hour, year, month, dayofmonth, dayofweek, no_rides, total_bill, avg_bill, lag]]
        try:
            x_test = SCL.value.transform(x_test)
            prediction = CLF.value.predict(x_test)[0]
        except ValueError:
            import traceback
            traceback.print_exc()
            print('Cannot predict:', x_test)
        return int(prediction)

    udf_predict_using_broadcasts = functions.udf(predict_using_broadcasts, types.IntegerType())

    df_pred = dfs.withColumn(
        'prediction',
        udf_predict_using_broadcasts('minute_group', 'hour', 'year', 'month', 'dayofmonth', 'dayofweek', 'no_rides', 'total_bill', 'avg_bill', 'lag')
    )

    # map to table columns
    df_out = df_pred.select(
        df_pred.date_group.alias("pickup_ts")
        , df_pred.minute_group.alias("pickup_minute_group")
        , df_pred.hour.alias("pickup_hour")
        , df_pred.year.alias("pickup_year")
        , df_pred.month.alias("pickup_month")
        , df_pred.dayofmonth.alias("pickup_dayofmonth")
        , df_pred.dayofweek.alias("pickup_dayofweek")
        , df_pred.no_rides.alias("norides")
        , df_pred.total_bill.alias("total_bill")
        , df_pred.avg_bill.alias("avg_bill")
        , df_pred.lag.alias("norides_lag")
        , df_pred.prediction.alias("pred")
    )

    # write via iceberg
    df_out.writeTo("prediction.ad.iforest").append()