Skip to content

Commit 3d9a8f4

Browse files
operator: Merge pull request redpanda-data#6304 from nicolaferraro/name-template
Add support for endpoint templates to customize external advertised addresses
2 parents 6da4cb5 + 31bb240 commit 3d9a8f4

16 files changed

+660
-37
lines changed

apis/redpanda/v1alpha1/cluster_types.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -257,17 +257,33 @@ type StorageSpec struct {
257257
// used as a external listener. This port is tight to the autogenerated
258258
// host port. The collision between Kafka external, Kafka internal,
259259
// Admin, Pandaproxy, Schema Registry and RPC port is checked in the webhook.
260+
// An optional endpointTemplate can be used to configure advertised addresses
261+
// for Kafka API and Pandaproxy, while it is disallowed for other listeners.
260262
type ExternalConnectivityConfig struct {
261263
// Enabled enables the external connectivity feature
262264
Enabled bool `json:"enabled,omitempty"`
263265
// Subdomain can be used to change the behavior of an advertised
264266
// KafkaAPI. Each broker advertises Kafka API as follows
265-
// BROKER_ID.SUBDOMAIN:EXTERNAL_KAFKA_API_PORT.
267+
// ENDPOINT.SUBDOMAIN:EXTERNAL_KAFKA_API_PORT.
266268
// If Subdomain is empty then each broker advertises Kafka
267269
// API as PUBLIC_NODE_IP:EXTERNAL_KAFKA_API_PORT.
268270
// If TLS is enabled then this subdomain will be requested
269271
// as a subject alternative name.
270272
Subdomain string `json:"subdomain,omitempty"`
273+
// EndpointTemplate is a Golang template string that allows customizing each
274+
// broker advertised address.
275+
// Redpanda uses the format BROKER_ID.SUBDOMAIN:EXTERNAL_KAFKA_API_PORT by
276+
// default for advertised addresses. When an EndpointTemplate is
277+
// provided, then the BROKER_ID part is replaced with the endpoint
278+
// computed from the template.
279+
// The following variables are available to the template:
280+
// - Index: the Redpanda broker progressive number
281+
// - HostIP: the ip address of the Node, as reported in pod status
282+
//
283+
// Common template functions from Sprig (http://masterminds.github.io/sprig/)
284+
// are also available. The set of available functions is limited to hermetic
285+
// functions because template application needs to be deterministic.
286+
EndpointTemplate string `json:"endpointTemplate,omitempty"`
271287
// The preferred address type to be assigned to the external
272288
// advertised addresses. The valid types are ExternalDNS,
273289
// ExternalIP, InternalDNS, InternalIP, and Hostname.

apis/redpanda/v1alpha1/cluster_webhook.go

+61
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"strconv"
1616

1717
cmmeta "github.com/jetstack/cert-manager/pkg/apis/meta/v1"
18+
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
1819
corev1 "k8s.io/api/core/v1"
1920
apierrors "k8s.io/apimachinery/pkg/api/errors"
2021
"k8s.io/apimachinery/pkg/api/resource"
@@ -277,6 +278,12 @@ func (r *Cluster) validateAdminListeners() field.ErrorList {
277278
r.Spec.Configuration.AdminAPI,
278279
"bootstrap loadbalancer not available for http admin api"))
279280
}
281+
if externalAdmin != nil && externalAdmin.External.EndpointTemplate != "" {
282+
allErrs = append(allErrs,
283+
field.Invalid(field.NewPath("spec").Child("configuration").Child("adminApi"),
284+
r.Spec.Configuration.AdminAPI,
285+
"cannot provide an endpoint template for admin listener"))
286+
}
280287

281288
// for now only one listener can have TLS to be backward compatible with v1alpha1 API
282289
foundListenerWithTLS := false
@@ -306,6 +313,7 @@ func (r *Cluster) validateKafkaListeners() field.ErrorList {
306313
}
307314

308315
var external *KafkaAPI
316+
var externalIdx int
309317
for i, p := range r.Spec.Configuration.KafkaAPI {
310318
if p.External.Enabled {
311319
if external != nil {
@@ -315,6 +323,7 @@ func (r *Cluster) validateKafkaListeners() field.ErrorList {
315323
"only one kafka api listener can be marked as external"))
316324
}
317325
external = &r.Spec.Configuration.KafkaAPI[i]
326+
externalIdx = i
318327
}
319328
}
320329

