@@ -22,7 +22,7 @@ use stackable_operator::{
22
22
} ,
23
23
cluster_resources:: { ClusterResourceApplyStrategy , ClusterResources } ,
24
24
commons:: {
25
- listener:: { Listener , ListenerPort , ListenerSpec } ,
25
+ listener:: { Listener , ListenerPort } ,
26
26
product_image_selection:: ResolvedProductImage ,
27
27
} ,
28
28
k8s_openapi:: {
@@ -66,6 +66,7 @@ use crate::{
66
66
VOLUME_MOUNT_PATH_LOG , VOLUME_MOUNT_PATH_LOG_CONFIG ,
67
67
} ,
68
68
history:: { self , HistoryConfig , SparkHistoryServerContainer , v1alpha1} ,
69
+ listener,
69
70
logdir:: ResolvedLogDir ,
70
71
tlscerts, to_spark_env_sh_string,
71
72
} ,
@@ -82,6 +83,9 @@ pub enum Error {
82
83
source : stackable_operator:: builder:: meta:: Error ,
83
84
} ,
84
85
86
+ #[ snafu( display( "failed to build spark history group listener" ) ) ]
87
+ BuildListener { source : crate :: crd:: listener:: Error } ,
88
+
85
89
#[ snafu( display( "failed to build listener volume" ) ) ]
86
90
BuildListenerVolume {
87
91
source : ListenerOperatorVolumeSourceBuilderError ,
@@ -363,41 +367,27 @@ fn build_group_listener(
363
367
rolegroup : & RoleGroupRef < v1alpha1:: SparkHistoryServer > ,
364
368
listener_class : String ,
365
369
) -> Result < Listener , Error > {
366
- Ok ( Listener {
367
- metadata : ObjectMetaBuilder :: new ( )
368
- . name_and_namespace ( shs)
369
- . name ( shs. group_listener_name ( rolegroup) )
370
- . ownerreference_from_resource ( shs, None , Some ( true ) )
371
- . context ( ObjectMissingMetadataForOwnerRefSnafu ) ?
372
- . with_recommended_labels ( labels (
373
- shs,
374
- & resolved_product_image. app_version_label ,
375
- & rolegroup. role_group ,
376
- ) )
377
- . context ( ObjectMetaSnafu ) ?
378
- . build ( ) ,
379
- spec : ListenerSpec {
380
- class_name : Some ( listener_class) ,
381
- ports : Some ( listener_ports ( ) ) ,
382
- ..ListenerSpec :: default ( )
383
- } ,
384
- status : None ,
385
- } )
386
- }
370
+ let listener_name = rolegroup. object_name ( ) ;
371
+ let recommended_object_labels = labels (
372
+ shs,
373
+ & resolved_product_image. app_version_label ,
374
+ & rolegroup. role_group ,
375
+ ) ;
387
376
388
- fn listener_ports ( ) -> Vec < ListenerPort > {
389
- vec ! [
390
- ListenerPort {
391
- name: "metrics" . to_string( ) ,
392
- port: METRICS_PORT . into( ) ,
393
- protocol: Some ( "TCP" . to_string( ) ) ,
394
- } ,
395
- ListenerPort {
396
- name: "http" . to_string( ) ,
397
- port: HISTORY_UI_PORT . into( ) ,
398
- protocol: Some ( "TCP" . to_string( ) ) ,
399
- } ,
400
- ]
377
+ let listener_ports = [ ListenerPort {
378
+ name : "http" . to_string ( ) ,
379
+ port : HISTORY_UI_PORT . into ( ) ,
380
+ protocol : Some ( "TCP" . to_string ( ) ) ,
381
+ } ] ;
382
+
383
+ listener:: build_listener (
384
+ shs,
385
+ & listener_name,
386
+ & listener_class,
387
+ recommended_object_labels,
388
+ & listener_ports,
389
+ )
390
+ . context ( BuildListenerSnafu )
401
391
}
402
392
403
393
pub fn error_policy (
@@ -607,7 +597,7 @@ fn build_stateful_set(
607
597
// cluster-internal) as the address should still be consistent.
608
598
let volume_claim_templates = Some ( vec ! [
609
599
ListenerOperatorVolumeSourceBuilder :: new(
610
- & ListenerReference :: ListenerName ( shs . group_listener_name ( rolegroupref ) ) ,
600
+ & ListenerReference :: ListenerName ( rolegroupref . object_name ( ) ) ,
611
601
& recommended_labels,
612
602
)
613
603
. context( BuildListenerVolumeSnafu ) ?
0 commit comments