diff --git a/demos/demos-v1.yaml b/demos/demos-v1.yaml index 8c7bbac3..88edc2d2 100644 --- a/demos/demos-v1.yaml +++ b/demos/demos-v1.yaml @@ -53,6 +53,21 @@ demos: - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/nifi-kafka-druid-water-level-data/create-druid-ingestion-job.yaml - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/nifi-kafka-druid-water-level-data/setup-superset.yaml + spark-k8s-anomaly-detection-taxi-data: + description: Demo loading New York taxi data into an S3 bucket and carrying out an anomaly detection analysis on it + documentation: https://docs.stackable.tech/stackablectl/stable/demos/spark-k8s-anomaly-detection-taxi-data.html + stackableStack: spark-trino-superset-s3 + labels: + - trino + - superset + - minio + - s3 + - ny-taxi-data + manifests: + - plainYaml: demos/spark-k8s-anomaly-detection-taxi-data/serviceaccount.yaml + - plainYaml: demos/spark-k8s-anomaly-detection-taxi-data/load-test-data.yaml + - plainYaml: demos/spark-k8s-anomaly-detection-taxi-data/create-spark-anomaly-detection-job.yaml + - plainYaml: demos/spark-k8s-anomaly-detection-taxi-data/setup-superset.yaml trino-taxi-data: description: Demo loading 2.5 years of New York taxi data into S3 bucket, creating a Trino table and a Superset dashboard documentation: https://docs.stackable.tech/stackablectl/stable/demos/trino-taxi-data.html diff --git a/demos/spark-k8s-anomaly-detection-taxi-data/create-spark-anomaly-detection-job.yaml b/demos/spark-k8s-anomaly-detection-taxi-data/create-spark-anomaly-detection-job.yaml new file mode 100644 index 00000000..b350ead2 --- /dev/null +++ b/demos/spark-k8s-anomaly-detection-taxi-data/create-spark-anomaly-detection-job.yaml @@ -0,0 +1,192 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: create-spark-anomaly-detection-job +spec: + template: + spec: + initContainers: + - name: wait-for-testdata + image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0 + command: ["bash", "-c", "echo 'Waiting for job load-ny-taxi-data to finish' && kubectl wait --for=condition=complete --timeout=30m job/load-ny-taxi-data"] + containers: + - name: create-spark-anomaly-detection-job + image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0 + command: ["bash", "-c", "echo 'Submitting Spark job' && kubectl apply -f /tmp/manifest/spark-ad-job.yaml"] + volumeMounts: + - name: manifest + mountPath: /tmp/manifest + volumes: + - name: manifest + configMap: + name: create-spark-ad-job-manifest + restartPolicy: OnFailure + backoffLimit: 50 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: create-spark-ad-job-manifest +data: + spark-ad-job.yaml: | + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: spark-ad + spec: + version: "1.0" + sparkImage: docker.stackable.tech/demos/pyspark-k8s-with-kafka-and-iceberg:3.3.0-stackable0.2.0 + mode: cluster + mainApplicationFile: local:///spark-scripts/spark-ad.py + deps: + requirements: + - scikit-learn==0.24.2 + volumes: + - name: cm-spark + configMap: + name: cm-spark + job: + resources: + cpu: + min: "100m" + max: "500m" + memory: + limit: "1Gi" + driver: + resources: + cpu: + min: "2" + max: "3" + memory: + limit: "2Gi" + volumeMounts: + - name: cm-spark + mountPath: /spark-scripts + executor: + resources: + cpu: + min: "2" + max: "3" + memory: + limit: "5Gi" 
+ volumeMounts: + - name: cm-spark + mountPath: /spark-scripts + sparkConf: + spark.kubernetes.submission.waitAppCompletion: "false" + spark.kubernetes.driver.pod.name: "spark-ad-driver" + spark.kubernetes.executor.podNamePrefix: "spark-ad" + spark.executor.instances: "6" + spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" + spark.hadoop.fs.s3a.endpoint: http://minio-trino:9000 + spark.hadoop.fs.s3a.path.style.access: "true" + spark.hadoop.fs.s3a.access.key: demo + spark.hadoop.fs.s3a.secret.key: demodemo + spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions + spark.sql.catalog.prediction: org.apache.iceberg.spark.SparkCatalog + spark.sql.catalog.prediction.type: hive + spark.sql.catalog.prediction.uri: thrift://hive-iceberg:9083 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: cm-spark +data: + spark-ad.py: | + from pyspark.sql import SparkSession + from pyspark.sql.functions import dayofweek, to_date, to_timestamp, date_format, year, hour, minute, month, when, dayofmonth, dayofweek + from pyspark.sql.functions import concat_ws, substring, concat, lpad, lit + from pyspark.sql.functions import round, sum, count, avg + from pyspark.sql.functions import lag + from pyspark.sql.window import Window + from pyspark.sql import functions, types + from sklearn.ensemble import IsolationForest + from sklearn.preprocessing import StandardScaler + + spark = SparkSession.builder.appName("ny-tlc-anomaly-detection").getOrCreate() + spark.sql("CREATE SCHEMA IF NOT EXISTS prediction.ad LOCATION 's3a://prediction/anomaly-detection'") + spark.sql("CREATE TABLE IF NOT EXISTS prediction.ad.iforest (pickup_ts TIMESTAMP, pickup_minute_group VARCHAR(4), pickup_hour INT, pickup_year INT, pickup_month INT, pickup_dayofmonth INT, pickup_dayofweek INT, norides INT, total_bill DOUBLE, avg_bill DOUBLE, norides_lag INT, pred INT) USING iceberg") + + input_df = spark.read.parquet("s3a://demo/ny-taxi-data/raw/") + + df = input_df.select( + to_date(input_df.pickup_datetime).alias("day_date") + , year(input_df.pickup_datetime).alias('year') + , month(input_df.pickup_datetime).alias('month') + , dayofmonth(input_df.pickup_datetime).alias("dayofmonth") + , dayofweek(input_df.pickup_datetime).alias("dayofweek") + , hour(input_df.pickup_datetime).alias("hour") + , minute(input_df.pickup_datetime).alias("minute") + , input_df.driver_pay + ) + + df = df.withColumn("minute_group", when(df.minute < 30, '00').otherwise('30')) + df = df.withColumn("time_group",concat_ws(":", lpad(df.hour, 2, '0'), df.minute_group, lit('00'))) + df = df.withColumn("ts",concat_ws(" ", df.day_date, df.time_group)) + + dfs = df.select( + to_timestamp(df.ts, "yyyy-MM-dd HH:mm:ss").alias("date_group") + , df.minute_group + , df.year + , df.hour + , df.month + , df.dayofmonth + , df.dayofweek + , df.driver_pay + ).groupby("date_group", "minute_group", "hour", "year", "month", "dayofmonth", "dayofweek").agg(functions.count('driver_pay').alias('no_rides'), functions.round(functions.sum('driver_pay'), 2).alias('total_bill'), functions.round(functions.avg('driver_pay'), 2).alias('avg_bill')).orderBy("date_group") + + windowSpec = Window.partitionBy("hour").orderBy("date_group") + + dfs = dfs.withColumn("lag",lag("no_rides",2).over(windowSpec)) + dfs = dfs.filter("lag IS NOT NULL") + + scaler = StandardScaler() + classifier = IsolationForest(contamination=0.005, n_estimators=200, max_samples=0.7, random_state=42, n_jobs=-1) + + df_model = dfs.select(dfs.minute_group, 
dfs.hour, dfs.year, dfs.month, dfs.dayofmonth, dfs.dayofweek, dfs.no_rides, dfs.total_bill, dfs.avg_bill, dfs.lag) + + x_train = scaler.fit_transform(df_model.collect()) + clf = classifier.fit(x_train) + + SCL = spark.sparkContext.broadcast(scaler) + CLF = spark.sparkContext.broadcast(clf) + + def predict_using_broadcasts(minute_group, hour, year, month, dayofmonth, dayofweek, no_rides, total_bill, avg_bill, lag): + prediction = 0 + x_test = [[minute_group, hour, year, month, dayofmonth, dayofweek, no_rides, total_bill, avg_bill, lag]] + try: + x_test = SCL.value.transform(x_test) + prediction = CLF.value.predict(x_test)[0] + except ValueError: + import traceback + traceback.print_exc() + print('Cannot predict:', x_test) + return int(prediction) + + udf_predict_using_broadcasts = functions.udf(predict_using_broadcasts, types.IntegerType()) + + df_pred = dfs.withColumn( + 'prediction', + udf_predict_using_broadcasts('minute_group', 'hour', 'year', 'month', 'dayofmonth', 'dayofweek', 'no_rides', 'total_bill', 'avg_bill', 'lag') + ) + + # map to table columns + df_out = df_pred.select( + df_pred.date_group.alias("pickup_ts") + , df_pred.minute_group.alias("pickup_minute_group") + , df_pred.hour.alias("pickup_hour") + , df_pred.year.alias("pickup_year") + , df_pred.month.alias("pickup_month") + , df_pred.dayofmonth.alias("pickup_dayofmonth") + , df_pred.dayofweek.alias("pickup_dayofweek") + , df_pred.no_rides.alias("norides") + , df_pred.total_bill.alias("total_bill") + , df_pred.avg_bill.alias("avg_bill") + , df_pred.lag.alias("norides_lag") + , df_pred.prediction.alias("pred") + ) + + # write via iceberg + df_out.writeTo("prediction.ad.iforest").append() \ No newline at end of file diff --git a/demos/spark-k8s-anomaly-detection-taxi-data/load-test-data.yaml b/demos/spark-k8s-anomaly-detection-taxi-data/load-test-data.yaml new file mode 100644 index 00000000..310ee4e0 --- /dev/null +++ b/demos/spark-k8s-anomaly-detection-taxi-data/load-test-data.yaml @@ -0,0 +1,14 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: load-ny-taxi-data +spec: + template: + spec: + containers: + - name: load-ny-taxi-data + image: "bitnami/minio:2022-debian-10" + command: ["bash", "-c", "cd /tmp && for month in 2020-09 2020-10 2020-11 2020-12; do curl -O https://repo.stackable.tech/repository/misc/ny-taxi-data/fhvhv_tripdata_$month.parquet && mc --insecure alias set minio http://minio-trino:9000/ demo demodemo && mc cp fhvhv_tripdata_$month.parquet minio/demo/ny-taxi-data/raw/ && mc mb --ignore-existing minio/prediction; done"] + restartPolicy: OnFailure + backoffLimit: 50 diff --git a/demos/spark-k8s-anomaly-detection-taxi-data/serviceaccount.yaml b/demos/spark-k8s-anomaly-detection-taxi-data/serviceaccount.yaml new file mode 100644 index 00000000..3df7ad76 --- /dev/null +++ b/demos/spark-k8s-anomaly-detection-taxi-data/serviceaccount.yaml @@ -0,0 +1,44 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: demo-clusterrolebinding +subjects: + - apiGroup: rbac.authorization.k8s.io + kind: Group + name: system:serviceaccounts +roleRef: + kind: ClusterRole + name: demo-clusterrole + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: demo-clusterrole +rules: + - apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - watch + - apiGroups: + - batch + resources: + - jobs + verbs: + - get + - list + - watch + - apiGroups: + - spark.stackable.tech + resources: + - sparkapplications + verbs: 
+ - get + - list + - watch + - create \ No newline at end of file diff --git a/demos/spark-k8s-anomaly-detection-taxi-data/setup-superset.yaml b/demos/spark-k8s-anomaly-detection-taxi-data/setup-superset.yaml new file mode 100644 index 00000000..a62dbb9b --- /dev/null +++ b/demos/spark-k8s-anomaly-detection-taxi-data/setup-superset.yaml @@ -0,0 +1,76 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: setup-superset +spec: + template: + spec: + containers: + - name: setup-superset + image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0 + # TODO update + command: ["bash", "-c", "curl -o superset-assets.zip https://raw.githubusercontent.com/stackabletech/stackablectl/142_spark_anomaly_detection/demos/spark-k8s-anomaly-detection-taxi-data/superset-assets.zip && python -u /tmp/script/script.py"] + volumeMounts: + - name: script + mountPath: /tmp/script + volumes: + - name: script + configMap: + name: setup-superset-script + restartPolicy: OnFailure + backoffLimit: 50 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: setup-superset-script +data: + script.py: | + import logging + import requests + + base_url = "http://superset-external:8088" # For local testing / developing replace it, afterwards change back to http://superset-external:8088 + username = "admin" + password = "admin" + + logging.basicConfig(level=logging.INFO) + logging.info("Starting setup of Superset") + logging.info("Getting access token from /api/v1/security/login") + session = requests.session() + access_token = session.post(f"{base_url}/api/v1/security/login", json={"username": username, "password": password, "provider": "db", "refresh": True}).json()['access_token'] + + logging.info("Getting csrf token from /api/v1/security/csrf_token") + csrf_token = session.get(f"{base_url}/api/v1/security/csrf_token", headers={"Authorization": f"Bearer {access_token}"}).json()["result"] + + headers = { + "accept": "application/json", + "Authorization": f"Bearer {access_token}", + "X-CSRFToken": csrf_token, + } + + ######################### + # Export + ######################### + #logging.info("Exporting all assets") + #result = session.get(f"{base_url}/api/v1/assets/export", headers=headers) + #assert result.status_code == 200 + #with open("superset-assets.zip", "wb") as f: + # f.write(result.content) + + ######################### + # Import + ######################### + logging.info("Importing all assets") + files = { + "bundle": ("superset-assets.zip", open("superset-assets.zip", "rb")), + } + data = { + "passwords": '{"databases/Trino.yaml": "demo"}' + } + result = session.post(f"{base_url}/api/v1/assets/import", headers=headers, files=files, data=data) + print(result) + print(result.text) + assert result.status_code == 200 + + logging.info("Finished setup of Superset") \ No newline at end of file diff --git a/demos/spark-k8s-anomaly-detection-taxi-data/superset-assets.zip b/demos/spark-k8s-anomaly-detection-taxi-data/superset-assets.zip new file mode 100644 index 00000000..723a8a9c Binary files /dev/null and b/demos/spark-k8s-anomaly-detection-taxi-data/superset-assets.zip differ diff --git a/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_0.png b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_0.png new file mode 100644 index 00000000..789cb54c Binary files /dev/null and b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_0.png differ diff --git a/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_1.png 
b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_1.png new file mode 100644 index 00000000..5aad1adf Binary files /dev/null and b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_1.png differ diff --git a/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_2.png b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_2.png new file mode 100644 index 00000000..a5773d70 Binary files /dev/null and b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_2.png differ diff --git a/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_3.png b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_3.png new file mode 100644 index 00000000..ec289a53 Binary files /dev/null and b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_3.png differ diff --git a/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_4.png b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_4.png new file mode 100644 index 00000000..e2fef8b9 Binary files /dev/null and b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/minio_4.png differ diff --git a/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/overview.png b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/overview.png new file mode 100644 index 00000000..774ba519 Binary files /dev/null and b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/overview.png differ diff --git a/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/spark_job.png b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/spark_job.png new file mode 100644 index 00000000..b8609575 Binary files /dev/null and b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/spark_job.png differ diff --git a/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/superset_anomaly_scores.png b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/superset_anomaly_scores.png new file mode 100644 index 00000000..af52da7a Binary files /dev/null and b/docs/modules/ROOT/images/spark-k8s-anomaly-detection-taxi-data/superset_anomaly_scores.png differ diff --git a/docs/modules/ROOT/pages/demos/spark-k8s-anomaly-detection-taxi-data.adoc b/docs/modules/ROOT/pages/demos/spark-k8s-anomaly-detection-taxi-data.adoc new file mode 100644 index 00000000..0b377d82 --- /dev/null +++ b/docs/modules/ROOT/pages/demos/spark-k8s-anomaly-detection-taxi-data.adoc @@ -0,0 +1,110 @@ += spark-k8s-anomaly-detection-taxi-data + +[NOTE] +==== +This guide assumes you already have the demo `spark-k8s-anomaly-detection-taxi-data` installed. +If you don't have it installed, please follow the xref:commands/demo.adoc#_install_demo[documentation on how to install a demo]. +To put it simply, you have to run `stackablectl demo install spark-k8s-anomaly-detection-taxi-data`. +==== + +This demo will + +* Install the required Stackable operators +* Spin up the following data products +** *Trino*: A fast distributed SQL query engine for big data analytics that helps you explore your data universe. This demo uses it to enable SQL access to the data +** *Spark*: A multi-language engine for executing data engineering, data science, and machine learning. This demo uses it to batch process data from S3 by training and scoring an unsupervised anomaly detection model, writing the results into a Trino table.
In this demo Spark uses an isolation forest algorithm from the scikit-learn machine learning library. +** *MinIO*: An S3-compatible object store. This demo uses it as persistent storage to store all the data used +** *Hive metastore*: A service that stores metadata related to Apache Hive and other services. This demo uses it as metadata storage for Trino and Spark +** *Open Policy Agent* (OPA): An open source, general-purpose policy engine that unifies policy enforcement across the stack. This demo uses it as the authorizer for Trino, which decides which user is able to query which data. +** *Superset*: A modern data exploration and visualization platform. This demo utilizes Superset to retrieve data from Trino via SQL queries and build dashboards on top of that data +* Copy the taxi data in Parquet format into the S3 staging area +* Start a Spark batch job that fetches the raw data, trains and scores a model, and writes the results to Trino/S3 for use by Superset +* Create Superset dashboards for visualization of the anomaly detection scores +You can see the deployed products as well as their relationship in the following diagram: + + +image::spark-k8s-anomaly-detection-taxi-data/overview.png[] + +== List deployed Stackable services +To list the installed Stackable services, run the following command: + +[source,console] +---- +$ stackablectl services list --all-namespaces +PRODUCT NAME NAMESPACE ENDPOINTS EXTRA INFOS + + hive hive spark-k8s-ad-taxi-data hive 172.18.0.2:31912 + metrics 172.18.0.2:30812 + + hive hive-iceberg spark-k8s-ad-taxi-data hive 172.18.0.4:32133 + metrics 172.18.0.4:32125 + + opa opa spark-k8s-ad-taxi-data http http://172.18.0.3:31450 + + superset superset spark-k8s-ad-taxi-data external-superset http://172.18.0.2:31339 Admin user: admin, password: admin + + trino trino spark-k8s-ad-taxi-data coordinator-metrics 172.18.0.3:32168 + coordinator-https https://172.18.0.3:31408 + + minio minio-trino spark-k8s-ad-taxi-data http http://172.18.0.3:30589 Third party service + console-http http://172.18.0.3:31452 Admin user: root, password: rootroot +---- + +[NOTE] +==== +When a product instance has not finished starting yet, the service will have no endpoint. +Starting all the product instances might take a considerable amount of time depending on your internet connectivity. +In case the product is not ready yet, a warning might be shown. +==== + +== MinIO +=== List buckets +The S3 storage provided by MinIO is used as persistent storage for all the data used in the demo. +Open the `minio-trino` endpoint `console-http` retrieved by `stackablectl services list` in your browser (http://172.18.0.3:31452 in this case). + +image::spark-k8s-anomaly-detection-taxi-data/minio_0.png[] + +Log in with the username `root` and password `rootroot`. + +image::spark-k8s-anomaly-detection-taxi-data/minio_2.png[] + +Here you can see the two buckets the S3 storage is split into: + +1. `demo`: The demo loads static datasets into this area. The data is stored in Parquet format and forms the basis for the model that will be trained by Spark. +2. `prediction`: This bucket is where the model scores are persisted. The data is stored in the https://iceberg.apache.org/[Apache Iceberg] table format. + +=== Inspect raw data +Click on the blue `Browse` button of the `demo` bucket. + +image::spark-k8s-anomaly-detection-taxi-data/minio_3.png[] + +You can see a folder (called a prefix in S3) containing a dataset of similarly structured data files. The data is partitioned by month and contains several hundred MB of data.
This may not seem particularly large for a dataset, but the model is a time-series model where the data has decreasing relevance the "older" it is: this is especially true when the data is subject to multiple external factors, many of which are unknown and fluctuating in scope and effect. + +The second bucket `prediction` contains the output from the model scoring process: + +image::spark-k8s-anomaly-detection-taxi-data/minio_4.png[] + +This is a much smaller dataset, as it only contains scores for each aggregated time period. + +== Spark + +The Spark job ingests the raw data and performs some fairly straightforward data wrangling and feature engineering. Any windowing features designed to capture the time-series nature of the data - such as lags or rolling averages - need to make use of evenly distributed partitions so that Spark can execute these tasks in parallel. The job uses an implementation of the Isolation Forest https://cs.nju.edu.cn/zhouzh/zhouzh.files/publication/icdm08b.pdf[algorithm] provided by the scikit-learn https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html[library]: the model is trained in a single task and then broadcast to each executor, where it is invoked by a user-defined function. The Isolation Forest algorithm is used for unsupervised model training, which means that a labelled set of data - against which the model is trained - is not necessary. This makes model preparation easier, as we do not have to divide the dataset into training and validation subsets. + +You can inspect a running Spark job by forwarding the port used by the Spark UI: + +[source,console] +---- +kubectl port-forward spark-ad-driver 4040 +---- + +and then opening a browser tab to http://localhost:4040: + +image::spark-k8s-anomaly-detection-taxi-data/spark_job.png[] + +== Dashboard + +The anomaly detection dashboard is pre-defined and accessible under `Dashboards` once you have logged in to Superset: + +image::spark-k8s-anomaly-detection-taxi-data/superset_anomaly_scores.png[] + +How can we interpret the results? This is where the fun begins (!) as the model does not yield data that can be used directly for a root cause analysis. An isolation forest is a tree-based ensemble model that measures how many branches are needed in its underlying decision trees to isolate each data point: the more anomalous the data, the easier this will be - a clear outlier may only need a single partition to isolate it, whereas tightly clustered data will require significantly more. The number of partitions needed to isolate a point is therefore inversely proportional to how anomalous it is, as the short sketch below illustrates.
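+
+The following snippet is a minimal, self-contained sketch (it is not part of the demo manifests) that shows this behaviour using the same scikit-learn `IsolationForest` class the Spark job uses; the synthetic data and parameter values are purely illustrative.
+
+[source,python]
+----
+import numpy as np
+from sklearn.ensemble import IsolationForest
+
+# Synthetic, illustrative data: a dense cluster of "normal" half-hourly ride
+# counts plus a few obvious outliers. These values are not taken from the demo.
+rng = np.random.default_rng(42)
+normal = rng.normal(loc=500, scale=20, size=(200, 1))
+outliers = np.array([[5.0], [950.0], [1200.0]])
+X = np.vstack([normal, outliers])
+
+clf = IsolationForest(contamination=0.01, n_estimators=200, random_state=42)
+clf.fit(X)
+
+# score_samples returns higher values for points inside the cluster and lower
+# (more negative) values for points that are easy to isolate.
+print("normal point score:", clf.score_samples([[500.0]])[0])
+print("outlier score     :", clf.score_samples([[1200.0]])[0])
+
+# predict maps these scores to labels: 1 = inlier, -1 = anomaly. The demo's
+# user-defined function stores this label in the `pred` column of the table.
+print("labels:", clf.predict([[500.0], [1200.0]]))
+----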
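+
+If you want to inspect the scored data outside of Superset, you can also query the Iceberg table directly through Trino. The snippet below is a hedged sketch using the `trino` Python client (`pip install trino`): the catalog, schema and table names (`prediction.ad.iforest`) and the `demo`/`demo` credentials come from the demo manifests, while the host, port and the `verify=False` setting are assumptions for a local demo setup - substitute the `coordinator-https` endpoint shown by `stackablectl services list`.
+
+[source,python]
+----
+from trino.dbapi import connect
+from trino.auth import BasicAuthentication
+
+# Assumptions: the coordinator-https endpoint from `stackablectl services list`
+# is reachable at the address below and uses a self-signed certificate.
+conn = connect(
+    host="172.18.0.3",      # replace with your coordinator-https address
+    port=31408,             # replace with your coordinator-https port
+    user="demo",
+    http_scheme="https",
+    auth=BasicAuthentication("demo", "demo"),
+    verify=False,           # assumption: skip TLS verification for the demo cert
+)
+
+cur = conn.cursor()
+# `prediction` is the Iceberg catalog defined in the stack, `ad.iforest` is the
+# table the Spark job appends its scores to.
+cur.execute(
+    "SELECT pickup_ts, norides, total_bill, pred "
+    "FROM prediction.ad.iforest "
+    "ORDER BY pickup_ts DESC LIMIT 10"
+)
+for row in cur.fetchall():
+    print(row)
+----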
\ No newline at end of file diff --git a/stacks/spark-trino-superset-s3/hive-metastore.yaml b/stacks/spark-trino-superset-s3/hive-metastore.yaml new file mode 100644 index 00000000..a56e9833 --- /dev/null +++ b/stacks/spark-trino-superset-s3/hive-metastore.yaml @@ -0,0 +1,68 @@ +--- +apiVersion: hive.stackable.tech/v1alpha1 +kind: HiveCluster +metadata: + name: hive +spec: + version: 3.1.3-stackable0.1.0 + s3: + inline: + host: minio-trino + port: 9000 + accessStyle: Path + credentials: + secretClass: hive-s3-credentials + metastore: + roleGroups: + default: + replicas: 1 + config: + database: + connString: jdbc:postgresql://postgresql-hive:5432/hive + user: hive + password: hive + dbType: postgres +--- +apiVersion: hive.stackable.tech/v1alpha1 +kind: HiveCluster +metadata: + name: hive-iceberg +spec: + version: 3.1.3-stackable0.1.0 + s3: + inline: + host: minio-trino + port: 9000 + accessStyle: Path + credentials: + secretClass: hive-s3-credentials + metastore: + roleGroups: + default: + replicas: 1 + config: + database: + connString: jdbc:postgresql://postgresql-hive-iceberg:5432/hive + user: hive + password: hive + dbType: postgres +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: hive-s3-credentials +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: v1 +kind: Secret +metadata: + name: hive-s3-credentials + labels: + secrets.stackable.tech/class: hive-s3-credentials +stringData: + accessKey: hive + secretKey: hivehive diff --git a/stacks/spark-trino-superset-s3/superset.yaml b/stacks/spark-trino-superset-s3/superset.yaml new file mode 100644 index 00000000..ca78cf15 --- /dev/null +++ b/stacks/spark-trino-superset-s3/superset.yaml @@ -0,0 +1,47 @@ +--- +apiVersion: superset.stackable.tech/v1alpha1 +kind: SupersetCluster +metadata: + name: superset +spec: + version: 1.5.1-stackable0.2.0 + statsdExporterVersion: v0.22.4 + credentialsSecret: superset-credentials + mapboxSecret: superset-mapbox-api-key + nodes: + roleGroups: + default: + replicas: 1 +--- +apiVersion: v1 +kind: Secret +metadata: + name: superset-credentials +type: Opaque +stringData: + adminUser.username: admin + adminUser.firstname: SupersetNur + adminUser.lastname: Admin + adminUser.email: admin@superset.com + adminUser.password: admin + connections.secretKey: thisISaSECRET_1234 + connections.sqlalchemyDatabaseUri: postgresql://superset:superset@postgresql-superset/superset +--- +apiVersion: v1 +kind: Secret +metadata: + name: superset-mapbox-api-key +type: Opaque +stringData: + connections.mapboxApiKey: "pk.eyJ1IjoibXVlbGxlcmNlbGluZSIsImEiOiJjbDZwM3llMTAwNGpmM2psdHU0Y21wZHJwIn0.kXFJKBbeyhbwf0z460JcTQ" +# --- +# TODO Use when available (https://github.com/stackabletech/superset-operator/issues/3) +# apiVersion: superset.stackable.tech/v1alpha1 +# kind: TrinoConnection +# metadata: +# name: superset-trino-connection +# spec: +# superset: +# name: superset +# trino: +# name: trino diff --git a/stacks/spark-trino-superset-s3/trino.yaml b/stacks/spark-trino-superset-s3/trino.yaml new file mode 100644 index 00000000..573d216e --- /dev/null +++ b/stacks/spark-trino-superset-s3/trino.yaml @@ -0,0 +1,128 @@ +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCluster +metadata: + name: trino +spec: + version: 396-stackable0.1.0 + catalogLabelSelector: + matchLabels: + trino: trino + authentication: + method: + multiUser: + userCredentialsSecret: + name: trino-users + opa: + configMapName: opa + package: trino + coordinators: + roleGroups: + default: + 
replicas: 1 + workers: + roleGroups: + default: + replicas: 1 +--- +apiVersion: v1 +kind: Secret +metadata: + name: trino-users +type: kubernetes.io/opaque +stringData: + # admin:admin + admin: $2y$10$89xReovvDLacVzRGpjOyAOONnayOgDAyIS2nW9bs5DJT98q17Dy5i + # demo:demo + demo: $2y$10$mMRoIKfWtAuycEQnKiDCeOlCSYiWkvbs0WsMFLkaSnNO0ZnFKVRXm +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCatalog +metadata: + name: hive + labels: + trino: trino +spec: + connector: + hive: + metastore: + configMap: hive + s3: + inline: + host: minio-trino + port: 9000 + accessStyle: Path + credentials: + secretClass: trino-s3-credentials +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCatalog +metadata: + name: prediction + labels: + trino: trino +spec: + connector: + iceberg: + metastore: + configMap: hive-iceberg + s3: + inline: + host: minio-trino + port: 9000 + accessStyle: Path + credentials: + secretClass: trino-s3-credentials +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: trino-s3-credentials +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: v1 +kind: Secret +metadata: + name: trino-s3-credentials + labels: + secrets.stackable.tech/class: trino-s3-credentials +stringData: + accessKey: trino + secretKey: trinotrino +--- +apiVersion: opa.stackable.tech/v1alpha1 +kind: OpaCluster +metadata: + name: opa +spec: + version: 0.41.0-stackable0.1.0 + servers: + roleGroups: + default: + selector: + matchLabels: + kubernetes.io/os: linux +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: trino-opa-bundle + labels: + opa.stackable.tech/bundle: "trino" +data: + trino.rego: | + package trino + + default allow = false + + allow { + input.context.identity.user == "admin" + } + + allow { + input.context.identity.user == "demo" + } diff --git a/stacks/stacks-v1.yaml b/stacks/stacks-v1.yaml index 9ce6e4ee..92f5d8fd 100644 --- a/stacks/stacks-v1.yaml +++ b/stacks/stacks-v1.yaml @@ -242,6 +242,22 @@ stacks: - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/stacks/kafka-druid-superset-s3/druid.yaml # Reuse - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/stacks/kafka-druid-superset-s3/superset.yaml # Reuse - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/stacks/nifi-kafka-druid-superset-s3/nifi.yaml + spark-trino-superset-s3: + description: Stack containing MinIO, Trino and Superset for data visualization + stackableRelease: 22.09-latest-trino-spark + labels: + - trino + - superset + - minio + - s3 + manifests: + - helmChart: *template-minio-trino + - helmChart: *template-postgresql-hive + - helmChart: *template-postgresql-hive-iceberg + - helmChart: *template-postgresql-superset + - plainYaml: stacks/spark-trino-superset-s3/hive-metastore.yaml + - plainYaml: stacks/spark-trino-superset-s3/trino.yaml + - plainYaml: stacks/spark-trino-superset-s3/superset.yaml trino-superset-s3: description: Stack containing MinIO, Trino and Superset for data visualization stackableRelease: 22.09