@@ -357,10 +366,36 @@ func (r *Cluster) validateKafkaListeners() field.ErrorList {
357366
r.Spec.Configuration.KafkaAPI,
358367
"bootstrap port cannot be empty"))
359368
}
369+
// nolint:dupl // not identical
370+
if external != nil && external.External.EndpointTemplate != "" {
371+
if external.External.Subdomain == "" {
372+
allErrs = append(allErrs,
373+
field.Invalid(field.NewPath("spec").Child("configuration").Child("kafkaApi").Index(externalIdx).Child("external"),
374+
external.External,
375+
"endpointTemplate can only be used in combination with subdomain"))
376+
}
377+
378+
err := checkValidEndpointTemplate(external.External.EndpointTemplate)
379+
if err != nil {
380+
log.Error(err, "Invalid endpoint template received", "template", external.External.EndpointTemplate)
381+
allErrs = append(allErrs,
382+
field.Invalid(field.NewPath("spec").Child("configuration").Child("kafkaApi").Index(externalIdx).Child("external").Child("endpointTemplate"),
383+
external.External.EndpointTemplate,
384+
fmt.Sprintf("template is invalid: %v", err)))
385+
}
386+
}
360387

361388
return allErrs
362389
}
363390

391+
func checkValidEndpointTemplate(tmpl string) error {
392+
// Using an example input to ensure that the template expression is allowed
393+
data := utils.NewEndpointTemplateData(0, "1.2.3.4")
394+
_, err := utils.ComputeEndpoint(tmpl, data)
395+
return err
396+
}
397+
398+
// nolint:funlen,gocyclo // it's a sequence of checks
364399
func (r *Cluster) validatePandaproxyListeners() field.ErrorList {
365400
var allErrs field.ErrorList
366401
var proxyExternal *PandaproxyAPI
@@ -412,6 +447,25 @@ func (r *Cluster) validatePandaproxyListeners() field.ErrorList {
412447
r.Spec.Configuration.PandaproxyAPI[i],
413448
"sudomain of external pandaproxy must be the same as kafka's"))
414449
}
450+
// nolint:dupl // not identical
451+
if kafkaExternal != nil && proxyExternal.External.EndpointTemplate != "" {
452+
if proxyExternal.External.Subdomain == "" {
453+
allErrs = append(allErrs,
454+
field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i).Child("external"),
455+
proxyExternal.External,
456+
"endpointTemplate can only be used in combination with subdomain"))
457+
}
458+
459+
err := checkValidEndpointTemplate(proxyExternal.External.EndpointTemplate)
460+
if err != nil {
461+
log.Error(err, "Invalid endpoint template received", "template", proxyExternal.External.EndpointTemplate)
462+
allErrs = append(allErrs,
463+
field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i).
464+
Child("external").Child("endpointTemplate"),
465+
proxyExternal.External.EndpointTemplate,
466+
fmt.Sprintf("template is invalid: %v", err)))
467+
}
468+
}
415469
}
416470

417471
// for now only one listener can have TLS to be backward compatible with v1alpha1 API
@@ -510,6 +564,13 @@ func (r *Cluster) validateSchemaRegistryListener() field.ErrorList {
510564
r.Spec.Configuration.SchemaRegistry.External,
511565
"bootstrap loadbalancer not available for schema reigstry"))
512566
}
567+
if schemaRegistry.External.EndpointTemplate != "" {
568+
allErrs = append(allErrs,
569+
field.Invalid(field.NewPath("spec").Child("configuration").Child("schemaRegistry").Child("external").Child("endpointTemplate"),
570+
r.Spec.Configuration.SchemaRegistry.External.EndpointTemplate,
571+
"cannot provide an endpoint template for schema registry"))
572+
}
573+
513574
return allErrs
514575
}
515576

apis/redpanda/v1alpha1/cluster_webhook_test.go

