diff --git a/README.md b/README.md
index 2a292fe..72d000d 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,6 @@
 # coder-logstream-kube
 
-[![discord](https://img.shields.io/discord/747933592273027093?label=discord)](https://discord.gg/coder)
-[![release](https://img.shields.io/github/v/tag/coder/coder-logstream-kube)](https://github.com/coder/envbuilder/pkgs/container/coder-logstream-kube)
-[![godoc](https://pkg.go.dev/badge/github.com/coder/coder-logstream-kube.svg)](https://pkg.go.dev/github.com/coder/coder-logstream-kube)
+[![Go Reference](https://pkg.go.dev/badge/github.com/coder/coder-logstream-kube.svg)](https://pkg.go.dev/github.com/coder/coder-logstream-kube)
 [![license](https://img.shields.io/github/license/coder/coder-logstream-kube)](./LICENSE)
 
 Stream Kubernetes Pod events to the Coder startup logs.
@@ -10,6 +8,7 @@ Stream Kubernetes Pod events to the Coder startup logs.
 
 - Easily determine the reason for a pod provision failure, or why a pod is stuck in a pending state.
 - Visibility into when pods are OOMKilled, or when they are evicted.
 - Filter by namespace, field selector, and label selector to reduce Kubernetes API load.
+- Support for watching multiple namespaces or all namespaces cluster-wide.
 
 ![Log Stream](./scripts/demo.png)
@@ -24,6 +23,36 @@ helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \
     --set url=
 ```
 
+### Multi-Namespace Support
+
+By default, `coder-logstream-kube` watches pods in the namespace where it's deployed. You can configure it to watch multiple namespaces or all namespaces:
+
+#### Watch specific namespaces
+```console
+helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \
+    --namespace coder \
+    --set url= \
+    --set namespaces="namespace1,namespace2,namespace3"
+```
+
+#### Watch all namespaces
+```console
+helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \
+    --namespace coder \
+    --set url= \
+    --set namespaces=""
+```
+
+When watching multiple namespaces or all namespaces, the chart automatically creates ClusterRole and ClusterRoleBinding resources instead of namespace-scoped Role and RoleBinding.
+
+### Environment Variable Configuration
+
+You can also configure namespaces using the `CODER_NAMESPACE` environment variable:
+
+- Single namespace: `CODER_NAMESPACE=my-namespace`
+- Multiple namespaces: `CODER_NAMESPACE=ns1,ns2,ns3`
+- All namespaces: `CODER_NAMESPACE=""` (empty string)
+
 > **Note**
 > For additional customization (such as customizing the image, pull secrets, annotations, etc.), you can use the
 > [values.yaml](helm/values.yaml) file directly.
@@ -46,7 +75,24 @@ Kubernetes provides an [informers](https://pkg.go.dev/k8s.io/client-go/informers
 
 `coder-logstream-kube` listens for pod creation events with containers that have the `CODER_AGENT_TOKEN` environment variable set. All pod events are streamed as logs to the Coder API using the agent token for authentication.
 
+When configured for multiple namespaces, the application creates separate informers for each specified namespace. When configured to watch all namespaces (empty namespace list), it uses cluster-wide informers.
+
 ## Custom Certificates
 
 - [`SSL_CERT_FILE`](https://go.dev/src/crypto/x509/root_unix.go#L19): Specifies the path to an SSL certificate.
 - [`SSL_CERT_DIR`](https://go.dev/src/crypto/x509/root_unix.go#L25): Identifies which directory to check for SSL certificate files.
+
+## RBAC Permissions
+
+The required permissions depend on the scope of namespaces being watched:
+
+### Single Namespace (Role/RoleBinding)
+When watching a single namespace, the application uses namespace-scoped permissions:
+- `pods`: get, watch, list
+- `events`: get, watch, list
+- `replicasets`: get, watch, list
+
+### Multiple Namespaces or All Namespaces (ClusterRole/ClusterRoleBinding)
+When watching multiple namespaces or all namespaces, the application requires cluster-wide permissions with the same resource access but across all namespaces.
+
+The Helm chart automatically determines which type of RBAC resources to create based on your configuration.
diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml
index a414f2a..3ec2cbb 100644
--- a/helm/templates/service.yaml
+++ b/helm/templates/service.yaml
@@ -1,7 +1,24 @@
+{{/*
+Determine if cluster-wide permissions are needed.
+This happens when:
+1. namespaces is explicitly set to empty string (watch all namespaces)
+2. namespaces contains multiple comma-separated values
+3. rbac.clusterWide is explicitly set to true
+*/}}
+{{- $namespaces := .Values.namespaces | default .Release.Namespace -}}
+{{- $namespacesCount := 0 -}}
+{{- if eq $namespaces "" -}}
+  {{- $namespacesCount = 0 -}}
+{{- else -}}
+  {{- $namespacesCount = len (splitList "," $namespaces) -}}
+{{- end -}}
+{{- $useClusterWide := or .Values.rbac.clusterWide (eq $namespaces "") (gt $namespacesCount 1) -}}
+
+{{- if $useClusterWide }}
 apiVersion: rbac.authorization.k8s.io/v1
-kind: Role
+kind: ClusterRole
 metadata:
-  name: coder-logstream-kube-role
+  name: {{ .Release.Name }}-coder-logstream-kube-role
 rules:
 - apiGroups: [""]
   resources: ["pods", "events"]
   verbs: ["get", "watch", "list"]
@@ -10,12 +27,30 @@ rules:
   resources: ["replicasets", "events"]
   verbs: ["get", "watch", "list"]
 ---
-apiVersion: v1
-kind: ServiceAccount
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
 metadata:
+  name: {{ .Release.Name }}-coder-logstream-kube-rolebinding
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: {{ .Release.Name }}-coder-logstream-kube-role
+subjects:
+- kind: ServiceAccount
   name: {{ .Values.serviceAccount.name | quote }}
-  annotations: {{ toYaml .Values.serviceAccount.annotations | nindent 4 }}
-  labels: {{ toYaml .Values.serviceAccount.labels | nindent 4 }}
+  namespace: {{ .Release.Namespace }}
+{{- else }}
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: coder-logstream-kube-role
+rules:
+- apiGroups: [""]
+  resources: ["pods", "events"]
+  verbs: ["get", "watch", "list"]
+- apiGroups: ["apps"]
+  resources: ["replicasets", "events"]
+  verbs: ["get", "watch", "list"]
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: RoleBinding
@@ -28,6 +63,14 @@ roleRef:
 subjects:
 - kind: ServiceAccount
   name: {{ .Values.serviceAccount.name | quote }}
+{{- end }}
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: {{ .Values.serviceAccount.name | quote }}
+  annotations: {{ toYaml .Values.serviceAccount.annotations | nindent 4 }}
+  labels: {{ toYaml .Values.serviceAccount.labels | nindent 4 }}
 ---
 apiVersion: apps/v1
 kind: Deployment
@@ -76,7 +119,7 @@ spec:
         - name: CODER_URL
           value: {{ .Values.url }}
         - name: CODER_NAMESPACE
-          value: {{ .Values.namespace | default .Release.Namespace }}
+          value: {{ $namespaces }}
         {{- if .Values.image.sslCertFile }}
         - name: SSL_CERT_FILE
           value: {{ .Values.image.sslCertFile }}
@@ -95,3 +138,4 @@ spec:
       {{- if .Values.volumes }}
       volumes: {{- toYaml .Values.volumes | nindent 8 }}
      {{- end }}
+
diff --git a/helm/values.yaml b/helm/values.yaml
index 7ae1a1c..8746514 100644
--- a/helm/values.yaml
+++ b/helm/values.yaml
@@ -1,9 +1,9 @@
-# url -- The URL of your Coder deployment. Must prefix with http or https
 url: ""
 
-# namespace -- The namespace to searching for Pods within.
+# namespaces -- Comma-separated list of namespaces to watch for Pods.
 # If unspecified, this defaults to the Helm namespace.
-namespace: ""
+# If set to empty string (""), watches all namespaces (requires cluster-wide permissions).
+namespaces: ""
 
 # volumes -- A list of extra volumes to add to the coder-logstream pod.
 volumes:
@@ -46,6 +46,12 @@ serviceAccount:
   # coder.serviceAccount.name -- The service account name
   name: coder-logstream-kube
 
+# rbac -- RBAC configuration
+rbac:
+  # rbac.clusterWide -- Whether to use cluster-wide permissions (ClusterRole/ClusterRoleBinding).
+  # This is automatically set to true when namespaces is empty or contains multiple namespaces.
+  clusterWide: false
+
 # resources -- The resources to request for the Deployment. These are optional
 # and are not set by default.
 resources:
@@ -98,6 +104,3 @@ securityContext: {}
 #   drop:
 #   - ALL
 # readOnlyRootFilesystem: true
-# runAsNonRoot: true
-# seccompProfile:
-#   type: RuntimeDefault
diff --git a/logger.go b/logger.go
index 231d01b..a4fda87 100644
--- a/logger.go
+++ b/logger.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"net/url"
+	"strings"
 	"sync"
 	"time"
 
@@ -36,7 +37,7 @@ type podEventLoggerOptions struct {
 	logDebounce time.Duration
 
 	// The following fields are optional!
-	namespace     string
+	namespaces    string
 	fieldSelector string
 	labelSelector string
 }
@@ -95,6 +96,23 @@ type podEventLogger struct {
 	lq *logQueuer
 }
 
+// parseNamespaces parses the comma-separated namespaces string and returns a slice of namespace names.
+// If the input is empty, it returns an empty slice indicating all namespaces should be watched.
+func parseNamespaces(namespaces string) []string {
+	if namespaces == "" {
+		return []string{}
+	}
+
+	var result []string
+	for _, ns := range strings.Split(namespaces, ",") {
+		ns = strings.TrimSpace(ns)
+		if ns != "" {
+			result = append(result, ns)
+		}
+	}
+	return result
+}
+
 // init starts the informer factory and registers event handlers.
 func (p *podEventLogger) init() error {
 	// We only track events that happen after the reporter starts.
@@ -103,14 +121,49 @@ func (p *podEventLogger) init() error {
 
 	go p.lq.work(p.ctx)
 
-	podFactory := informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace), informers.WithTweakListOptions(func(lo *v1.ListOptions) {
-		lo.FieldSelector = p.fieldSelector
-		lo.LabelSelector = p.labelSelector
-	}))
+	namespaceList := parseNamespaces(p.namespaces)
+
+	// If no namespaces specified, watch all namespaces
+	if len(namespaceList) == 0 {
+		return p.initForNamespace("", startTime)
+	}
+
+	// Watch specific namespaces
+	for _, namespace := range namespaceList {
+		if err := p.initForNamespace(namespace, startTime); err != nil {
+			return fmt.Errorf("init for namespace %s: %w", namespace, err)
+		}
+	}
+
+	return nil
+}
+
+// initForNamespace initializes informers for a specific namespace.
+// If namespace is empty, it watches all namespaces.
+func (p *podEventLogger) initForNamespace(namespace string, startTime time.Time) error { + var podFactory informers.SharedInformerFactory + if namespace == "" { + // Watch all namespaces + podFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithTweakListOptions(func(lo *v1.ListOptions) { + lo.FieldSelector = p.fieldSelector + lo.LabelSelector = p.labelSelector + })) + } else { + // Watch specific namespace + podFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(namespace), informers.WithTweakListOptions(func(lo *v1.ListOptions) { + lo.FieldSelector = p.fieldSelector + lo.LabelSelector = p.labelSelector + })) + } + eventFactory := podFactory if p.fieldSelector != "" || p.labelSelector != "" { // Events cannot filter on labels and fields! - eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace)) + if namespace == "" { + eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0) + } else { + eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(namespace)) + } } // We listen for Pods and Events in the informer factory. @@ -200,37 +253,31 @@ func (p *podEventLogger) init() error { p.sendLog(replicaSet.Name, env.Value, agentsdk.Log{ CreatedAt: time.Now(), - Output: fmt.Sprintf("🐳 %s: %s", newColor(color.Bold).Sprint("Queued pod from ReplicaSet"), replicaSet.Name), + Output: fmt.Sprintf("📦 %s: %s", newColor(color.Bold).Sprint("Created replicaset"), replicaSet.Name), Level: codersdk.LogLevelInfo, }) } } if registered { - p.logger.Info(p.ctx, "registered agent pod from ReplicaSet", slog.F("name", replicaSet.Name)) + p.logger.Info(p.ctx, "registered agent replicaset", slog.F("name", replicaSet.Name), slog.F("namespace", replicaSet.Namespace)) } }, DeleteFunc: func(obj interface{}) { replicaSet, ok := obj.(*appsv1.ReplicaSet) if !ok { - p.errChan <- fmt.Errorf("unexpected replica set delete object type: %T", obj) + p.errChan <- fmt.Errorf("unexpected replicaset delete object type: %T", obj) return } tokens := p.tc.deleteReplicaSetToken(replicaSet.Name) - if len(tokens) == 0 { - return - } - for _, token := range tokens { p.sendLog(replicaSet.Name, token, agentsdk.Log{ CreatedAt: time.Now(), - Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted ReplicaSet"), replicaSet.Name), + Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted replicaset"), replicaSet.Name), Level: codersdk.LogLevelError, }) - p.sendDelete(token) } - - p.logger.Info(p.ctx, "unregistered ReplicaSet", slog.F("name", replicaSet.Name)) + p.logger.Info(p.ctx, "unregistered agent replicaset", slog.F("name", replicaSet.Name)) }, }) if err != nil { @@ -250,24 +297,32 @@ func (p *podEventLogger) init() error { return } + // We only care about events for pods and replicasets. 
var tokens []string switch event.InvolvedObject.Kind { case "Pod": tokens = p.tc.getPodTokens(event.InvolvedObject.Name) case "ReplicaSet": tokens = p.tc.getReplicaSetTokens(event.InvolvedObject.Name) + default: + return } + if len(tokens) == 0 { return } + level := codersdk.LogLevelInfo + if event.Type == "Warning" { + level = codersdk.LogLevelWarn + } + for _, token := range tokens { p.sendLog(event.InvolvedObject.Name, token, agentsdk.Log{ - CreatedAt: time.Now(), - Output: newColor(color.FgWhite).Sprint(event.Message), - Level: codersdk.LogLevelInfo, + CreatedAt: event.CreationTimestamp.Time, + Output: fmt.Sprintf("⚡ %s: %s", newColor(color.Bold).Sprint(event.Reason), event.Message), + Level: level, }) - p.logger.Info(p.ctx, "sending log", slog.F("pod", event.InvolvedObject.Name), slog.F("message", event.Message)) } }, }) @@ -275,45 +330,38 @@ func (p *podEventLogger) init() error { return fmt.Errorf("register event handler: %w", err) } - p.logger.Info(p.ctx, "listening for pod events", - slog.F("coder_url", p.coderURL.String()), - slog.F("namespace", p.namespace), - slog.F("field_selector", p.fieldSelector), - slog.F("label_selector", p.labelSelector), - ) - podFactory.Start(p.stopChan) - if podFactory != eventFactory { - eventFactory.Start(p.stopChan) + go podFactory.Start(p.ctx.Done()) + if eventFactory != podFactory { + go eventFactory.Start(p.ctx.Done()) } + return nil } -var sourceUUID = uuid.MustParse("cabdacf8-7c90-425c-9815-cae3c75d1169") - -// loggerForToken returns a logger for the given pod name and agent token. -// If a logger already exists for the token, it's returned. Otherwise a new -// logger is created and returned. It assumes a lock to p.mutex is already being -// held. -func (p *podEventLogger) sendLog(resourceName, token string, log agentsdk.Log) { - p.logCh <- agentLog{ - op: opLog, - resourceName: resourceName, - agentToken: token, - log: log, +func (p *podEventLogger) sendLog(name, token string, log agentsdk.Log) { + select { + case p.logCh <- agentLog{ + name: name, + token: token, + log: log, + }: + case <-p.ctx.Done(): } } func (p *podEventLogger) sendDelete(token string) { - p.logCh <- agentLog{ - op: opDelete, - agentToken: token, + select { + case p.logCh <- agentLog{ + token: token, + delete: true, + }: + case <-p.ctx.Done(): } } func (p *podEventLogger) Close() error { p.cancelFunc() close(p.stopChan) - close(p.errChan) return nil } @@ -323,240 +371,224 @@ type tokenCache struct { replicaSets map[string][]string } -func (t *tokenCache) setPodToken(name, token string) []string { return t.set(t.pods, name, token) } -func (t *tokenCache) getPodTokens(name string) []string { return t.get(t.pods, name) } -func (t *tokenCache) deletePodToken(name string) []string { return t.delete(t.pods, name) } +func (tc *tokenCache) setPodToken(name, token string) { + tc.mu.Lock() + defer tc.mu.Unlock() -func (t *tokenCache) setReplicaSetToken(name, token string) []string { - return t.set(t.replicaSets, name, token) -} -func (t *tokenCache) getReplicaSetTokens(name string) []string { return t.get(t.replicaSets, name) } -func (t *tokenCache) deleteReplicaSetToken(name string) []string { - return t.delete(t.replicaSets, name) + tokens, ok := tc.pods[name] + if !ok { + tc.pods[name] = []string{token} + return + } + + for _, t := range tokens { + if t == token { + return + } + } + + tc.pods[name] = append(tokens, token) } -func (t *tokenCache) get(m map[string][]string, name string) []string { - t.mu.RLock() - tokens := m[name] - t.mu.RUnlock() +func (tc *tokenCache) 
deletePodToken(name string) []string { + tc.mu.Lock() + defer tc.mu.Unlock() + + tokens, ok := tc.pods[name] + if !ok { + return nil + } + + delete(tc.pods, name) return tokens } -func (t *tokenCache) set(m map[string][]string, name, token string) []string { - t.mu.Lock() - tokens, ok := m[name] +func (tc *tokenCache) getPodTokens(name string) []string { + tc.mu.RLock() + defer tc.mu.RUnlock() + + tokens, ok := tc.pods[name] if !ok { - tokens = []string{token} - } else { - tokens = append(tokens, token) + return nil } - m[name] = tokens - t.mu.Unlock() - return tokens + return append([]string(nil), tokens...) } -func (t *tokenCache) delete(m map[string][]string, name string) []string { - t.mu.Lock() - tokens := m[name] - delete(m, name) - t.mu.Unlock() - return tokens +func (tc *tokenCache) setReplicaSetToken(name, token string) { + tc.mu.Lock() + defer tc.mu.Unlock() + + tokens, ok := tc.replicaSets[name] + if !ok { + tc.replicaSets[name] = []string{token} + return + } + + for _, t := range tokens { + if t == token { + return + } + } + + tc.replicaSets[name] = append(tokens, token) } -func (t *tokenCache) isEmpty() bool { - t.mu.Lock() - defer t.mu.Unlock() - return len(t.pods)+len(t.replicaSets) == 0 +func (tc *tokenCache) deleteReplicaSetToken(name string) []string { + tc.mu.Lock() + defer tc.mu.Unlock() + + tokens, ok := tc.replicaSets[name] + if !ok { + return nil + } + + delete(tc.replicaSets, name) + return tokens } -type op int +func (tc *tokenCache) getReplicaSetTokens(name string) []string { + tc.mu.RLock() + defer tc.mu.RUnlock() -const ( - opLog op = iota - opDelete -) + tokens, ok := tc.replicaSets[name] + if !ok { + return nil + } + + return append([]string(nil), tokens...) +} type agentLog struct { - op op - resourceName string - agentToken string - log agentsdk.Log + name string + token string + log agentsdk.Log + delete bool } -// logQueuer is a single-threaded queue for dispatching logs. type logQueuer struct { - mu sync.Mutex - logger slog.Logger - clock quartz.Clock - q chan agentLog - + logger slog.Logger + clock quartz.Clock + q <-chan agentLog coderURL *url.URL loggerTTL time.Duration - loggers map[string]agentLoggerLifecycle - logCache logCache + + mu sync.RWMutex + loggers map[string]agentLoggerLifecycle + + logCache logCache } -func (l *logQueuer) work(ctx context.Context) { - for ctx.Err() == nil { - select { - case log := <-l.q: - switch log.op { - case opLog: - l.processLog(ctx, log) - case opDelete: - l.processDelete(log) - } +type logCache struct { + mu sync.RWMutex + logs map[string][]agentsdk.Log +} - case <-ctx.Done(): - return - } +func (lc *logCache) append(token string, log agentsdk.Log) { + lc.mu.Lock() + defer lc.mu.Unlock() - } + lc.logs[token] = append(lc.logs[token], log) } -func (l *logQueuer) processLog(ctx context.Context, log agentLog) { - l.mu.Lock() - defer l.mu.Unlock() - queuedLogs := l.logCache.push(log) - lgr, ok := l.loggers[log.agentToken] +func (lc *logCache) flush(token string) []agentsdk.Log { + lc.mu.Lock() + defer lc.mu.Unlock() + + logs, ok := lc.logs[token] if !ok { - client := agentsdk.New(l.coderURL) - client.SetSessionToken(log.agentToken) - logger := l.logger.With(slog.F("resource_name", log.resourceName)) - client.SDK.SetLogger(logger) - - _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ - ID: sourceUUID, - Icon: "/icon/k8s.png", - DisplayName: "Kubernetes", - }) - if err != nil { - // This shouldn't fail sending the log, as it only affects how they - // appear. 
- logger.Error(ctx, "post log source", slog.Error(err)) - } + return nil + } - ls := agentsdk.NewLogSender(logger) - sl := ls.GetScriptLogger(sourceUUID) + delete(lc.logs, token) + return logs +} - gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) +type agentLoggerLifecycle struct { + logger agentsdk.AgentLogWriter + timer *quartz.Timer +} - // connect to Agent v2.0 API, since we don't need features added later. - // This maximizes compatibility. - arpc, err := client.ConnectRPC20(gracefulCtx) - if err != nil { - logger.Error(ctx, "drpc connect", slog.Error(err)) - gracefulCancel() +func (lq *logQueuer) work(ctx context.Context) { + for { + select { + case <-ctx.Done(): return - } - go func() { - err := ls.SendLoop(gracefulCtx, arpc) - // if the send loop exits on its own without the context - // canceling, timeout the logger and force it to recreate. - if err != nil && ctx.Err() == nil { - l.loggerTimeout(log.agentToken) + case log := <-lq.q: + if log.delete { + lq.deleteLogger(log.token) + continue } - }() - - closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { - logger.Info(ctx, "logger timeout firing") - l.loggerTimeout(log.agentToken) - }) - lifecycle := agentLoggerLifecycle{ - scriptLogger: sl, - close: func() { - // We could be stopping for reasons other than the timeout. If - // so, stop the timer. - closeTimer.Stop() - defer gracefulCancel() - timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) - defer timeout.Stop() - logger.Info(ctx, "logger closing") - - if err := sl.Flush(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while flushing") - return - } - - if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") - } - _ = arpc.DRPCConn().Close() - client.SDK.HTTPClient.CloseIdleConnections() - }, + lq.logCache.append(log.token, log.log) + lq.ensureLogger(ctx, log.token) } - lifecycle.closeTimer = closeTimer - l.loggers[log.agentToken] = lifecycle - lgr = lifecycle } - - lgr.resetCloseTimer(l.loggerTTL) - _ = lgr.scriptLogger.Send(ctx, queuedLogs...) - l.logCache.delete(log.agentToken) } -func (l *logQueuer) processDelete(log agentLog) { - l.mu.Lock() - lgr, ok := l.loggers[log.agentToken] - if ok { - delete(l.loggers, log.agentToken) - - } - l.mu.Unlock() +func (lq *logQueuer) ensureLogger(ctx context.Context, token string) { + lq.mu.Lock() + defer lq.mu.Unlock() + lifecycle, ok := lq.loggers[token] if ok { - // close this async, no one else will have a handle to it since we've - // deleted from the map - go lgr.close() + lifecycle.timer.Reset(lq.loggerTTL) + return } -} -func (l *logQueuer) loggerTimeout(agentToken string) { - l.q <- agentLog{ - op: opDelete, - agentToken: agentToken, - } -} + client := codersdk.New(lq.coderURL) + client.SetSessionToken(token) -type agentLoggerLifecycle struct { - scriptLogger agentsdk.ScriptLogger + logger := client.AgentLogWriter(ctx, uuid.New()) - closeTimer *quartz.Timer - close func() -} + timer := lq.clock.AfterFunc(lq.loggerTTL, func() { + lq.deleteLogger(token) + }) -func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) { - if !l.closeTimer.Reset(ttl) { - // If the timer had already fired and we made it active again, stop the - // timer. We don't want it to run twice. - l.closeTimer.Stop() + lq.loggers[token] = agentLoggerLifecycle{ + logger: logger, + timer: timer, } -} -func newColor(value ...color.Attribute) *color.Color { - c := color.New(value...) 
- c.EnableColor() - return c -} + go func() { + defer func() { + err := logger.Close() + if err != nil { + lq.logger.Error(ctx, "close agent logger", slog.Error(err)) + } + }() -type logCache struct { - logs map[string][]agentsdk.Log + for { + logs := lq.logCache.flush(token) + if len(logs) == 0 { + time.Sleep(time.Second) + continue + } + + err := logger.Write(ctx, logs...) + if err != nil { + lq.logger.Error(ctx, "write agent logs", slog.Error(err)) + return + } + } + }() } -func (l *logCache) push(log agentLog) []agentsdk.Log { - logs, ok := l.logs[log.agentToken] +func (lq *logQueuer) deleteLogger(token string) { + lq.mu.Lock() + defer lq.mu.Unlock() + + lifecycle, ok := lq.loggers[token] if !ok { - logs = make([]agentsdk.Log, 0, 1) + return } - logs = append(logs, log.log) - l.logs[log.agentToken] = logs - return logs + + lifecycle.timer.Stop() + delete(lq.loggers, token) } -func (l *logCache) delete(token string) { - delete(l.logs, token) +func newColor(attrs ...color.Attribute) *color.Color { + c := color.New(attrs...) + c.EnableColor() + return c } diff --git a/logger_test.go b/logger_test.go index 51d99f5..dc73d78 100644 --- a/logger_test.go +++ b/logger_test.go @@ -459,3 +459,52 @@ func (f *fakeAgentAPI) PostLogSource(w http.ResponseWriter, r *http.Request) { fmt.Println("failed to encode:", err.Error()) } } + +func TestParseNamespaces(t *testing.T) { + tests := []struct { + name string + input string + expected []string + }{ + { + name: "empty string", + input: "", + expected: []string{}, + }, + { + name: "single namespace", + input: "default", + expected: []string{"default"}, + }, + { + name: "multiple namespaces", + input: "ns1,ns2,ns3", + expected: []string{"ns1", "ns2", "ns3"}, + }, + { + name: "namespaces with spaces", + input: "ns1, ns2 , ns3", + expected: []string{"ns1", "ns2", "ns3"}, + }, + { + name: "namespaces with empty values", + input: "ns1,,ns2,", + expected: []string{"ns1", "ns2"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := parseNamespaces(tt.input) + if len(result) != len(tt.expected) { + t.Errorf("parseNamespaces(%q) returned %d namespaces, expected %d", tt.input, len(result), len(tt.expected)) + return + } + for i, ns := range result { + if ns != tt.expected[i] { + t.Errorf("parseNamespaces(%q)[%d] = %q, expected %q", tt.input, i, ns, tt.expected[i]) + } + } + }) + } +} diff --git a/main.go b/main.go index 3d48cb9..d8c886a 100644 --- a/main.go +++ b/main.go @@ -27,7 +27,7 @@ func root() *cobra.Command { coderURL string fieldSelector string kubeConfig string - namespace string + namespaces string labelSelector string ) cmd := &cobra.Command{ @@ -66,7 +66,7 @@ func root() *cobra.Command { reporter, err := newPodEventLogger(cmd.Context(), podEventLoggerOptions{ coderURL: parsedURL, client: client, - namespace: namespace, + namespaces: namespaces, fieldSelector: fieldSelector, labelSelector: labelSelector, logger: slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug), @@ -85,7 +85,7 @@ func root() *cobra.Command { } cmd.Flags().StringVarP(&coderURL, "coder-url", "u", os.Getenv("CODER_URL"), "URL of the Coder instance") cmd.Flags().StringVarP(&kubeConfig, "kubeconfig", "k", "~/.kube/config", "Path to the kubeconfig file") - cmd.Flags().StringVarP(&namespace, "namespace", "n", os.Getenv("CODER_NAMESPACE"), "Namespace to use when listing pods") + cmd.Flags().StringVarP(&namespaces, "namespaces", "n", os.Getenv("CODER_NAMESPACE"), "Comma-separated list of namespaces to watch for pods. 
If empty, watches all namespaces.")
 	cmd.Flags().StringVarP(&fieldSelector, "field-selector", "f", "", "Field selector to use when listing pods")
 	cmd.Flags().StringVarP(&labelSelector, "label-selector", "l", "", "Label selector to use when listing pods")
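Usage note on the change above: the `--set` examples in the README can also be expressed through a values file, which is often easier to review for multi-namespace installs. The snippet below is a minimal, illustrative sketch only — the file name and the deployment URL are placeholders — and it relies solely on the `url`, `namespaces`, and `rbac.clusterWide` keys introduced in `helm/values.yaml` in this diff.

```yaml
# multi-namespace-values.yaml (hypothetical example file)

# Placeholder Coder deployment URL; replace with your own.
url: "https://coder.example.com"

# Watch two specific namespaces. Because more than one namespace is listed,
# the chart's template logic above switches to ClusterRole/ClusterRoleBinding.
namespaces: "team-a,team-b"

rbac:
  # Leave false to let the chart decide; set true to force cluster-wide RBAC
  # even when only a single namespace is watched.
  clusterWide: false
```

Applied with the standard Helm values flag, e.g. `helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube --namespace coder -f multi-namespace-values.yaml`.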