@@ -8,7 +8,7 @@ use std::{
 use constants::*;
 use history::LogFileDirectorySpec;
 use logdir::ResolvedLogDir;
-use product_config::{types::PropertyNameKind, ProductConfigManager};
+use product_config::{ProductConfigManager, types::PropertyNameKind};
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
 use stackable_operator::{
@@ -33,8 +33,8 @@ use stackable_operator::{
     kvp::ObjectLabels,
     memory::{BinaryMultiple, MemoryQuantity},
     product_config_utils::{
-        transform_all_roles_to_config, validate_all_roles_and_groups_config,
-        ValidatedRoleConfigByPropertyKind,
+        ValidatedRoleConfigByPropertyKind, transform_all_roles_to_config,
+        validate_all_roles_and_groups_config,
     },
     product_logging,
     role_utils::{CommonConfiguration, GenericRoleConfig, JavaCommonConfig, Role, RoleGroup},
@@ -544,20 +544,47 @@ impl v1alpha1::SparkApplication {
         let mut submit_cmd = vec![
             "/stackable/spark/bin/spark-submit".to_string(),
             "--verbose".to_string(),
-            "--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}".to_string(),
+            "--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}"
+                .to_string(),
             format!("--deploy-mode {mode}"),
             format!("--name {name}"),
-            format!("--conf spark.kubernetes.driver.podTemplateFile={VOLUME_MOUNT_PATH_DRIVER_POD_TEMPLATES}/{POD_TEMPLATE_FILE}"),
-            format!("--conf spark.kubernetes.executor.podTemplateFile={VOLUME_MOUNT_PATH_EXECUTOR_POD_TEMPLATES}/{POD_TEMPLATE_FILE}"),
-            format!("--conf spark.kubernetes.driver.podTemplateContainerName={container_name}", container_name = SparkContainer::Spark),
-            format!("--conf spark.kubernetes.executor.podTemplateContainerName={container_name}", container_name = SparkContainer::Spark),
-            format!("--conf spark.kubernetes.namespace={}", self.metadata.namespace.as_ref().context(NoNamespaceSnafu)?),
-            format!("--conf spark.kubernetes.driver.container.image={}", spark_image.to_string()),
-            format!("--conf spark.kubernetes.executor.container.image={}", spark_image.to_string()),
-            format!("--conf spark.kubernetes.authenticate.driver.serviceAccountName={}", serviceaccount_name),
-            format!("--conf spark.driver.defaultJavaOptions=-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
+            format!(
+                "--conf spark.kubernetes.driver.podTemplateFile={VOLUME_MOUNT_PATH_DRIVER_POD_TEMPLATES}/{POD_TEMPLATE_FILE}"
+            ),
+            format!(
+                "--conf spark.kubernetes.executor.podTemplateFile={VOLUME_MOUNT_PATH_EXECUTOR_POD_TEMPLATES}/{POD_TEMPLATE_FILE}"
+            ),
+            format!(
+                "--conf spark.kubernetes.driver.podTemplateContainerName={container_name}",
+                container_name = SparkContainer::Spark
+            ),
+            format!(
+                "--conf spark.kubernetes.executor.podTemplateContainerName={container_name}",
+                container_name = SparkContainer::Spark
+            ),
+            format!(
+                "--conf spark.kubernetes.namespace={}",
+                self.metadata.namespace.as_ref().context(NoNamespaceSnafu)?
+            ),
+            format!(
+                "--conf spark.kubernetes.driver.container.image={}",
+                spark_image.to_string()
+            ),
+            format!(
+                "--conf spark.kubernetes.executor.container.image={}",
+                spark_image.to_string()
+            ),
+            format!(
+                "--conf spark.kubernetes.authenticate.driver.serviceAccountName={}",
+                serviceaccount_name
+            ),
+            format!(
+                "--conf spark.driver.defaultJavaOptions=-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"
+            ),
             format!("--conf spark.driver.extraClassPath=/stackable/spark/extra-jars/*"),
-            format!("--conf spark.executor.defaultJavaOptions=-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
+            format!(
+                "--conf spark.executor.defaultJavaOptions=-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"
+            ),
             format!("--conf spark.executor.extraClassPath=/stackable/spark/extra-jars/*"),
         ];
 
@@ -682,7 +709,9 @@ impl v1alpha1::SparkApplication {
         submit_cmd.extend(self.spec.args.clone());
 
         Ok(vec![
-            format!("containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &"),
+            format!(
+                "containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &"
+            ),
             submit_cmd.join(" "),
         ])
     }
@@ -792,14 +821,11 @@ impl v1alpha1::SparkApplication {
         };
         if let Some(role_envs) = role_envs {
             env.extend(role_envs.iter().map(|(k, v)| {
-                (
-                    k,
-                    EnvVar {
-                        name: k.clone(),
-                        value: Some(v.clone()),
-                        ..Default::default()
-                    },
-                )
+                (k, EnvVar {
+                    name: k.clone(),
+                    value: Some(v.clone()),
+                    ..Default::default()
+                })
             }))
         }
 
@@ -854,13 +880,10 @@ impl v1alpha1::SparkApplication {
                     Role {
                         config: submit_conf.clone(),
                         role_config: GenericRoleConfig::default(),
-                        role_groups: [(
-                            "default".to_string(),
-                            RoleGroup {
-                                config: submit_conf,
-                                replicas: Some(1),
-                            },
-                        )]
+                        role_groups: [("default".to_string(), RoleGroup {
+                            config: submit_conf,
+                            replicas: Some(1),
+                        })]
                         .into(),
                     }
                     .erase(),
@@ -877,13 +900,10 @@ impl v1alpha1::SparkApplication {
                     Role {
                         config: driver_conf.clone(),
                         role_config: GenericRoleConfig::default(),
-                        role_groups: [(
-                            "default".to_string(),
-                            RoleGroup {
-                                config: driver_conf,
-                                replicas: Some(1),
-                            },
-                        )]
+                        role_groups: [("default".to_string(), RoleGroup {
+                            config: driver_conf,
+                            replicas: Some(1),
+                        })]
                         .into(),
                     }
                     .erase(),
@@ -967,7 +987,9 @@ fn subtract_spark_memory_overhead(for_java: bool, limit: &Quantity) -> Result<St
         .value as u32;
 
     if MIN_MEMORY_OVERHEAD > original_memory {
-        tracing::warn!("Skip memory overhead since not enough memory ({original_memory}m). At least {MIN_MEMORY_OVERHEAD}m required");
+        tracing::warn!(
+            "Skip memory overhead since not enough memory ({original_memory}m). At least {MIN_MEMORY_OVERHEAD}m required"
+        );
         return Ok(format!("{original_memory}m"));
     }
 
@@ -981,7 +1003,9 @@ fn subtract_spark_memory_overhead(for_java: bool, limit: &Quantity) -> Result<St
 
     let deduction = max(MIN_MEMORY_OVERHEAD, original_memory - reduced_memory);
 
-    tracing::debug!("subtract_spark_memory_overhead: original_memory ({original_memory}) - deduction ({deduction})");
+    tracing::debug!(
+        "subtract_spark_memory_overhead: original_memory ({original_memory}) - deduction ({deduction})"
+    );
     Ok(format!("{}m", original_memory - deduction))
 }
 
@@ -1089,7 +1113,7 @@ mod tests {
     use std::collections::{BTreeMap, HashMap};
 
     use indoc::indoc;
-    use product_config::{types::PropertyNameKind, ProductConfigManager};
+    use product_config::{ProductConfigManager, types::PropertyNameKind};
     use rstest::rstest;
    use stackable_operator::{
        commons::{
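Every hunk in this diff is mechanical reformatting with no behavior change: use-tree items are re-sorted into ASCII order (uppercase names such as ProductConfigManager now sort before lowercase segments such as types::PropertyNameKind), over-long string literals are wrapped onto their own lines, and struct literals overflow as trailing tuple elements. This matches the behavior of rustfmt's 2024 style edition. A minimal sketch of the opt-in, assuming the project pins it in a rustfmt.toml at the crate root (the file and setting are an assumption here, not shown in the diff):

    # rustfmt.toml (hypothetical): opts the crate into the 2024 style edition,
    # which applies the ASCII import ordering and line wrapping seen above.
    style_edition = "2024"

With that in place, running `cargo fmt` should reproduce the new side of every hunk.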