+112
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,118 @@ func TestCreation(t *testing.T) {
10221022
err := rp.ValidateCreate()
10231023
assert.Error(t, err)
10241024
})
1025+
t.Run("endpoint template not allowed for schemaregistry", func(t *testing.T) {
1026+
rp := redpandaCluster.DeepCopy()
1027+
const commonDomain = "company.org"
1028+
1029+
rp.Spec.Configuration.KafkaAPI = append(rp.Spec.Configuration.KafkaAPI, v1alpha1.KafkaAPI{External: v1alpha1.ExternalConnectivityConfig{
1030+
Enabled: true,
1031+
Subdomain: commonDomain,
1032+
}})
1033+
rp.Spec.Configuration.SchemaRegistry = &v1alpha1.SchemaRegistryAPI{External: &v1alpha1.ExternalConnectivityConfig{
1034+
Enabled: true,
1035+
Subdomain: commonDomain,
1036+
EndpointTemplate: "xxx",
1037+
}}
1038+
err := rp.ValidateCreate()
1039+
assert.Error(t, err)
1040+
})
1041+
// nolint:dupl // not really a duplicate
1042+
t.Run("endpoint template not allowed for adminapi", func(t *testing.T) {
1043+
rp := redpandaCluster.DeepCopy()
1044+
const commonDomain = "company.org"
1045+
1046+
rp.Spec.Configuration.KafkaAPI = append(rp.Spec.Configuration.KafkaAPI, v1alpha1.KafkaAPI{External: v1alpha1.ExternalConnectivityConfig{
1047+
Enabled: true,
1048+
Subdomain: commonDomain,
1049+
}})
1050+
rp.Spec.Configuration.AdminAPI = append(rp.Spec.Configuration.AdminAPI, v1alpha1.AdminAPI{External: v1alpha1.ExternalConnectivityConfig{
1051+
Enabled: true,
1052+
Subdomain: commonDomain,
1053+
EndpointTemplate: "xxx",
1054+
}})
1055+
err := rp.ValidateCreate()
1056+
assert.Error(t, err)
1057+
})
1058+
t.Run("endpoint template without subdomain is not allowed in kafka API", func(t *testing.T) {
1059+
rp := redpandaCluster.DeepCopy()
1060+
1061+
rp.Spec.Configuration.KafkaAPI = append(rp.Spec.Configuration.KafkaAPI, v1alpha1.KafkaAPI{External: v1alpha1.ExternalConnectivityConfig{
1062+
Enabled: true,
1063+
EndpointTemplate: "xxx",
1064+
}})
1065+
err := rp.ValidateCreate()
1066+
assert.Error(t, err)
1067+
})
1068+
t.Run("endpoint template without subdomain is not allowed in pandaproxy API", func(t *testing.T) {
1069+
rp := redpandaCluster.DeepCopy()
1070+
1071+
rp.Spec.Configuration.KafkaAPI = append(rp.Spec.Configuration.KafkaAPI, v1alpha1.KafkaAPI{External: v1alpha1.ExternalConnectivityConfig{
1072+
Enabled: true,
1073+
}})
1074+
rp.Spec.Configuration.PandaproxyAPI = append(rp.Spec.Configuration.PandaproxyAPI, v1alpha1.PandaproxyAPI{External: v1alpha1.ExternalConnectivityConfig{
1075+
Enabled: true,
1076+
EndpointTemplate: "xxx",
1077+
}})
1078+
err := rp.ValidateCreate()
1079+
assert.Error(t, err)
1080+
})
1081+
t.Run("invalid endpoint template in kafka API", func(t *testing.T) {
1082+
rp := redpandaCluster.DeepCopy()
1083+
1084+
rp.Spec.Configuration.KafkaAPI = append(rp.Spec.Configuration.KafkaAPI, v1alpha1.KafkaAPI{External: v1alpha1.ExternalConnectivityConfig{
1085+
Enabled: true,
1086+
Subdomain: "example.com",
1087+
EndpointTemplate: "{{.Inexistent}}",
1088+
}})
1089+
err := rp.ValidateCreate()
1090+
assert.Error(t, err)
1091+
})
1092+
t.Run("valid endpoint template in kafka API", func(t *testing.T) {
1093+
rp := redpandaCluster.DeepCopy()
1094+
1095+
rp.Spec.Configuration.KafkaAPI = append(rp.Spec.Configuration.KafkaAPI, v1alpha1.KafkaAPI{External: v1alpha1.ExternalConnectivityConfig{
1096+
Enabled: true,
1097+
Subdomain: "example.com",
1098+
EndpointTemplate: "{{.Index}}-broker",
1099+
}})
1100+
err := rp.ValidateCreate()
1101+
assert.NoError(t, err)
1102+
})
1103+
// nolint:dupl // not really a duplicate
1104+
t.Run("invalid endpoint template in pandaproxy API", func(t *testing.T) {
1105+
rp := redpandaCluster.DeepCopy()
1106+
1107+
const commonDomain = "mydomain"
1108+
rp.Spec.Configuration.KafkaAPI = append(rp.Spec.Configuration.KafkaAPI, v1alpha1.KafkaAPI{External: v1alpha1.ExternalConnectivityConfig{
1109+
Enabled: true,
1110+
Subdomain: commonDomain,
1111+
}})
1112+
rp.Spec.Configuration.PandaproxyAPI = append(rp.Spec.Configuration.PandaproxyAPI, v1alpha1.PandaproxyAPI{External: v1alpha1.ExternalConnectivityConfig{
1113+
Enabled: true,
1114+
Subdomain: commonDomain,
1115+
EndpointTemplate: "{{.Index | nonexistent }}",
1116+
}})
1117+
err := rp.ValidateCreate()
1118+
assert.Error(t, err)
1119+
})
1120+
// nolint:dupl // not really a duplicate
1121+
t.Run("valid endpoint template in pandaproxy API", func(t *testing.T) {
1122+
rp := redpandaCluster.DeepCopy()
1123+
1124+
const commonDomain = "mydomain"
1125+
rp.Spec.Configuration.KafkaAPI = append(rp.Spec.Configuration.KafkaAPI, v1alpha1.KafkaAPI{External: v1alpha1.ExternalConnectivityConfig{
1126+
Enabled: true,
1127+
Subdomain: commonDomain,
1128+
}})
1129+
rp.Spec.Configuration.PandaproxyAPI = append(rp.Spec.Configuration.PandaproxyAPI, v1alpha1.PandaproxyAPI{External: v1alpha1.ExternalConnectivityConfig{
1130+
Enabled: true,
1131+
Subdomain: commonDomain,
1132+
EndpointTemplate: "{{.Index}}-pp",
1133+
}})
1134+
err := rp.ValidateCreate()
1135+
assert.NoError(t, err)
1136+
})
10251137
}
10261138

