[Merged by Bors] - Spark anomaly detection demo #155
Closed
26 commits (all by adwk67):

662c71b  wip
963d493  wip
4ed4a7e  wip:initial script
ecdfeeb  wip: working sklearn model/script
94e41af  wip: minor cleanup
6de0d21  revert to FHVHV data
66987f1  output final group counts
270e1bc  wait for data to be loaded before starting spark job
764e4d8  removed manifest (now added as a config map)
7b16c15  wip: hive/trino
0ba6e3e  write results to trino table
197f1de  add superset artefacts
f384827  dedicated stack for the demo
4c28506  fixed stacks references
e23469a  added more data and fine-tuned columns
f65d659  update superset assets
4d65de4  Merge branch 'main' into 142_spark_anomaly_detection
317c0b7  documentation
6d39c82  updated doc link
0c6444c  Merge branch 'main' into 142_spark_anomaly_detection
77c2092  merge main
18bea72  Update demos/spark-k8s-anomaly-detection-taxi-data/create-spark-anoma…
c406e8a  use resource limits struct
084ee5f  Merge branch 'main' into 142_spark_anomaly_detection
ff785d9  removed unecessary file
7a77084  minor corrections and added note/screenshot re. spark-ui
New file (Dockerfile; path not shown in this view): 10 additions, 0 deletions
@@ -0,0 +1,10 @@
FROM docker.stackable.tech/stackable/pyspark-k8s:3.3.0-stackable0.2.0

RUN curl -L -O http://search.maven.org/remotecontent?filepath=org/apache/ivy/ivy/2.5.0/ivy-2.5.0.jar

RUN java -jar ivy-2.5.0.jar -notransitive \
    -dependency org.apache.iceberg iceberg-spark-runtime-3.3_2.12 0.14.1 \
    -retrieve "/stackable/spark/jars/[artifact]-[revision](-[classifier]).[ext]"
RUN java -jar ivy-2.5.0.jar -confs compile \
    -dependency org.apache.iceberg iceberg-spark-runtime-3.3_2.12 0.14.1 \
    -retrieve "/stackable/spark/jars/[artifact]-[revision](-[classifier]).[ext]"
demos/spark-k8s-anomaly-detection-taxi-data/create-spark-anomaly-detection-job.yaml (185 additions, 0 deletions)
@@ -0,0 +1,185 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: create-spark-anomaly-detection-job
spec:
  template:
    spec:
      initContainers:
        - name: wait-for-testdata
          image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0
          command: ["bash", "-c", "echo 'Waiting for job load-ny-taxi-data to finish' && kubectl wait --for=condition=complete --timeout=30m job/load-ny-taxi-data"]
      containers:
        - name: create-spark-anomaly-detection-job
          image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0
          command: ["bash", "-c", "echo 'Submitting Spark job' && kubectl apply -f /tmp/manifest/spark-ad-job.yaml"]
          volumeMounts:
            - name: manifest
              mountPath: /tmp/manifest
      volumes:
        - name: manifest
          configMap:
            name: create-spark-ad-job-manifest
      restartPolicy: OnFailure
  backoffLimit: 50
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: create-spark-ad-job-manifest
data:
  spark-ad-job.yaml: |
    ---
    apiVersion: spark.stackable.tech/v1alpha1
    kind: SparkApplication
    metadata:
      name: spark-ad
    spec:
      version: "1.0"
      sparkImage: docker.stackable.tech/aken/pyspark-k8s-with-iceberg:0.1.0 #docker.stackable.tech/stackable/pyspark-k8s:3.3.0-stackable0.2.0
      mode: cluster
      mainApplicationFile: local:///spark-scripts/spark-ad.py
      deps:
        requirements:
          - scikit-learn==0.24.2
        #packages:
        #  - org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:0.14.1
      volumes:
        - name: cm-spark
          configMap:
            name: cm-spark
      driver:
        volumeMounts:
          - name: cm-spark
            mountPath: /spark-scripts
      executor:
        volumeMounts:
          - name: cm-spark
            mountPath: /spark-scripts
      sparkConf:
        spark.kubernetes.submission.waitAppCompletion: "false"
        spark.kubernetes.driver.pod.name: "spark-ad-driver"
        spark.kubernetes.executor.podNamePrefix: "spark-ad"
        spark.kubernetes.driver.request.cores: "2"
        spark.kubernetes.driver.limit.cores: "3"
        spark.driver.cores: "3"
        spark.driver.memory: "1g"
        spark.driver.memoryOverheadFactor: "0.4"
        spark.kubernetes.executor.request.cores: "2"
        spark.kubernetes.executor.limit.cores: "3"
        spark.executor.cores: "3"
        spark.executor.memory: "4g"
        spark.executor.memoryOverheadFactor: "0.4"
        spark.executor.instances: "6"
        spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        spark.hadoop.fs.s3a.endpoint: http://minio-trino:9000
        spark.hadoop.fs.s3a.path.style.access: "true"
        spark.hadoop.fs.s3a.access.key: demo
        spark.hadoop.fs.s3a.secret.key: demodemo
        spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
        spark.sql.catalog.prediction: org.apache.iceberg.spark.SparkCatalog
        spark.sql.catalog.prediction.type: hive
        spark.sql.catalog.prediction.uri: thrift://hive-iceberg:9083
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cm-spark
data:
  spark-ad.py: |
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import dayofweek, to_date, to_timestamp, date_format, year, hour, minute, month, when, dayofmonth
    from pyspark.sql.functions import concat_ws, substring, concat, lpad, lit
    from pyspark.sql.functions import round, sum, count, avg
    from pyspark.sql.functions import lag
    from pyspark.sql.window import Window
    from pyspark.sql import functions, types
    from sklearn.ensemble import IsolationForest
    from sklearn.preprocessing import StandardScaler

    spark = SparkSession.builder.appName("ny-tlc-anomaly-detection").getOrCreate()
    spark.sql("CREATE SCHEMA IF NOT EXISTS prediction.ad LOCATION 's3a://prediction/anomaly-detection'")
    spark.sql("CREATE TABLE IF NOT EXISTS prediction.ad.iforest (pickup_ts TIMESTAMP, pickup_minute_group VARCHAR(4), pickup_hour INT, pickup_year INT, pickup_month INT, pickup_dayofmonth INT, pickup_dayofweek INT, norides INT, total_bill DOUBLE, avg_bill DOUBLE, norides_lag INT, pred INT) USING iceberg")

    input_df = spark.read.parquet("s3a://demo/ny-taxi-data/raw/")

    df = input_df.select(
        to_date(input_df.pickup_datetime).alias("day_date")
        , year(input_df.pickup_datetime).alias('year')
        , month(input_df.pickup_datetime).alias('month')
        , dayofmonth(input_df.pickup_datetime).alias("dayofmonth")
        , dayofweek(input_df.pickup_datetime).alias("dayofweek")
        , hour(input_df.pickup_datetime).alias("hour")
        , minute(input_df.pickup_datetime).alias("minute")
        , input_df.driver_pay
    )

    # bucket each ride into a half-hour group and build a timestamp for it
    df = df.withColumn("minute_group", when(df.minute < 30, '00').otherwise('30'))
    df = df.withColumn("time_group", concat_ws(":", lpad(df.hour, 2, '0'), df.minute_group, lit('00')))
    df = df.withColumn("ts", concat_ws(" ", df.day_date, df.time_group))

    dfs = df.select(
        to_timestamp(df.ts, "yyyy-MM-dd HH:mm:ss").alias("date_group")
        , df.minute_group
        , df.year
        , df.hour
        , df.month
        , df.dayofmonth
        , df.dayofweek
        , df.driver_pay
    ).groupby("date_group", "minute_group", "hour", "year", "month", "dayofmonth", "dayofweek").agg(functions.count('driver_pay').alias('no_rides'), functions.round(functions.sum('driver_pay'), 2).alias('total_bill'), functions.round(functions.avg('driver_pay'), 2).alias('avg_bill')).orderBy("date_group")

    windowSpec = Window.partitionBy("hour").orderBy("date_group")

    dfs = dfs.withColumn("lag", lag("no_rides", 2).over(windowSpec))
    dfs = dfs.filter("lag IS NOT NULL")

    scaler = StandardScaler()
    classifier = IsolationForest(contamination=0.005, n_estimators=200, max_samples=0.7, random_state=42, n_jobs=-1)

    df_model = dfs.select(dfs.minute_group, dfs.hour, dfs.year, dfs.month, dfs.dayofmonth, dfs.dayofweek, dfs.no_rides, dfs.total_bill, dfs.avg_bill, dfs.lag)

    # fit scaler and model on the driver: the aggregated data is small enough to collect
    x_train = scaler.fit_transform(df_model.collect())
    clf = classifier.fit(x_train)

    # broadcast the fitted scaler and model so executors can score rows in the UDF
    SCL = spark.sparkContext.broadcast(scaler)
    CLF = spark.sparkContext.broadcast(clf)

    def predict_using_broadcasts(minute_group, hour, year, month, dayofmonth, dayofweek, no_rides, total_bill, avg_bill, lag):
        prediction = 0
        x_test = [[minute_group, hour, year, month, dayofmonth, dayofweek, no_rides, total_bill, avg_bill, lag]]
        try:
            x_test = SCL.value.transform(x_test)
            prediction = CLF.value.predict(x_test)[0]
        except ValueError:
            import traceback
            traceback.print_exc()
            print('Cannot predict:', x_test)
        return int(prediction)

    udf_predict_using_broadcasts = functions.udf(predict_using_broadcasts, types.IntegerType())

    df_pred = dfs.withColumn(
        'prediction',
        udf_predict_using_broadcasts('minute_group', 'hour', 'year', 'month', 'dayofmonth', 'dayofweek', 'no_rides', 'total_bill', 'avg_bill', 'lag')
    )

    # map to table columns
    df_out = df_pred.select(
        df_pred.date_group.alias("pickup_ts")
        , df_pred.minute_group.alias("pickup_minute_group")
        , df_pred.hour.alias("pickup_hour")
        , df_pred.year.alias("pickup_year")
        , df_pred.month.alias("pickup_month")
        , df_pred.dayofmonth.alias("pickup_dayofmonth")
        , df_pred.dayofweek.alias("pickup_dayofweek")
        , df_pred.no_rides.alias("norides")
        , df_pred.total_bill.alias("total_bill")
        , df_pred.avg_bill.alias("avg_bill")
        , df_pred.lag.alias("norides_lag")
        , df_pred.prediction.alias("pred")
    )

    # write via iceberg
    df_out.writeTo("prediction.ad.iforest").append()
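The script fits the scaler and forest once on the driver, then broadcasts both so each executor can score rows inside the UDF. The sklearn side can be exercised locally; IsolationForest labels inliers as 1 and outliers as -1, which is what ends up in the `pred` column. A minimal, self-contained sketch on synthetic data (not the taxi set):

```python
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for the aggregated ride counts.
rng = np.random.default_rng(42)
X = rng.normal(loc=0.0, scale=1.0, size=(1000, 4))
X[:5] += 8.0  # inject a handful of obvious outliers

scaler = StandardScaler()
clf = IsolationForest(contamination=0.005, n_estimators=200,
                      max_samples=0.7, random_state=42, n_jobs=-1)
pred = clf.fit_predict(scaler.fit_transform(X))

# IsolationForest returns 1 for inliers and -1 for outliers.
print("flagged outliers:", int((pred == -1).sum()))
```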
demos/spark-k8s-anomaly-detection-taxi-data/load-test-data.yaml (14 additions, 0 deletions)
@@ -0,0 +1,14 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: load-ny-taxi-data
spec:
  template:
    spec:
      containers:
        - name: load-ny-taxi-data
          image: "bitnami/minio:2022-debian-10"
          command: ["bash", "-c", "cd /tmp && for month in 2020-09 2020-10 2020-11 2020-12; do curl -O https://repo.stackable.tech/repository/misc/ny-taxi-data/fhvhv_tripdata_$month.parquet && mc --insecure alias set minio http://minio-trino:9000/ demo demodemo && mc cp fhvhv_tripdata_$month.parquet minio/demo/ny-taxi-data/raw/ && mc mb --ignore-existing minio/prediction; done"]
      restartPolicy: OnFailure
  backoffLimit: 50
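To verify the load, the bucket can be listed from outside the cluster; a sketch assuming the boto3 package is available and the minio-trino service has been port-forwarded to localhost:9000 (credentials as in the Job above):

```python
import boto3

# Sketch: list the uploaded parquet files through the S3 API.
# Assumes: kubectl port-forward svc/minio-trino 9000 is running.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="demo",
    aws_secret_access_key="demodemo",
)
resp = s3.list_objects_v2(Bucket="demo", Prefix="ny-taxi-data/raw/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])
```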
demos/spark-k8s-anomaly-detection-taxi-data/serviceaccount.yaml (44 additions, 0 deletions)
@@ -0,0 +1,44 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: demo-clusterrolebinding
subjects:
  - apiGroup: rbac.authorization.k8s.io
    kind: Group
    name: system:serviceaccounts
roleRef:
  kind: ClusterRole
  name: demo-clusterrole
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: demo-clusterrole
rules:
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - batch
    resources:
      - jobs
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - spark.stackable.tech
    resources:
      - sparkapplications
    verbs:
      - get
      - list
      - watch
      - create
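The ClusterRole grants get/list/watch on pods and jobs plus create on sparkapplications, which is exactly what the two Jobs above need for their kubectl wait and kubectl apply steps. A sketch of how one could sanity-check this with a SelfSubjectAccessReview, assuming the kubernetes Python client and that it runs under the credentials being tested:

```python
from kubernetes import client, config

# Sketch: ask the API server whether the current credentials may
# create SparkApplications (the verb the submit Job relies on).
config.load_kube_config()
review = client.V1SelfSubjectAccessReview(
    spec=client.V1SelfSubjectAccessReviewSpec(
        resource_attributes=client.V1ResourceAttributes(
            group="spark.stackable.tech",
            resource="sparkapplications",
            verb="create",
        )
    )
)
resp = client.AuthorizationV1Api().create_self_subject_access_review(review)
print("allowed:", resp.status.allowed)
```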
demos/spark-k8s-anomaly-detection-taxi-data/setup-superset.yaml (76 additions, 0 deletions)
@@ -0,0 +1,76 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: setup-superset
spec:
  template:
    spec:
      containers:
        - name: setup-superset
          image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0
          # TODO update
          command: ["bash", "-c", "curl -o superset-assets.zip https://raw.githubusercontent.com/stackabletech/stackablectl/142_spark_anomaly_detection/demos/spark-k8s-anomaly-detection-taxi-data/superset-assets.zip && python -u /tmp/script/script.py"]
          volumeMounts:
            - name: script
              mountPath: /tmp/script
      volumes:
        - name: script
          configMap:
            name: setup-superset-script
      restartPolicy: OnFailure
  backoffLimit: 50
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: setup-superset-script
data:
  script.py: |
    import logging
    import requests

    base_url = "http://superset-external:8088"  # For local testing / developing replace it, afterwards change back to http://superset-external:8088
    username = "admin"
    password = "admin"

    logging.basicConfig(level=logging.INFO)
    logging.info("Starting setup of Superset")
    logging.info("Getting access token from /api/v1/security/login")
    session = requests.session()
    access_token = session.post(f"{base_url}/api/v1/security/login", json={"username": username, "password": password, "provider": "db", "refresh": True}).json()['access_token']

    logging.info("Getting csrf token from /api/v1/security/csrf_token")
    csrf_token = session.get(f"{base_url}/api/v1/security/csrf_token", headers={"Authorization": f"Bearer {access_token}"}).json()["result"]

    headers = {
        "accept": "application/json",
        "Authorization": f"Bearer {access_token}",
        "X-CSRFToken": csrf_token,
    }

    #########################
    # Export
    #########################
    #logging.info("Exporting all assets")
    #result = session.get(f"{base_url}/api/v1/assets/export", headers=headers)
    #assert result.status_code == 200
    #with open("superset-assets.zip", "wb") as f:
    #    f.write(result.content)

    #########################
    # Import
    #########################
    logging.info("Importing all assets")
    files = {
        "bundle": ("superset-assets.zip", open("superset-assets.zip", "rb")),
    }
    data = {
        "passwords": '{"databases/Trino.yaml": "demo"}'
    }
    result = session.post(f"{base_url}/api/v1/assets/import", headers=headers, files=files, data=data)
    print(result)
    print(result.text)
    assert result.status_code == 200

    logging.info("Finished setup of Superset")
Binary file not shown.

Binary files added:
- docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_0.png (+788 KB)
- docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_1.png (+789 KB)
- docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_2.png (+240 KB)
- docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_3.png (+286 KB)
- docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_4.png (+255 KB)
- docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/overview.png (+73.7 KB)
- docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/superset_anomaly_scores.png (+244 KB)