This repository was archived by the owner on Feb 16, 2024. It is now read-only.

[Merged by Bors] - Spark anomaly detection demo #155

Closed · wants to merge 26 commits

Changes from all commits (26 commits)
662c71b
wip
adwk67 Oct 20, 2022
963d493
wip
adwk67 Oct 20, 2022
4ed4a7e
wip: initial script
adwk67 Oct 20, 2022
ecdfeeb
wip: working sklearn model/script
adwk67 Oct 26, 2022
94e41af
wip: minor cleanup
adwk67 Oct 26, 2022
6de0d21
revert to FHVHV data
adwk67 Oct 26, 2022
66987f1
output final group counts
adwk67 Oct 26, 2022
270e1bc
wait for data to be loaded before starting spark job
adwk67 Oct 27, 2022
764e4d8
removed manifest (now added as a config map)
adwk67 Oct 27, 2022
7b16c15
wip: hive/trino
adwk67 Oct 27, 2022
0ba6e3e
write results to trino table
adwk67 Oct 28, 2022
197f1de
add superset artefacts
adwk67 Oct 28, 2022
f384827
dedicated stack for the demo
adwk67 Oct 28, 2022
4c28506
fixed stacks references
adwk67 Oct 28, 2022
e23469a
added more data and fine-tuned columns
adwk67 Oct 31, 2022
f65d659
update superset assets
adwk67 Oct 31, 2022
4d65de4
Merge branch 'main' into 142_spark_anomaly_detection
adwk67 Oct 31, 2022
317c0b7
documentation
adwk67 Oct 31, 2022
6d39c82
updated doc link
adwk67 Oct 31, 2022
0c6444c
Merge branch 'main' into 142_spark_anomaly_detection
adwk67 Nov 2, 2022
77c2092
merge main
adwk67 Nov 4, 2022
18bea72
Update demos/spark-k8s-anomaly-detection-taxi-data/create-spark-anoma…
adwk67 Nov 4, 2022
c406e8a
use resource limits struct
adwk67 Nov 4, 2022
084ee5f
Merge branch 'main' into 142_spark_anomaly_detection
adwk67 Nov 4, 2022
ff785d9
removed unnecessary file
adwk67 Nov 7, 2022
7a77084
minor corrections and added note/screenshot re. spark-ui
adwk67 Nov 7, 2022
15 changes: 15 additions & 0 deletions demos/demos-v1.yaml
@@ -53,6 +53,21 @@ demos:
      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml
      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/nifi-kafka-druid-water-level-data/create-druid-ingestion-job.yaml
      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/nifi-kafka-druid-water-level-data/setup-superset.yaml
  spark-k8s-anomaly-detection-taxi-data:
    description: Demo loading New York taxi data into an S3 bucket and carrying out an anomaly detection analysis on it
    documentation: https://docs.stackable.tech/stackablectl/stable/demos/spark-k8s-anomaly-detection-taxi-data.html
    stackableStack: spark-trino-superset-s3
    labels:
      - trino
      - superset
      - minio
      - s3
      - ny-taxi-data
    manifests:
      - plainYaml: demos/spark-k8s-anomaly-detection-taxi-data/serviceaccount.yaml
      - plainYaml: demos/spark-k8s-anomaly-detection-taxi-data/load-test-data.yaml
      - plainYaml: demos/spark-k8s-anomaly-detection-taxi-data/create-spark-anomaly-detection-job.yaml
      - plainYaml: demos/spark-k8s-anomaly-detection-taxi-data/setup-superset.yaml
  trino-taxi-data:
    description: Demo loading 2.5 years of New York taxi data into S3 bucket, creating a Trino table and a Superset dashboard
    documentation: https://docs.stackable.tech/stackablectl/stable/demos/trino-taxi-data.html
192 changes: 192 additions & 0 deletions demos/spark-k8s-anomaly-detection-taxi-data/create-spark-anomaly-detection-job.yaml
@@ -0,0 +1,192 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: create-spark-anomaly-detection-job
spec:
  template:
    spec:
      initContainers:
        - name: wait-for-testdata
          image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0
          command: ["bash", "-c", "echo 'Waiting for job load-ny-taxi-data to finish' && kubectl wait --for=condition=complete --timeout=30m job/load-ny-taxi-data"]
      containers:
        - name: create-spark-anomaly-detection-job
          image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0
          command: ["bash", "-c", "echo 'Submitting Spark job' && kubectl apply -f /tmp/manifest/spark-ad-job.yaml"]
          volumeMounts:
            - name: manifest
              mountPath: /tmp/manifest
      volumes:
        - name: manifest
          configMap:
            name: create-spark-ad-job-manifest
      restartPolicy: OnFailure
  backoffLimit: 50
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: create-spark-ad-job-manifest
data:
  spark-ad-job.yaml: |
    ---
    apiVersion: spark.stackable.tech/v1alpha1
    kind: SparkApplication
    metadata:
      name: spark-ad
    spec:
      version: "1.0"
      sparkImage: docker.stackable.tech/demos/pyspark-k8s-with-kafka-and-iceberg:3.3.0-stackable0.2.0
      mode: cluster
      mainApplicationFile: local:///spark-scripts/spark-ad.py
      deps:
        requirements:
          - scikit-learn==0.24.2
      volumes:
        - name: cm-spark
          configMap:
            name: cm-spark
      job:
        resources:
          cpu:
            min: "100m"
            max: "500m"
          memory:
            limit: "1Gi"
      driver:
        resources:
          cpu:
            min: "2"
            max: "3"
          memory:
            limit: "2Gi"
        volumeMounts:
          - name: cm-spark
            mountPath: /spark-scripts
      executor:
        resources:
          cpu:
            min: "2"
            max: "3"
          memory:
            limit: "5Gi"
        volumeMounts:
          - name: cm-spark
            mountPath: /spark-scripts
      sparkConf:
        spark.kubernetes.submission.waitAppCompletion: "false"
        spark.kubernetes.driver.pod.name: "spark-ad-driver"
        spark.kubernetes.executor.podNamePrefix: "spark-ad"
        spark.executor.instances: "6"
        spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        spark.hadoop.fs.s3a.endpoint: http://minio-trino:9000
        spark.hadoop.fs.s3a.path.style.access: "true"
        spark.hadoop.fs.s3a.access.key: demo
        spark.hadoop.fs.s3a.secret.key: demodemo
        spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
        spark.sql.catalog.prediction: org.apache.iceberg.spark.SparkCatalog
        spark.sql.catalog.prediction.type: hive
        spark.sql.catalog.prediction.uri: thrift://hive-iceberg:9083
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cm-spark
data:
  spark-ad.py: |
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import dayofweek, to_date, to_timestamp, date_format, year, hour, minute, month, when, dayofmonth
    from pyspark.sql.functions import concat_ws, substring, concat, lpad, lit
    from pyspark.sql.functions import round, sum, count, avg
    from pyspark.sql.functions import lag
    from pyspark.sql.window import Window
    from pyspark.sql import functions, types
    from sklearn.ensemble import IsolationForest
    from sklearn.preprocessing import StandardScaler

    spark = SparkSession.builder.appName("ny-tlc-anomaly-detection").getOrCreate()

    spark.sql("CREATE SCHEMA IF NOT EXISTS prediction.ad LOCATION 's3a://prediction/anomaly-detection'")
    spark.sql("CREATE TABLE IF NOT EXISTS prediction.ad.iforest (pickup_ts TIMESTAMP, pickup_minute_group VARCHAR(4), pickup_hour INT, pickup_year INT, pickup_month INT, pickup_dayofmonth INT, pickup_dayofweek INT, norides INT, total_bill DOUBLE, avg_bill DOUBLE, norides_lag INT, pred INT) USING iceberg")

    input_df = spark.read.parquet("s3a://demo/ny-taxi-data/raw/")

    df = input_df.select(
        to_date(input_df.pickup_datetime).alias("day_date")
        , year(input_df.pickup_datetime).alias('year')
        , month(input_df.pickup_datetime).alias('month')
        , dayofmonth(input_df.pickup_datetime).alias("dayofmonth")
        , dayofweek(input_df.pickup_datetime).alias("dayofweek")
        , hour(input_df.pickup_datetime).alias("hour")
        , minute(input_df.pickup_datetime).alias("minute")
        , input_df.driver_pay
    )
    df = df.withColumn("minute_group", when(df.minute < 30, '00').otherwise('30'))
    df = df.withColumn("time_group", concat_ws(":", lpad(df.hour, 2, '0'), df.minute_group, lit('00')))
    df = df.withColumn("ts", concat_ws(" ", df.day_date, df.time_group))

    dfs = df.select(
        to_timestamp(df.ts, "yyyy-MM-dd HH:mm:ss").alias("date_group")
        , df.minute_group
        , df.year
        , df.hour
        , df.month
        , df.dayofmonth
        , df.dayofweek
        , df.driver_pay
    ).groupby("date_group", "minute_group", "hour", "year", "month", "dayofmonth", "dayofweek").agg(
        functions.count('driver_pay').alias('no_rides'),
        functions.round(functions.sum('driver_pay'), 2).alias('total_bill'),
        functions.round(functions.avg('driver_pay'), 2).alias('avg_bill')
    ).orderBy("date_group")

    windowSpec = Window.partitionBy("hour").orderBy("date_group")
    dfs = dfs.withColumn("lag", lag("no_rides", 2).over(windowSpec))
    dfs = dfs.filter("lag IS NOT NULL")

    scaler = StandardScaler()
    classifier = IsolationForest(contamination=0.005, n_estimators=200, max_samples=0.7, random_state=42, n_jobs=-1)

    df_model = dfs.select(dfs.minute_group, dfs.hour, dfs.year, dfs.month, dfs.dayofmonth, dfs.dayofweek, dfs.no_rides, dfs.total_bill, dfs.avg_bill, dfs.lag)
    x_train = scaler.fit_transform(df_model.collect())
    clf = classifier.fit(x_train)

    SCL = spark.sparkContext.broadcast(scaler)
    CLF = spark.sparkContext.broadcast(clf)

    def predict_using_broadcasts(minute_group, hour, year, month, dayofmonth, dayofweek, no_rides, total_bill, avg_bill, lag):
        prediction = 0
        x_test = [[minute_group, hour, year, month, dayofmonth, dayofweek, no_rides, total_bill, avg_bill, lag]]
        try:
            x_test = SCL.value.transform(x_test)
            prediction = CLF.value.predict(x_test)[0]
        except ValueError:
            import traceback
            traceback.print_exc()
            print('Cannot predict:', x_test)
        return int(prediction)

    udf_predict_using_broadcasts = functions.udf(predict_using_broadcasts, types.IntegerType())

    df_pred = dfs.withColumn(
        'prediction',
        udf_predict_using_broadcasts('minute_group', 'hour', 'year', 'month', 'dayofmonth', 'dayofweek', 'no_rides', 'total_bill', 'avg_bill', 'lag')
    )

    # map to table columns
    df_out = df_pred.select(
        df_pred.date_group.alias("pickup_ts")
        , df_pred.minute_group.alias("pickup_minute_group")
        , df_pred.hour.alias("pickup_hour")
        , df_pred.year.alias("pickup_year")
        , df_pred.month.alias("pickup_month")
        , df_pred.dayofmonth.alias("pickup_dayofmonth")
        , df_pred.dayofweek.alias("pickup_dayofweek")
        , df_pred.no_rides.alias("norides")
        , df_pred.total_bill.alias("total_bill")
        , df_pred.avg_bill.alias("avg_bill")
        , df_pred.lag.alias("norides_lag")
        , df_pred.prediction.alias("pred")
    )
    # write via iceberg
    df_out.writeTo("prediction.ad.iforest").append()
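
Note: the pred column follows scikit-learn's IsolationForest convention (-1 marks an anomaly, 1 a normal interval; 0 only appears when the UDF hits a ValueError and keeps its default). A sketch, not part of this PR, of how the written table could be inspected from a PySpark session on the same cluster, with the catalog settings copied from the sparkConf above:

# Sketch: inspect the anomalies written by spark-ad.py (assumes a session on the
# same cluster; Iceberg catalog settings copied from the SparkApplication above).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("inspect-ad-results")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.prediction", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.prediction.type", "hive")
    .config("spark.sql.catalog.prediction.uri", "thrift://hive-iceberg:9083")
    .getOrCreate()
)

# pred = -1 flags an anomalous half-hour interval, 1 is normal, 0 means the UDF failed.
spark.sql(
    "SELECT pickup_ts, norides, total_bill, pred "
    "FROM prediction.ad.iforest WHERE pred = -1 ORDER BY pickup_ts"
).show()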
14 changes: 14 additions & 0 deletions demos/spark-k8s-anomaly-detection-taxi-data/load-test-data.yaml
@@ -0,0 +1,14 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: load-ny-taxi-data
spec:
  template:
    spec:
      containers:
        - name: load-ny-taxi-data
          image: "bitnami/minio:2022-debian-10"
          command: ["bash", "-c", "cd /tmp && for month in 2020-09 2020-10 2020-11 2020-12; do curl -O https://repo.stackable.tech/repository/misc/ny-taxi-data/fhvhv_tripdata_$month.parquet && mc --insecure alias set minio http://minio-trino:9000/ demo demodemo && mc cp fhvhv_tripdata_$month.parquet minio/demo/ny-taxi-data/raw/ && mc mb --ignore-existing minio/prediction; done"]
      restartPolicy: OnFailure
  backoffLimit: 50
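
For anyone poking at the data locally, a quick sanity check (a sketch, not part of this PR; assumes pyarrow is installed and one month's file was downloaded from the URL above) confirms the columns spark-ad.py depends on:

# Sketch: verify a downloaded month of FHVHV data has the columns the Spark job uses.
import pyarrow.parquet as pq

pf = pq.ParquetFile("fhvhv_tripdata_2020-09.parquet")
print("rows:", pf.metadata.num_rows)
for col in ("pickup_datetime", "driver_pay"):
    assert col in pf.schema_arrow.names, f"missing column: {col}"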
44 changes: 44 additions & 0 deletions demos/spark-k8s-anomaly-detection-taxi-data/serviceaccount.yaml
@@ -0,0 +1,44 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: demo-clusterrolebinding
subjects:
  - apiGroup: rbac.authorization.k8s.io
    kind: Group
    name: system:serviceaccounts
roleRef:
  kind: ClusterRole
  name: demo-clusterrole
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: demo-clusterrole
rules:
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - batch
    resources:
      - jobs
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - spark.stackable.tech
    resources:
      - sparkapplications
    verbs:
      - get
      - list
      - watch
      - create
@@ -0,0 +1,76 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: setup-superset
spec:
  template:
    spec:
      containers:
        - name: setup-superset
          image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0
          # TODO update
          command: ["bash", "-c", "curl -o superset-assets.zip https://raw.githubusercontent.com/stackabletech/stackablectl/142_spark_anomaly_detection/demos/spark-k8s-anomaly-detection-taxi-data/superset-assets.zip && python -u /tmp/script/script.py"]
          volumeMounts:
            - name: script
              mountPath: /tmp/script
      volumes:
        - name: script
          configMap:
            name: setup-superset-script
      restartPolicy: OnFailure
  backoffLimit: 50
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: setup-superset-script
data:
  script.py: |
    import logging
    import requests

    base_url = "http://superset-external:8088"  # replace for local testing/development, then change back
    username = "admin"
    password = "admin"

    logging.basicConfig(level=logging.INFO)
    logging.info("Starting setup of Superset")
    logging.info("Getting access token from /api/v1/security/login")
    session = requests.session()
    access_token = session.post(f"{base_url}/api/v1/security/login", json={"username": username, "password": password, "provider": "db", "refresh": True}).json()['access_token']

    logging.info("Getting csrf token from /api/v1/security/csrf_token")
    csrf_token = session.get(f"{base_url}/api/v1/security/csrf_token", headers={"Authorization": f"Bearer {access_token}"}).json()["result"]

    headers = {
        "accept": "application/json",
        "Authorization": f"Bearer {access_token}",
        "X-CSRFToken": csrf_token,
    }

    #########################
    # Export
    #########################
    # logging.info("Exporting all assets")
    # result = session.get(f"{base_url}/api/v1/assets/export", headers=headers)
    # assert result.status_code == 200
    # with open("superset-assets.zip", "wb") as f:
    #     f.write(result.content)

    #########################
    # Import
    #########################
    logging.info("Importing all assets")
    files = {
        "bundle": ("superset-assets.zip", open("superset-assets.zip", "rb")),
    }
    data = {
        "passwords": '{"databases/Trino.yaml": "demo"}'
    }
    result = session.post(f"{base_url}/api/v1/assets/import", headers=headers, files=files, data=data)
    print(result)
    print(result.text)
    assert result.status_code == 200

    logging.info("Finished setup of Superset")
Binary file not shown