10271139
func TestSchemaRegistryValidations(t *testing.T) {

cmd/configurator/main.go

+56-24
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/hashicorp/go-multierror"
2323
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/networking"
24+
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
2425
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
2526
"github.com/spf13/afero"
2627
"gopkg.in/yaml.v3"
@@ -31,33 +32,39 @@ import (
3132
)
3233

3334
const (
34-
hostNameEnvVar = "HOSTNAME"
35-
svcFQDNEnvVar = "SERVICE_FQDN"
36-
configSourceDirEnvVar = "CONFIG_SOURCE_DIR"
37-
configDestinationEnvVar = "CONFIG_DESTINATION"
38-
redpandaRPCPortEnvVar = "REDPANDA_RPC_PORT"
39-
nodeNameEnvVar = "NODE_NAME"
40-
externalConnectivityEnvVar = "EXTERNAL_CONNECTIVITY"
41-
externalConnectivitySubDomainEnvVar = "EXTERNAL_CONNECTIVITY_SUBDOMAIN"
42-
externalConnectivityAddressTypeEnvVar = "EXTERNAL_CONNECTIVITY_ADDRESS_TYPE"
43-
hostPortEnvVar = "HOST_PORT"
44-
proxyHostPortEnvVar = "PROXY_HOST_PORT"
35+
hostNameEnvVar = "HOSTNAME"
36+
svcFQDNEnvVar = "SERVICE_FQDN"
37+
configSourceDirEnvVar = "CONFIG_SOURCE_DIR"
38+
configDestinationEnvVar = "CONFIG_DESTINATION"
39+
redpandaRPCPortEnvVar = "REDPANDA_RPC_PORT"
40+
nodeNameEnvVar = "NODE_NAME"
41+
externalConnectivityEnvVar = "EXTERNAL_CONNECTIVITY"
42+
externalConnectivitySubDomainEnvVar = "EXTERNAL_CONNECTIVITY_SUBDOMAIN"
43+
externalConnectivityAddressTypeEnvVar = "EXTERNAL_CONNECTIVITY_ADDRESS_TYPE"
44+
externalConnectivityKafkaEndpointTemplateEnvVar = "EXTERNAL_CONNECTIVITY_KAFKA_ENDPOINT_TEMPLATE"
45+
externalConnectivityPandaProxyEndpointTemplateEnvVar = "EXTERNAL_CONNECTIVITY_PANDA_PROXY_ENDPOINT_TEMPLATE"
46+
hostIPEnvVar = "HOST_IP_ADDRESS"
47+
hostPortEnvVar = "HOST_PORT"
48+
proxyHostPortEnvVar = "PROXY_HOST_PORT"
4549
)
4650

4751
type brokerID int
4852

4953
type configuratorConfig struct {
50-
hostName string
51-
svcFQDN string
52-
configSourceDir string
53-
configDestination string
54-
nodeName string
55-
subdomain string
56-
externalConnectivity bool
57-
externalConnectivityAddressType corev1.NodeAddressType
58-
redpandaRPCPort int
59-
hostPort int
60-
proxyHostPort int
54+
hostName string
55+
svcFQDN string
56+
configSourceDir string
57+
configDestination string
58+
nodeName string
59+
subdomain string
60+
externalConnectivity bool
61+
externalConnectivityAddressType corev1.NodeAddressType
62+
externalConnectivityKafkaEndpointTemplate string
63+
externalConnectivityPandaProxyEndpointTemplate string
64+
redpandaRPCPort int
65+
hostPort int
66+
proxyHostPort int
67+
hostIP string
6168
}
6269

6370
func (c *configuratorConfig) String() string {
@@ -205,8 +212,14 @@ func registerAdvertisedKafkaAPI(
205212
}
206213

207214
if len(c.subdomain) > 0 {
215+
data := utils.NewEndpointTemplateData(int(index), c.hostIP)
216+
ep, err := utils.ComputeEndpoint(c.externalConnectivityKafkaEndpointTemplate, data)
217+
if err != nil {
218+
return err
219+
}
220+
208221
cfg.Redpanda.AdvertisedKafkaAPI = append(cfg.Redpanda.AdvertisedKafkaAPI, config.NamedSocketAddress{
209-
Address: fmt.Sprintf("%d.%s", index, c.subdomain),
222+
Address: fmt.Sprintf("%s.%s", ep, c.subdomain),
210223
Port: c.hostPort,
211224
Name: "kafka-external",
212225
})
@@ -244,8 +257,14 @@ func registerAdvertisedPandaproxyAPI(
244257

245258
// Pandaproxy uses the Kafka API subdomain.
246259
if len(c.subdomain) > 0 {
260+
data := utils.NewEndpointTemplateData(int(index), c.hostIP)
261+
ep, err := utils.ComputeEndpoint(c.externalConnectivityPandaProxyEndpointTemplate, data)
262+
if err != nil {
263+
return err
264+
}
265+
247266
cfg.Pandaproxy.AdvertisedPandaproxyAPI = append(cfg.Pandaproxy.AdvertisedPandaproxyAPI, config.NamedSocketAddress{
248-
Address: fmt.Sprintf("%d.%s", index, c.subdomain),
267+
Address: fmt.Sprintf("%s.%s", ep, c.subdomain),
249268
Port: c.proxyHostPort,
250269
Name: "proxy-external",
251270
})
@@ -278,6 +297,7 @@ func getExternalIP(node *corev1.Node) string {
278297
return ""
279298
}
280299

300+
// nolint:funlen // envs are many
281301
func checkEnvVars() (configuratorConfig, error) {
282302
var result error
283303
var extCon string
@@ -326,6 +346,18 @@ func checkEnvVars() (configuratorConfig, error) {
326346
value: &hostPort,
327347
name: hostPortEnvVar,
328348
},
349+
{
350+
value: &c.externalConnectivityKafkaEndpointTemplate,
351+
name: externalConnectivityKafkaEndpointTemplateEnvVar,
352+
},
353+
{
354+
value: &c.externalConnectivityPandaProxyEndpointTemplate,
355+
name: externalConnectivityPandaProxyEndpointTemplateEnvVar,
356+
},
357+
{
358+
value: &c.hostIP,
359+
name: hostIPEnvVar,
360+
},
329361
}
330362
for _, envVar := range envVarList {
331363
v, exist := os.LookupEnv(envVar.name)

0 commit comments

Comments
 (0)