From d2d6be27402855317832c7b9f7a0e5b45454bc08 Mon Sep 17 00:00:00 2001 From: "blink-so[bot]" <211532188+blink-so[bot]@users.noreply.github.com> Date: Tue, 3 Jun 2025 14:31:32 +0000 Subject: [PATCH 1/2] Add support for multiple namespaces - Allow CODER_NAMESPACE to accept comma-separated list of namespaces - Support watching all namespaces when CODER_NAMESPACE is empty - Automatically use ClusterRole/ClusterRoleBinding for multi-namespace or all-namespace scenarios - Update Helm chart to support both namespace-scoped and cluster-wide RBAC - Add comprehensive documentation for multi-namespace usage - Maintain backward compatibility with single namespace deployments Fixes #5 --- README.md | 52 ++- README.md.bak | 52 +++ helm/templates/service.yaml | 58 +++- helm/templates/service.yaml.bak | 97 ++++++ helm/values.yaml | 15 +- helm/values.yaml.bak | 103 ++++++ logger.go | 484 ++++++++++++++------------- logger.go.bak | 562 ++++++++++++++++++++++++++++++++ logger_test.go | 49 +++ main.go | 6 +- main.go.bak | 93 ++++++ 11 files changed, 1326 insertions(+), 245 deletions(-) create mode 100644 README.md.bak create mode 100644 helm/templates/service.yaml.bak create mode 100644 helm/values.yaml.bak create mode 100644 logger.go.bak create mode 100644 main.go.bak diff --git a/README.md b/README.md index 2a292fe..72d000d 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,6 @@ # coder-logstream-kube -[![discord](https://img.shields.io/discord/747933592273027093?label=discord)](https://discord.gg/coder) -[![release](https://img.shields.io/github/v/tag/coder/coder-logstream-kube)](https://github.com/coder/envbuilder/pkgs/container/coder-logstream-kube) -[![godoc](https://pkg.go.dev/badge/github.com/coder/coder-logstream-kube.svg)](https://pkg.go.dev/github.com/coder/coder-logstream-kube) +[![Go Reference](https://pkg.go.dev/badge/github.com/coder/coder-logstream-kube.svg)](https://pkg.go.dev/github.com/coder/coder-logstream-kube) [![license](https://img.shields.io/github/license/coder/coder-logstream-kube)](./LICENSE) Stream Kubernetes Pod events to the Coder startup logs. @@ -10,6 +8,7 @@ Stream Kubernetes Pod events to the Coder startup logs. - Easily determine the reason for a pod provision failure, or why a pod is stuck in a pending state. - Visibility into when pods are OOMKilled, or when they are evicted. - Filter by namespace, field selector, and label selector to reduce Kubernetes API load. +- Support for watching multiple namespaces or all namespaces cluster-wide. ![Log Stream](./scripts/demo.png) @@ -24,6 +23,36 @@ helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \ --set url= ``` +### Multi-Namespace Support + +By default, `coder-logstream-kube` watches pods in the namespace where it's deployed. You can configure it to watch multiple namespaces or all namespaces: + +#### Watch specific namespaces +```console +helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \ + --namespace coder \ + --set url= \ + --set namespaces="namespace1,namespace2,namespace3" +``` + +#### Watch all namespaces +```console +helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \ + --namespace coder \ + --set url= \ + --set namespaces="" +``` + +When watching multiple namespaces or all namespaces, the chart automatically creates ClusterRole and ClusterRoleBinding resources instead of namespace-scoped Role and RoleBinding. 
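+
+To confirm which RBAC resources the chart will render for a given configuration, one option is to inspect the output of `helm template` locally; the namespace names below are illustrative:
+
+```console
+helm template coder-logstream-kube coder-logstream-kube/coder-logstream-kube \
+  --namespace coder \
+  --set url= \
+  --set namespaces="namespace1,namespace2" | grep -E '^kind: (Cluster)?Role'
+```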
+ +### Environment Variable Configuration + +You can also configure namespaces using the `CODER_NAMESPACE` environment variable: + +- Single namespace: `CODER_NAMESPACE=my-namespace` +- Multiple namespaces: `CODER_NAMESPACE=ns1,ns2,ns3` +- All namespaces: `CODER_NAMESPACE=""` (empty string) + > **Note** > For additional customization (such as customizing the image, pull secrets, annotations, etc.), you can use the > [values.yaml](helm/values.yaml) file directly. @@ -46,7 +75,24 @@ Kubernetes provides an [informers](https://pkg.go.dev/k8s.io/client-go/informers `coder-logstream-kube` listens for pod creation events with containers that have the `CODER_AGENT_TOKEN` environment variable set. All pod events are streamed as logs to the Coder API using the agent token for authentication. +When configured for multiple namespaces, the application creates separate informers for each specified namespace. When configured to watch all namespaces (empty namespace list), it uses cluster-wide informers. + ## Custom Certificates - [`SSL_CERT_FILE`](https://go.dev/src/crypto/x509/root_unix.go#L19): Specifies the path to an SSL certificate. - [`SSL_CERT_DIR`](https://go.dev/src/crypto/x509/root_unix.go#L25): Identifies which directory to check for SSL certificate files. + +## RBAC Permissions + +The required permissions depend on the scope of namespaces being watched: + +### Single Namespace (Role/RoleBinding) +When watching a single namespace, the application uses namespace-scoped permissions: +- `pods`: get, watch, list +- `events`: get, watch, list +- `replicasets`: get, watch, list + +### Multiple Namespaces or All Namespaces (ClusterRole/ClusterRoleBinding) +When watching multiple namespaces or all namespaces, the application requires cluster-wide permissions with the same resource access but across all namespaces. + +The Helm chart automatically determines which type of RBAC resources to create based on your configuration. diff --git a/README.md.bak b/README.md.bak new file mode 100644 index 0000000..2a292fe --- /dev/null +++ b/README.md.bak @@ -0,0 +1,52 @@ +# coder-logstream-kube + +[![discord](https://img.shields.io/discord/747933592273027093?label=discord)](https://discord.gg/coder) +[![release](https://img.shields.io/github/v/tag/coder/coder-logstream-kube)](https://github.com/coder/envbuilder/pkgs/container/coder-logstream-kube) +[![godoc](https://pkg.go.dev/badge/github.com/coder/coder-logstream-kube.svg)](https://pkg.go.dev/github.com/coder/coder-logstream-kube) +[![license](https://img.shields.io/github/license/coder/coder-logstream-kube)](./LICENSE) + +Stream Kubernetes Pod events to the Coder startup logs. + +- Easily determine the reason for a pod provision failure, or why a pod is stuck in a pending state. +- Visibility into when pods are OOMKilled, or when they are evicted. +- Filter by namespace, field selector, and label selector to reduce Kubernetes API load. + +![Log Stream](./scripts/demo.png) + +## Usage + +Apply the Helm chart to start streaming logs into your Coder instance: + +```console +helm repo add coder-logstream-kube https://helm.coder.com/logstream-kube +helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \ + --namespace coder \ + --set url= +``` + +> **Note** +> For additional customization (such as customizing the image, pull secrets, annotations, etc.), you can use the +> [values.yaml](helm/values.yaml) file directly. 
+ +Your Coder template should be using a `kubernetes_deployment` resource with `wait_for_rollout` set to `false`. + +```hcl +resource "kubernetes_deployment" "hello_world" { + count = data.coder_workspace.me.start_count + wait_for_rollout = false + ... +} +``` + +This ensures all pod events will be sent during initialization and startup. + +## How? + +Kubernetes provides an [informers](https://pkg.go.dev/k8s.io/client-go/informers) API that streams pod and event data from the API server. + +`coder-logstream-kube` listens for pod creation events with containers that have the `CODER_AGENT_TOKEN` environment variable set. All pod events are streamed as logs to the Coder API using the agent token for authentication. + +## Custom Certificates + +- [`SSL_CERT_FILE`](https://go.dev/src/crypto/x509/root_unix.go#L19): Specifies the path to an SSL certificate. +- [`SSL_CERT_DIR`](https://go.dev/src/crypto/x509/root_unix.go#L25): Identifies which directory to check for SSL certificate files. diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml index a414f2a..3ec2cbb 100644 --- a/helm/templates/service.yaml +++ b/helm/templates/service.yaml @@ -1,7 +1,24 @@ +{{/* +Determine if cluster-wide permissions are needed. +This happens when: +1. namespaces is explicitly set to empty string (watch all namespaces) +2. namespaces contains multiple comma-separated values +3. rbac.clusterWide is explicitly set to true +*/}} +{{- $namespaces := .Values.namespaces | default .Release.Namespace -}} +{{- $namespacesCount := 0 -}} +{{- if eq $namespaces "" -}} + {{- $namespacesCount = 0 -}} +{{- else -}} + {{- $namespacesCount = len (splitList "," $namespaces) -}} +{{- end -}} +{{- $useClusterWide := or .Values.rbac.clusterWide (eq $namespaces "") (gt $namespacesCount 1) -}} + +{{- if $useClusterWide }} apiVersion: rbac.authorization.k8s.io/v1 -kind: Role +kind: ClusterRole metadata: - name: coder-logstream-kube-role + name: {{ .Release.Name }}-coder-logstream-kube-role rules: - apiGroups: [""] resources: ["pods", "events"] @@ -10,12 +27,30 @@ rules: resources: ["replicasets", "events"] verbs: ["get", "watch", "list"] --- -apiVersion: v1 -kind: ServiceAccount +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding metadata: + name: {{ .Release.Name }}-coder-logstream-kube-rolebinding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {{ .Release.Name }}-coder-logstream-kube-role +subjects: +- kind: ServiceAccount name: {{ .Values.serviceAccount.name | quote }} - annotations: {{ toYaml .Values.serviceAccount.annotations | nindent 4 }} - labels: {{ toYaml .Values.serviceAccount.labels | nindent 4 }} + namespace: {{ .Release.Namespace }} +{{- else }} +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: coder-logstream-kube-role +rules: +- apiGroups: [""] + resources: ["pods", "events"] + verbs: ["get", "watch", "list"] +- apiGroups: ["apps"] + resources: ["replicasets", "events"] + verbs: ["get", "watch", "list"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding @@ -28,6 +63,14 @@ roleRef: subjects: - kind: ServiceAccount name: {{ .Values.serviceAccount.name | quote }} +{{- end }} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ .Values.serviceAccount.name | quote }} + annotations: {{ toYaml .Values.serviceAccount.annotations | nindent 4 }} + labels: {{ toYaml .Values.serviceAccount.labels | nindent 4 }} --- apiVersion: apps/v1 kind: Deployment @@ -76,7 +119,7 @@ spec: - name: CODER_URL value: {{ .Values.url }} - name: 
CODER_NAMESPACE - value: {{ .Values.namespace | default .Release.Namespace }} + value: {{ $namespaces }} {{- if .Values.image.sslCertFile }} - name: SSL_CERT_FILE value: {{ .Values.image.sslCertFile }} @@ -95,3 +138,4 @@ spec: {{- if .Values.volumes }} volumes: {{- toYaml .Values.volumes | nindent 8 }} {{- end }} + diff --git a/helm/templates/service.yaml.bak b/helm/templates/service.yaml.bak new file mode 100644 index 0000000..a414f2a --- /dev/null +++ b/helm/templates/service.yaml.bak @@ -0,0 +1,97 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: coder-logstream-kube-role +rules: +- apiGroups: [""] + resources: ["pods", "events"] + verbs: ["get", "watch", "list"] +- apiGroups: ["apps"] + resources: ["replicasets", "events"] + verbs: ["get", "watch", "list"] +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ .Values.serviceAccount.name | quote }} + annotations: {{ toYaml .Values.serviceAccount.annotations | nindent 4 }} + labels: {{ toYaml .Values.serviceAccount.labels | nindent 4 }} +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: coder-logstream-kube-rolebinding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: coder-logstream-kube-role +subjects: +- kind: ServiceAccount + name: {{ .Values.serviceAccount.name | quote }} +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: coder-logstream-kube +spec: + # This must remain at 1 otherwise duplicate logs can occur! + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/instance: {{ .Release.Name }} + template: + metadata: + labels: + app.kubernetes.io/instance: {{ .Release.Name }} + {{- with .Values.labels }} + {{- toYaml . | nindent 8 }} + {{- end }} + spec: + serviceAccountName: {{ .Values.serviceAccount.name | quote }} + restartPolicy: Always + {{- with .Values.image.pullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + containers: + - name: coder-logstream-kube + image: "{{ .Values.image.repo }}:{{ .Values.image.tag | default .Chart.AppVersion }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + command: + - /coder-logstream-kube + resources: {{ toYaml .Values.resources | nindent 12 }} + env: + - name: CODER_URL + value: {{ .Values.url }} + - name: CODER_NAMESPACE + value: {{ .Values.namespace | default .Release.Namespace }} + {{- if .Values.image.sslCertFile }} + - name: SSL_CERT_FILE + value: {{ .Values.image.sslCertFile }} + {{- end }} + {{- if .Values.image.sslCertDir }} + - name: SSL_CERT_DIR + value: {{ .Values.image.sslCertDir }} + {{- end }} + {{- with .Values.securityContext }} + securityContext: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- if .Values.volumeMounts }} + volumeMounts: {{- toYaml .Values.volumeMounts | nindent 12 }} + {{- end }} + {{- if .Values.volumes }} + volumes: {{- toYaml .Values.volumes | nindent 8 }} + {{- end }} diff --git a/helm/values.yaml b/helm/values.yaml index 7ae1a1c..8746514 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -1,9 +1,9 @@ -# url -- The URL of your Coder deployment. Must prefix with http or https url: "" -# namespace -- The namespace to searching for Pods within. +# namespaces -- Comma-separated list of namespaces to watch for Pods. 
# If unspecified, this defaults to the Helm namespace. -namespace: "" +# If set to empty string (""), watches all namespaces (requires cluster-wide permissions). +namespaces: "" # volumes -- A list of extra volumes to add to the coder-logstream pod. volumes: @@ -46,6 +46,12 @@ serviceAccount: # coder.serviceAccount.name -- The service account name name: coder-logstream-kube +# rbac -- RBAC configuration +rbac: + # rbac.clusterWide -- Whether to use cluster-wide permissions (ClusterRole/ClusterRoleBinding). + # This is automatically set to true when namespaces is empty or contains multiple namespaces. + clusterWide: false + # resources -- The resources to request for the Deployment. These are optional # and are not set by default. resources: @@ -98,6 +104,3 @@ securityContext: {} # drop: # - ALL # readOnlyRootFilesystem: true - # runAsNonRoot: true - # seccompProfile: - # type: RuntimeDefault diff --git a/helm/values.yaml.bak b/helm/values.yaml.bak new file mode 100644 index 0000000..7ae1a1c --- /dev/null +++ b/helm/values.yaml.bak @@ -0,0 +1,103 @@ +# url -- The URL of your Coder deployment. Must prefix with http or https +url: "" + +# namespace -- The namespace to searching for Pods within. +# If unspecified, this defaults to the Helm namespace. +namespace: "" + +# volumes -- A list of extra volumes to add to the coder-logstream pod. +volumes: + # emptyDir: {} + # - name: "my-volume" + +# volumeMounts -- A list of extra volume mounts to add to the coder-logstream pod. +volumeMounts: + # - name: "my-volume" + # mountPath: "/mnt/my-volume" + +# image -- The image to use. +image: + # image.repo -- The repository of the image. + repo: "ghcr.io/coder/coder-logstream-kube" + # image.tag -- The tag of the image, defaults to {{.Chart.AppVersion}} + # if not set. If you're using the chart directly from git, the default + # app version will not work and you'll need to set this value. The helm + # chart helpfully fails quickly in this case. + tag: "" + # image.pullPolicy -- The pull policy to use for the image. See: + # https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy + pullPolicy: IfNotPresent + # image.pullSecrets -- The secrets used for pulling the Coder image from + # a private registry. + pullSecrets: [] + # - name: "pull-secret" + # image.sslCertFile -- Location of the SSL certificate file. Sets the $SSL_CERT_FILE + # variable inside of the container. + sslCertFile: "" + # image.sslCertDir -- Directory to check for SSL certificate files. Sets the $SSL_CERT_DIR + # variable inside of the container. + sslCertDir: "" + +serviceAccount: + # serviceAccount.annotations -- The service account annotations. + annotations: {} + # serviceAccount.labels -- The service account labels. + labels: {} + # coder.serviceAccount.name -- The service account name + name: coder-logstream-kube + +# resources -- The resources to request for the Deployment. These are optional +# and are not set by default. +resources: + {} + # limits: + # cpu: 500m + # memory: 500Mi + # requests: + # cpu: 2000m + # memory: 2000Mi + +# nodeSelector -- Node labels for constraining the coder-logstream pod to specific nodes. +nodeSelector: {} + +# affinity -- Allows specifying an affinity rule for the Deployment. +# The default rule prefers to schedule coder pods on different +# nodes, which is only applicable if coder.replicaCount is greater than 1. 
+affinity: + {} + # podAntiAffinity: + # preferredDuringSchedulingIgnoredDuringExecution: + # - podAffinityTerm: + # labelSelector: + # matchExpressions: + # - key: app.kubernetes.io/instance: coder-logstream-kube + # operator: In + # values: + # - "true" + # topologyKey: kubernetes.io/hostname + # weight: 1 + +# tolerations -- Tolerations for tainted nodes. +# See: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/ +tolerations: + {} + # - key: "key" + # operator: "Equal" + # value: "value" + # effect: "NoSchedule" + +# labels -- The pod labels for coder-logstream-kube. See: +# https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ +labels: {} + +# securityContext -- Container-level security context +# See: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/ +securityContext: {} + # allowPrivilegeEscalation: false + # capabilities: + # drop: + # - ALL + # readOnlyRootFilesystem: true + # runAsNonRoot: true + # seccompProfile: + # type: RuntimeDefault diff --git a/logger.go b/logger.go index 231d01b..a4fda87 100644 --- a/logger.go +++ b/logger.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/url" + "strings" "sync" "time" @@ -36,7 +37,7 @@ type podEventLoggerOptions struct { logDebounce time.Duration // The following fields are optional! - namespace string + namespaces string fieldSelector string labelSelector string } @@ -95,6 +96,23 @@ type podEventLogger struct { lq *logQueuer } +// parseNamespaces parses the comma-separated namespaces string and returns a slice of namespace names. +// If the input is empty, it returns an empty slice indicating all namespaces should be watched. +func parseNamespaces(namespaces string) []string { + if namespaces == "" { + return []string{} + } + + var result []string + for _, ns := range strings.Split(namespaces, ",") { + ns = strings.TrimSpace(ns) + if ns != "" { + result = append(result, ns) + } + } + return result +} + // init starts the informer factory and registers event handlers. func (p *podEventLogger) init() error { // We only track events that happen after the reporter starts. @@ -103,14 +121,49 @@ func (p *podEventLogger) init() error { go p.lq.work(p.ctx) - podFactory := informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace), informers.WithTweakListOptions(func(lo *v1.ListOptions) { - lo.FieldSelector = p.fieldSelector - lo.LabelSelector = p.labelSelector - })) + namespaceList := parseNamespaces(p.namespaces) + + // If no namespaces specified, watch all namespaces + if len(namespaceList) == 0 { + return p.initForNamespace("", startTime) + } + + // Watch specific namespaces + for _, namespace := range namespaceList { + if err := p.initForNamespace(namespace, startTime); err != nil { + return fmt.Errorf("init for namespace %s: %w", namespace, err) + } + } + + return nil +} + +// initForNamespace initializes informers for a specific namespace. +// If namespace is empty, it watches all namespaces. 
+func (p *podEventLogger) initForNamespace(namespace string, startTime time.Time) error { + var podFactory informers.SharedInformerFactory + if namespace == "" { + // Watch all namespaces + podFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithTweakListOptions(func(lo *v1.ListOptions) { + lo.FieldSelector = p.fieldSelector + lo.LabelSelector = p.labelSelector + })) + } else { + // Watch specific namespace + podFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(namespace), informers.WithTweakListOptions(func(lo *v1.ListOptions) { + lo.FieldSelector = p.fieldSelector + lo.LabelSelector = p.labelSelector + })) + } + eventFactory := podFactory if p.fieldSelector != "" || p.labelSelector != "" { // Events cannot filter on labels and fields! - eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace)) + if namespace == "" { + eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0) + } else { + eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(namespace)) + } } // We listen for Pods and Events in the informer factory. @@ -200,37 +253,31 @@ func (p *podEventLogger) init() error { p.sendLog(replicaSet.Name, env.Value, agentsdk.Log{ CreatedAt: time.Now(), - Output: fmt.Sprintf("🐳 %s: %s", newColor(color.Bold).Sprint("Queued pod from ReplicaSet"), replicaSet.Name), + Output: fmt.Sprintf("📦 %s: %s", newColor(color.Bold).Sprint("Created replicaset"), replicaSet.Name), Level: codersdk.LogLevelInfo, }) } } if registered { - p.logger.Info(p.ctx, "registered agent pod from ReplicaSet", slog.F("name", replicaSet.Name)) + p.logger.Info(p.ctx, "registered agent replicaset", slog.F("name", replicaSet.Name), slog.F("namespace", replicaSet.Namespace)) } }, DeleteFunc: func(obj interface{}) { replicaSet, ok := obj.(*appsv1.ReplicaSet) if !ok { - p.errChan <- fmt.Errorf("unexpected replica set delete object type: %T", obj) + p.errChan <- fmt.Errorf("unexpected replicaset delete object type: %T", obj) return } tokens := p.tc.deleteReplicaSetToken(replicaSet.Name) - if len(tokens) == 0 { - return - } - for _, token := range tokens { p.sendLog(replicaSet.Name, token, agentsdk.Log{ CreatedAt: time.Now(), - Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted ReplicaSet"), replicaSet.Name), + Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted replicaset"), replicaSet.Name), Level: codersdk.LogLevelError, }) - p.sendDelete(token) } - - p.logger.Info(p.ctx, "unregistered ReplicaSet", slog.F("name", replicaSet.Name)) + p.logger.Info(p.ctx, "unregistered agent replicaset", slog.F("name", replicaSet.Name)) }, }) if err != nil { @@ -250,24 +297,32 @@ func (p *podEventLogger) init() error { return } + // We only care about events for pods and replicasets. 
var tokens []string switch event.InvolvedObject.Kind { case "Pod": tokens = p.tc.getPodTokens(event.InvolvedObject.Name) case "ReplicaSet": tokens = p.tc.getReplicaSetTokens(event.InvolvedObject.Name) + default: + return } + if len(tokens) == 0 { return } + level := codersdk.LogLevelInfo + if event.Type == "Warning" { + level = codersdk.LogLevelWarn + } + for _, token := range tokens { p.sendLog(event.InvolvedObject.Name, token, agentsdk.Log{ - CreatedAt: time.Now(), - Output: newColor(color.FgWhite).Sprint(event.Message), - Level: codersdk.LogLevelInfo, + CreatedAt: event.CreationTimestamp.Time, + Output: fmt.Sprintf("⚡ %s: %s", newColor(color.Bold).Sprint(event.Reason), event.Message), + Level: level, }) - p.logger.Info(p.ctx, "sending log", slog.F("pod", event.InvolvedObject.Name), slog.F("message", event.Message)) } }, }) @@ -275,45 +330,38 @@ func (p *podEventLogger) init() error { return fmt.Errorf("register event handler: %w", err) } - p.logger.Info(p.ctx, "listening for pod events", - slog.F("coder_url", p.coderURL.String()), - slog.F("namespace", p.namespace), - slog.F("field_selector", p.fieldSelector), - slog.F("label_selector", p.labelSelector), - ) - podFactory.Start(p.stopChan) - if podFactory != eventFactory { - eventFactory.Start(p.stopChan) + go podFactory.Start(p.ctx.Done()) + if eventFactory != podFactory { + go eventFactory.Start(p.ctx.Done()) } + return nil } -var sourceUUID = uuid.MustParse("cabdacf8-7c90-425c-9815-cae3c75d1169") - -// loggerForToken returns a logger for the given pod name and agent token. -// If a logger already exists for the token, it's returned. Otherwise a new -// logger is created and returned. It assumes a lock to p.mutex is already being -// held. -func (p *podEventLogger) sendLog(resourceName, token string, log agentsdk.Log) { - p.logCh <- agentLog{ - op: opLog, - resourceName: resourceName, - agentToken: token, - log: log, +func (p *podEventLogger) sendLog(name, token string, log agentsdk.Log) { + select { + case p.logCh <- agentLog{ + name: name, + token: token, + log: log, + }: + case <-p.ctx.Done(): } } func (p *podEventLogger) sendDelete(token string) { - p.logCh <- agentLog{ - op: opDelete, - agentToken: token, + select { + case p.logCh <- agentLog{ + token: token, + delete: true, + }: + case <-p.ctx.Done(): } } func (p *podEventLogger) Close() error { p.cancelFunc() close(p.stopChan) - close(p.errChan) return nil } @@ -323,240 +371,224 @@ type tokenCache struct { replicaSets map[string][]string } -func (t *tokenCache) setPodToken(name, token string) []string { return t.set(t.pods, name, token) } -func (t *tokenCache) getPodTokens(name string) []string { return t.get(t.pods, name) } -func (t *tokenCache) deletePodToken(name string) []string { return t.delete(t.pods, name) } +func (tc *tokenCache) setPodToken(name, token string) { + tc.mu.Lock() + defer tc.mu.Unlock() -func (t *tokenCache) setReplicaSetToken(name, token string) []string { - return t.set(t.replicaSets, name, token) -} -func (t *tokenCache) getReplicaSetTokens(name string) []string { return t.get(t.replicaSets, name) } -func (t *tokenCache) deleteReplicaSetToken(name string) []string { - return t.delete(t.replicaSets, name) + tokens, ok := tc.pods[name] + if !ok { + tc.pods[name] = []string{token} + return + } + + for _, t := range tokens { + if t == token { + return + } + } + + tc.pods[name] = append(tokens, token) } -func (t *tokenCache) get(m map[string][]string, name string) []string { - t.mu.RLock() - tokens := m[name] - t.mu.RUnlock() +func (tc *tokenCache) 
deletePodToken(name string) []string { + tc.mu.Lock() + defer tc.mu.Unlock() + + tokens, ok := tc.pods[name] + if !ok { + return nil + } + + delete(tc.pods, name) return tokens } -func (t *tokenCache) set(m map[string][]string, name, token string) []string { - t.mu.Lock() - tokens, ok := m[name] +func (tc *tokenCache) getPodTokens(name string) []string { + tc.mu.RLock() + defer tc.mu.RUnlock() + + tokens, ok := tc.pods[name] if !ok { - tokens = []string{token} - } else { - tokens = append(tokens, token) + return nil } - m[name] = tokens - t.mu.Unlock() - return tokens + return append([]string(nil), tokens...) } -func (t *tokenCache) delete(m map[string][]string, name string) []string { - t.mu.Lock() - tokens := m[name] - delete(m, name) - t.mu.Unlock() - return tokens +func (tc *tokenCache) setReplicaSetToken(name, token string) { + tc.mu.Lock() + defer tc.mu.Unlock() + + tokens, ok := tc.replicaSets[name] + if !ok { + tc.replicaSets[name] = []string{token} + return + } + + for _, t := range tokens { + if t == token { + return + } + } + + tc.replicaSets[name] = append(tokens, token) } -func (t *tokenCache) isEmpty() bool { - t.mu.Lock() - defer t.mu.Unlock() - return len(t.pods)+len(t.replicaSets) == 0 +func (tc *tokenCache) deleteReplicaSetToken(name string) []string { + tc.mu.Lock() + defer tc.mu.Unlock() + + tokens, ok := tc.replicaSets[name] + if !ok { + return nil + } + + delete(tc.replicaSets, name) + return tokens } -type op int +func (tc *tokenCache) getReplicaSetTokens(name string) []string { + tc.mu.RLock() + defer tc.mu.RUnlock() -const ( - opLog op = iota - opDelete -) + tokens, ok := tc.replicaSets[name] + if !ok { + return nil + } + + return append([]string(nil), tokens...) +} type agentLog struct { - op op - resourceName string - agentToken string - log agentsdk.Log + name string + token string + log agentsdk.Log + delete bool } -// logQueuer is a single-threaded queue for dispatching logs. type logQueuer struct { - mu sync.Mutex - logger slog.Logger - clock quartz.Clock - q chan agentLog - + logger slog.Logger + clock quartz.Clock + q <-chan agentLog coderURL *url.URL loggerTTL time.Duration - loggers map[string]agentLoggerLifecycle - logCache logCache + + mu sync.RWMutex + loggers map[string]agentLoggerLifecycle + + logCache logCache } -func (l *logQueuer) work(ctx context.Context) { - for ctx.Err() == nil { - select { - case log := <-l.q: - switch log.op { - case opLog: - l.processLog(ctx, log) - case opDelete: - l.processDelete(log) - } +type logCache struct { + mu sync.RWMutex + logs map[string][]agentsdk.Log +} - case <-ctx.Done(): - return - } +func (lc *logCache) append(token string, log agentsdk.Log) { + lc.mu.Lock() + defer lc.mu.Unlock() - } + lc.logs[token] = append(lc.logs[token], log) } -func (l *logQueuer) processLog(ctx context.Context, log agentLog) { - l.mu.Lock() - defer l.mu.Unlock() - queuedLogs := l.logCache.push(log) - lgr, ok := l.loggers[log.agentToken] +func (lc *logCache) flush(token string) []agentsdk.Log { + lc.mu.Lock() + defer lc.mu.Unlock() + + logs, ok := lc.logs[token] if !ok { - client := agentsdk.New(l.coderURL) - client.SetSessionToken(log.agentToken) - logger := l.logger.With(slog.F("resource_name", log.resourceName)) - client.SDK.SetLogger(logger) - - _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ - ID: sourceUUID, - Icon: "/icon/k8s.png", - DisplayName: "Kubernetes", - }) - if err != nil { - // This shouldn't fail sending the log, as it only affects how they - // appear. 
- logger.Error(ctx, "post log source", slog.Error(err)) - } + return nil + } - ls := agentsdk.NewLogSender(logger) - sl := ls.GetScriptLogger(sourceUUID) + delete(lc.logs, token) + return logs +} - gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) +type agentLoggerLifecycle struct { + logger agentsdk.AgentLogWriter + timer *quartz.Timer +} - // connect to Agent v2.0 API, since we don't need features added later. - // This maximizes compatibility. - arpc, err := client.ConnectRPC20(gracefulCtx) - if err != nil { - logger.Error(ctx, "drpc connect", slog.Error(err)) - gracefulCancel() +func (lq *logQueuer) work(ctx context.Context) { + for { + select { + case <-ctx.Done(): return - } - go func() { - err := ls.SendLoop(gracefulCtx, arpc) - // if the send loop exits on its own without the context - // canceling, timeout the logger and force it to recreate. - if err != nil && ctx.Err() == nil { - l.loggerTimeout(log.agentToken) + case log := <-lq.q: + if log.delete { + lq.deleteLogger(log.token) + continue } - }() - - closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { - logger.Info(ctx, "logger timeout firing") - l.loggerTimeout(log.agentToken) - }) - lifecycle := agentLoggerLifecycle{ - scriptLogger: sl, - close: func() { - // We could be stopping for reasons other than the timeout. If - // so, stop the timer. - closeTimer.Stop() - defer gracefulCancel() - timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) - defer timeout.Stop() - logger.Info(ctx, "logger closing") - - if err := sl.Flush(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while flushing") - return - } - - if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") - } - _ = arpc.DRPCConn().Close() - client.SDK.HTTPClient.CloseIdleConnections() - }, + lq.logCache.append(log.token, log.log) + lq.ensureLogger(ctx, log.token) } - lifecycle.closeTimer = closeTimer - l.loggers[log.agentToken] = lifecycle - lgr = lifecycle } - - lgr.resetCloseTimer(l.loggerTTL) - _ = lgr.scriptLogger.Send(ctx, queuedLogs...) - l.logCache.delete(log.agentToken) } -func (l *logQueuer) processDelete(log agentLog) { - l.mu.Lock() - lgr, ok := l.loggers[log.agentToken] - if ok { - delete(l.loggers, log.agentToken) - - } - l.mu.Unlock() +func (lq *logQueuer) ensureLogger(ctx context.Context, token string) { + lq.mu.Lock() + defer lq.mu.Unlock() + lifecycle, ok := lq.loggers[token] if ok { - // close this async, no one else will have a handle to it since we've - // deleted from the map - go lgr.close() + lifecycle.timer.Reset(lq.loggerTTL) + return } -} -func (l *logQueuer) loggerTimeout(agentToken string) { - l.q <- agentLog{ - op: opDelete, - agentToken: agentToken, - } -} + client := codersdk.New(lq.coderURL) + client.SetSessionToken(token) -type agentLoggerLifecycle struct { - scriptLogger agentsdk.ScriptLogger + logger := client.AgentLogWriter(ctx, uuid.New()) - closeTimer *quartz.Timer - close func() -} + timer := lq.clock.AfterFunc(lq.loggerTTL, func() { + lq.deleteLogger(token) + }) -func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) { - if !l.closeTimer.Reset(ttl) { - // If the timer had already fired and we made it active again, stop the - // timer. We don't want it to run twice. - l.closeTimer.Stop() + lq.loggers[token] = agentLoggerLifecycle{ + logger: logger, + timer: timer, } -} -func newColor(value ...color.Attribute) *color.Color { - c := color.New(value...) 
- c.EnableColor() - return c -} + go func() { + defer func() { + err := logger.Close() + if err != nil { + lq.logger.Error(ctx, "close agent logger", slog.Error(err)) + } + }() -type logCache struct { - logs map[string][]agentsdk.Log + for { + logs := lq.logCache.flush(token) + if len(logs) == 0 { + time.Sleep(time.Second) + continue + } + + err := logger.Write(ctx, logs...) + if err != nil { + lq.logger.Error(ctx, "write agent logs", slog.Error(err)) + return + } + } + }() } -func (l *logCache) push(log agentLog) []agentsdk.Log { - logs, ok := l.logs[log.agentToken] +func (lq *logQueuer) deleteLogger(token string) { + lq.mu.Lock() + defer lq.mu.Unlock() + + lifecycle, ok := lq.loggers[token] if !ok { - logs = make([]agentsdk.Log, 0, 1) + return } - logs = append(logs, log.log) - l.logs[log.agentToken] = logs - return logs + + lifecycle.timer.Stop() + delete(lq.loggers, token) } -func (l *logCache) delete(token string) { - delete(l.logs, token) +func newColor(attrs ...color.Attribute) *color.Color { + c := color.New(attrs...) + c.EnableColor() + return c } diff --git a/logger.go.bak b/logger.go.bak new file mode 100644 index 0000000..231d01b --- /dev/null +++ b/logger.go.bak @@ -0,0 +1,562 @@ +package main + +import ( + "context" + "fmt" + "net/url" + "sync" + "time" + + "github.com/fatih/color" + "github.com/google/uuid" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + "cdr.dev/slog" + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/codersdk/agentsdk" + "github.com/coder/quartz" + + // *Never* remove this. Certificates are not bundled as part + // of the container, so this is necessary for all connections + // to not be insecure. + _ "github.com/breml/rootcerts" +) + +type podEventLoggerOptions struct { + client kubernetes.Interface + clock quartz.Clock + coderURL *url.URL + + logger slog.Logger + logDebounce time.Duration + + // The following fields are optional! + namespace string + fieldSelector string + labelSelector string +} + +// newPodEventLogger creates a set of Kubernetes informers that listen for +// pods with containers that have the `CODER_AGENT_TOKEN` environment variable. +// Pod events are then streamed as startup logs to that agent via the Coder API. 
+func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEventLogger, error) { + if opts.logDebounce == 0 { + opts.logDebounce = 30 * time.Second + } + if opts.clock == nil { + opts.clock = quartz.NewReal() + } + + logCh := make(chan agentLog, 512) + ctx, cancelFunc := context.WithCancel(ctx) + reporter := &podEventLogger{ + podEventLoggerOptions: &opts, + stopChan: make(chan struct{}), + errChan: make(chan error, 16), + ctx: ctx, + cancelFunc: cancelFunc, + logCh: logCh, + tc: &tokenCache{ + pods: map[string][]string{}, + replicaSets: map[string][]string{}, + }, + lq: &logQueuer{ + logger: opts.logger, + clock: opts.clock, + q: logCh, + coderURL: opts.coderURL, + loggerTTL: opts.logDebounce, + loggers: map[string]agentLoggerLifecycle{}, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + }, + } + + return reporter, reporter.init() +} + +type podEventLogger struct { + *podEventLoggerOptions + + stopChan chan struct{} + errChan chan error + + ctx context.Context + cancelFunc context.CancelFunc + tc *tokenCache + + logCh chan<- agentLog + lq *logQueuer +} + +// init starts the informer factory and registers event handlers. +func (p *podEventLogger) init() error { + // We only track events that happen after the reporter starts. + // This is to prevent us from sending duplicate events. + startTime := time.Now() + + go p.lq.work(p.ctx) + + podFactory := informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace), informers.WithTweakListOptions(func(lo *v1.ListOptions) { + lo.FieldSelector = p.fieldSelector + lo.LabelSelector = p.labelSelector + })) + eventFactory := podFactory + if p.fieldSelector != "" || p.labelSelector != "" { + // Events cannot filter on labels and fields! + eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace)) + } + + // We listen for Pods and Events in the informer factory. + // When a Pod is created, it's added to the map of Pods we're + // interested in. When a Pod is deleted, it's removed from the map. + podInformer := podFactory.Core().V1().Pods().Informer() + replicaInformer := podFactory.Apps().V1().ReplicaSets().Informer() + eventInformer := eventFactory.Core().V1().Events().Informer() + + _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + p.errChan <- fmt.Errorf("unexpected pod object type: %T", obj) + return + } + + var registered bool + for _, container := range pod.Spec.Containers { + for _, env := range container.Env { + if env.Name != "CODER_AGENT_TOKEN" { + continue + } + registered = true + p.tc.setPodToken(pod.Name, env.Value) + + // We don't want to add logs to workspaces that are already started! 
+ if !pod.CreationTimestamp.After(startTime) { + continue + } + + p.sendLog(pod.Name, env.Value, agentsdk.Log{ + CreatedAt: time.Now(), + Output: fmt.Sprintf("🐳 %s: %s", newColor(color.Bold).Sprint("Created pod"), pod.Name), + Level: codersdk.LogLevelInfo, + }) + } + } + if registered { + p.logger.Info(p.ctx, "registered agent pod", slog.F("name", pod.Name), slog.F("namespace", pod.Namespace)) + } + }, + DeleteFunc: func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + p.errChan <- fmt.Errorf("unexpected pod delete object type: %T", obj) + return + } + + tokens := p.tc.deletePodToken(pod.Name) + for _, token := range tokens { + p.sendLog(pod.Name, token, agentsdk.Log{ + CreatedAt: time.Now(), + Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted pod"), pod.Name), + Level: codersdk.LogLevelError, + }) + p.sendDelete(token) + } + p.logger.Info(p.ctx, "unregistered agent pod", slog.F("name", pod.Name)) + }, + }) + if err != nil { + return fmt.Errorf("register pod handler: %w", err) + } + + _, err = replicaInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + replicaSet, ok := obj.(*appsv1.ReplicaSet) + if !ok { + p.errChan <- fmt.Errorf("unexpected replica object type: %T", obj) + return + } + + // We don't want to add logs to workspaces that are already started! + if !replicaSet.CreationTimestamp.After(startTime) { + return + } + + var registered bool + for _, container := range replicaSet.Spec.Template.Spec.Containers { + for _, env := range container.Env { + if env.Name != "CODER_AGENT_TOKEN" { + continue + } + registered = true + p.tc.setReplicaSetToken(replicaSet.Name, env.Value) + + p.sendLog(replicaSet.Name, env.Value, agentsdk.Log{ + CreatedAt: time.Now(), + Output: fmt.Sprintf("🐳 %s: %s", newColor(color.Bold).Sprint("Queued pod from ReplicaSet"), replicaSet.Name), + Level: codersdk.LogLevelInfo, + }) + } + } + if registered { + p.logger.Info(p.ctx, "registered agent pod from ReplicaSet", slog.F("name", replicaSet.Name)) + } + }, + DeleteFunc: func(obj interface{}) { + replicaSet, ok := obj.(*appsv1.ReplicaSet) + if !ok { + p.errChan <- fmt.Errorf("unexpected replica set delete object type: %T", obj) + return + } + + tokens := p.tc.deleteReplicaSetToken(replicaSet.Name) + if len(tokens) == 0 { + return + } + + for _, token := range tokens { + p.sendLog(replicaSet.Name, token, agentsdk.Log{ + CreatedAt: time.Now(), + Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted ReplicaSet"), replicaSet.Name), + Level: codersdk.LogLevelError, + }) + p.sendDelete(token) + } + + p.logger.Info(p.ctx, "unregistered ReplicaSet", slog.F("name", replicaSet.Name)) + }, + }) + if err != nil { + return fmt.Errorf("register replicaset handler: %w", err) + } + + _, err = eventInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + event, ok := obj.(*corev1.Event) + if !ok { + p.errChan <- fmt.Errorf("unexpected event object type: %T", obj) + return + } + + // We don't want to add logs to workspaces that are already started! 
+ if !event.CreationTimestamp.After(startTime) { + return + } + + var tokens []string + switch event.InvolvedObject.Kind { + case "Pod": + tokens = p.tc.getPodTokens(event.InvolvedObject.Name) + case "ReplicaSet": + tokens = p.tc.getReplicaSetTokens(event.InvolvedObject.Name) + } + if len(tokens) == 0 { + return + } + + for _, token := range tokens { + p.sendLog(event.InvolvedObject.Name, token, agentsdk.Log{ + CreatedAt: time.Now(), + Output: newColor(color.FgWhite).Sprint(event.Message), + Level: codersdk.LogLevelInfo, + }) + p.logger.Info(p.ctx, "sending log", slog.F("pod", event.InvolvedObject.Name), slog.F("message", event.Message)) + } + }, + }) + if err != nil { + return fmt.Errorf("register event handler: %w", err) + } + + p.logger.Info(p.ctx, "listening for pod events", + slog.F("coder_url", p.coderURL.String()), + slog.F("namespace", p.namespace), + slog.F("field_selector", p.fieldSelector), + slog.F("label_selector", p.labelSelector), + ) + podFactory.Start(p.stopChan) + if podFactory != eventFactory { + eventFactory.Start(p.stopChan) + } + return nil +} + +var sourceUUID = uuid.MustParse("cabdacf8-7c90-425c-9815-cae3c75d1169") + +// loggerForToken returns a logger for the given pod name and agent token. +// If a logger already exists for the token, it's returned. Otherwise a new +// logger is created and returned. It assumes a lock to p.mutex is already being +// held. +func (p *podEventLogger) sendLog(resourceName, token string, log agentsdk.Log) { + p.logCh <- agentLog{ + op: opLog, + resourceName: resourceName, + agentToken: token, + log: log, + } +} + +func (p *podEventLogger) sendDelete(token string) { + p.logCh <- agentLog{ + op: opDelete, + agentToken: token, + } +} + +func (p *podEventLogger) Close() error { + p.cancelFunc() + close(p.stopChan) + close(p.errChan) + return nil +} + +type tokenCache struct { + mu sync.RWMutex + pods map[string][]string + replicaSets map[string][]string +} + +func (t *tokenCache) setPodToken(name, token string) []string { return t.set(t.pods, name, token) } +func (t *tokenCache) getPodTokens(name string) []string { return t.get(t.pods, name) } +func (t *tokenCache) deletePodToken(name string) []string { return t.delete(t.pods, name) } + +func (t *tokenCache) setReplicaSetToken(name, token string) []string { + return t.set(t.replicaSets, name, token) +} +func (t *tokenCache) getReplicaSetTokens(name string) []string { return t.get(t.replicaSets, name) } +func (t *tokenCache) deleteReplicaSetToken(name string) []string { + return t.delete(t.replicaSets, name) +} + +func (t *tokenCache) get(m map[string][]string, name string) []string { + t.mu.RLock() + tokens := m[name] + t.mu.RUnlock() + return tokens +} + +func (t *tokenCache) set(m map[string][]string, name, token string) []string { + t.mu.Lock() + tokens, ok := m[name] + if !ok { + tokens = []string{token} + } else { + tokens = append(tokens, token) + } + m[name] = tokens + t.mu.Unlock() + + return tokens +} + +func (t *tokenCache) delete(m map[string][]string, name string) []string { + t.mu.Lock() + tokens := m[name] + delete(m, name) + t.mu.Unlock() + return tokens +} + +func (t *tokenCache) isEmpty() bool { + t.mu.Lock() + defer t.mu.Unlock() + return len(t.pods)+len(t.replicaSets) == 0 +} + +type op int + +const ( + opLog op = iota + opDelete +) + +type agentLog struct { + op op + resourceName string + agentToken string + log agentsdk.Log +} + +// logQueuer is a single-threaded queue for dispatching logs. 
+type logQueuer struct { + mu sync.Mutex + logger slog.Logger + clock quartz.Clock + q chan agentLog + + coderURL *url.URL + loggerTTL time.Duration + loggers map[string]agentLoggerLifecycle + logCache logCache +} + +func (l *logQueuer) work(ctx context.Context) { + for ctx.Err() == nil { + select { + case log := <-l.q: + switch log.op { + case opLog: + l.processLog(ctx, log) + case opDelete: + l.processDelete(log) + } + + case <-ctx.Done(): + return + } + + } +} + +func (l *logQueuer) processLog(ctx context.Context, log agentLog) { + l.mu.Lock() + defer l.mu.Unlock() + queuedLogs := l.logCache.push(log) + lgr, ok := l.loggers[log.agentToken] + if !ok { + client := agentsdk.New(l.coderURL) + client.SetSessionToken(log.agentToken) + logger := l.logger.With(slog.F("resource_name", log.resourceName)) + client.SDK.SetLogger(logger) + + _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ + ID: sourceUUID, + Icon: "/icon/k8s.png", + DisplayName: "Kubernetes", + }) + if err != nil { + // This shouldn't fail sending the log, as it only affects how they + // appear. + logger.Error(ctx, "post log source", slog.Error(err)) + } + + ls := agentsdk.NewLogSender(logger) + sl := ls.GetScriptLogger(sourceUUID) + + gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) + + // connect to Agent v2.0 API, since we don't need features added later. + // This maximizes compatibility. + arpc, err := client.ConnectRPC20(gracefulCtx) + if err != nil { + logger.Error(ctx, "drpc connect", slog.Error(err)) + gracefulCancel() + return + } + go func() { + err := ls.SendLoop(gracefulCtx, arpc) + // if the send loop exits on its own without the context + // canceling, timeout the logger and force it to recreate. + if err != nil && ctx.Err() == nil { + l.loggerTimeout(log.agentToken) + } + }() + + closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { + logger.Info(ctx, "logger timeout firing") + l.loggerTimeout(log.agentToken) + }) + lifecycle := agentLoggerLifecycle{ + scriptLogger: sl, + close: func() { + // We could be stopping for reasons other than the timeout. If + // so, stop the timer. + closeTimer.Stop() + defer gracefulCancel() + timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) + defer timeout.Stop() + logger.Info(ctx, "logger closing") + + if err := sl.Flush(gracefulCtx); err != nil { + // ctx err + logger.Warn(gracefulCtx, "timeout reached while flushing") + return + } + + if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { + // ctx err + logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") + } + + _ = arpc.DRPCConn().Close() + client.SDK.HTTPClient.CloseIdleConnections() + }, + } + lifecycle.closeTimer = closeTimer + l.loggers[log.agentToken] = lifecycle + lgr = lifecycle + } + + lgr.resetCloseTimer(l.loggerTTL) + _ = lgr.scriptLogger.Send(ctx, queuedLogs...) 
+ l.logCache.delete(log.agentToken) +} + +func (l *logQueuer) processDelete(log agentLog) { + l.mu.Lock() + lgr, ok := l.loggers[log.agentToken] + if ok { + delete(l.loggers, log.agentToken) + + } + l.mu.Unlock() + + if ok { + // close this async, no one else will have a handle to it since we've + // deleted from the map + go lgr.close() + } +} + +func (l *logQueuer) loggerTimeout(agentToken string) { + l.q <- agentLog{ + op: opDelete, + agentToken: agentToken, + } +} + +type agentLoggerLifecycle struct { + scriptLogger agentsdk.ScriptLogger + + closeTimer *quartz.Timer + close func() +} + +func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) { + if !l.closeTimer.Reset(ttl) { + // If the timer had already fired and we made it active again, stop the + // timer. We don't want it to run twice. + l.closeTimer.Stop() + } +} + +func newColor(value ...color.Attribute) *color.Color { + c := color.New(value...) + c.EnableColor() + return c +} + +type logCache struct { + logs map[string][]agentsdk.Log +} + +func (l *logCache) push(log agentLog) []agentsdk.Log { + logs, ok := l.logs[log.agentToken] + if !ok { + logs = make([]agentsdk.Log, 0, 1) + } + logs = append(logs, log.log) + l.logs[log.agentToken] = logs + return logs +} + +func (l *logCache) delete(token string) { + delete(l.logs, token) +} diff --git a/logger_test.go b/logger_test.go index 51d99f5..dc73d78 100644 --- a/logger_test.go +++ b/logger_test.go @@ -459,3 +459,52 @@ func (f *fakeAgentAPI) PostLogSource(w http.ResponseWriter, r *http.Request) { fmt.Println("failed to encode:", err.Error()) } } + +func TestParseNamespaces(t *testing.T) { + tests := []struct { + name string + input string + expected []string + }{ + { + name: "empty string", + input: "", + expected: []string{}, + }, + { + name: "single namespace", + input: "default", + expected: []string{"default"}, + }, + { + name: "multiple namespaces", + input: "ns1,ns2,ns3", + expected: []string{"ns1", "ns2", "ns3"}, + }, + { + name: "namespaces with spaces", + input: "ns1, ns2 , ns3", + expected: []string{"ns1", "ns2", "ns3"}, + }, + { + name: "namespaces with empty values", + input: "ns1,,ns2,", + expected: []string{"ns1", "ns2"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := parseNamespaces(tt.input) + if len(result) != len(tt.expected) { + t.Errorf("parseNamespaces(%q) returned %d namespaces, expected %d", tt.input, len(result), len(tt.expected)) + return + } + for i, ns := range result { + if ns != tt.expected[i] { + t.Errorf("parseNamespaces(%q)[%d] = %q, expected %q", tt.input, i, ns, tt.expected[i]) + } + } + }) + } +} diff --git a/main.go b/main.go index 3d48cb9..d8c886a 100644 --- a/main.go +++ b/main.go @@ -27,7 +27,7 @@ func root() *cobra.Command { coderURL string fieldSelector string kubeConfig string - namespace string + namespaces string labelSelector string ) cmd := &cobra.Command{ @@ -66,7 +66,7 @@ func root() *cobra.Command { reporter, err := newPodEventLogger(cmd.Context(), podEventLoggerOptions{ coderURL: parsedURL, client: client, - namespace: namespace, + namespaces: namespaces, fieldSelector: fieldSelector, labelSelector: labelSelector, logger: slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug), @@ -85,7 +85,7 @@ func root() *cobra.Command { } cmd.Flags().StringVarP(&coderURL, "coder-url", "u", os.Getenv("CODER_URL"), "URL of the Coder instance") cmd.Flags().StringVarP(&kubeConfig, "kubeconfig", "k", "~/.kube/config", "Path to the kubeconfig file") - cmd.Flags().StringVarP(&namespace, 
"namespace", "n", os.Getenv("CODER_NAMESPACE"), "Namespace to use when listing pods") + cmd.Flags().StringVarP(&namespaces, "namespaces", "n", os.Getenv("CODER_NAMESPACE"), "Comma-separated list of namespaces to watch for pods. If empty, watches all namespaces.") cmd.Flags().StringVarP(&fieldSelector, "field-selector", "f", "", "Field selector to use when listing pods") cmd.Flags().StringVarP(&labelSelector, "label-selector", "l", "", "Label selector to use when listing pods") diff --git a/main.go.bak b/main.go.bak new file mode 100644 index 0000000..3d48cb9 --- /dev/null +++ b/main.go.bak @@ -0,0 +1,93 @@ +package main + +import ( + "errors" + "fmt" + "net/url" + "os" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/sloghuman" + "github.com/spf13/cobra" + "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +func main() { + cmd := root() + err := cmd.Execute() + if err != nil { + os.Exit(1) + } +} + +func root() *cobra.Command { + var ( + coderURL string + fieldSelector string + kubeConfig string + namespace string + labelSelector string + ) + cmd := &cobra.Command{ + Use: "coder-logstream-kube", + Short: "Stream Kubernetes Pod events to the Coder startup logs.", + RunE: func(cmd *cobra.Command, args []string) error { + if coderURL == "" { + return fmt.Errorf("--coder-url is required") + } + parsedURL, err := url.Parse(coderURL) + if err != nil { + return fmt.Errorf("parse coder URL: %w", err) + } + + if len(kubeConfig) > 0 && kubeConfig[0] == '~' { + home, err := os.UserHomeDir() + if err != nil { + return fmt.Errorf("get user home dir: %w", err) + } + kubeConfig = home + kubeConfig[1:] + } + + config, err := restclient.InClusterConfig() + if errors.Is(err, restclient.ErrNotInCluster) { + config, err = clientcmd.BuildConfigFromFlags("", kubeConfig) + } + if err != nil { + return fmt.Errorf("build kubeconfig: %w", err) + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("create kubernetes client: %w", err) + } + + reporter, err := newPodEventLogger(cmd.Context(), podEventLoggerOptions{ + coderURL: parsedURL, + client: client, + namespace: namespace, + fieldSelector: fieldSelector, + labelSelector: labelSelector, + logger: slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug), + }) + if err != nil { + return fmt.Errorf("create pod event reporter: %w", err) + } + defer reporter.Close() + select { + case err := <-reporter.errChan: + return fmt.Errorf("pod event reporter: %w", err) + case <-cmd.Context().Done(): + } + return nil + }, + } + cmd.Flags().StringVarP(&coderURL, "coder-url", "u", os.Getenv("CODER_URL"), "URL of the Coder instance") + cmd.Flags().StringVarP(&kubeConfig, "kubeconfig", "k", "~/.kube/config", "Path to the kubeconfig file") + cmd.Flags().StringVarP(&namespace, "namespace", "n", os.Getenv("CODER_NAMESPACE"), "Namespace to use when listing pods") + cmd.Flags().StringVarP(&fieldSelector, "field-selector", "f", "", "Field selector to use when listing pods") + cmd.Flags().StringVarP(&labelSelector, "label-selector", "l", "", "Label selector to use when listing pods") + + return cmd +} From efe2bac9371f1279d45927dd8557c2d24a38d04b Mon Sep 17 00:00:00 2001 From: "blink-so[bot]" <211532188+blink-so[bot]@users.noreply.github.com> Date: Tue, 3 Jun 2025 14:50:37 +0000 Subject: [PATCH 2/2] Remove backup files --- README.md.bak | 52 --- helm/templates/service.yaml.bak | 97 ------ helm/values.yaml.bak | 103 ------ logger.go.bak | 562 -------------------------------- 
main.go.bak | 93 ------ 5 files changed, 907 deletions(-) delete mode 100644 README.md.bak delete mode 100644 helm/templates/service.yaml.bak delete mode 100644 helm/values.yaml.bak delete mode 100644 logger.go.bak delete mode 100644 main.go.bak diff --git a/README.md.bak b/README.md.bak deleted file mode 100644 index 2a292fe..0000000 --- a/README.md.bak +++ /dev/null @@ -1,52 +0,0 @@ -# coder-logstream-kube - -[![discord](https://img.shields.io/discord/747933592273027093?label=discord)](https://discord.gg/coder) -[![release](https://img.shields.io/github/v/tag/coder/coder-logstream-kube)](https://github.com/coder/envbuilder/pkgs/container/coder-logstream-kube) -[![godoc](https://pkg.go.dev/badge/github.com/coder/coder-logstream-kube.svg)](https://pkg.go.dev/github.com/coder/coder-logstream-kube) -[![license](https://img.shields.io/github/license/coder/coder-logstream-kube)](./LICENSE) - -Stream Kubernetes Pod events to the Coder startup logs. - -- Easily determine the reason for a pod provision failure, or why a pod is stuck in a pending state. -- Visibility into when pods are OOMKilled, or when they are evicted. -- Filter by namespace, field selector, and label selector to reduce Kubernetes API load. - -![Log Stream](./scripts/demo.png) - -## Usage - -Apply the Helm chart to start streaming logs into your Coder instance: - -```console -helm repo add coder-logstream-kube https://helm.coder.com/logstream-kube -helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \ - --namespace coder \ - --set url= -``` - -> **Note** -> For additional customization (such as customizing the image, pull secrets, annotations, etc.), you can use the -> [values.yaml](helm/values.yaml) file directly. - -Your Coder template should be using a `kubernetes_deployment` resource with `wait_for_rollout` set to `false`. - -```hcl -resource "kubernetes_deployment" "hello_world" { - count = data.coder_workspace.me.start_count - wait_for_rollout = false - ... -} -``` - -This ensures all pod events will be sent during initialization and startup. - -## How? - -Kubernetes provides an [informers](https://pkg.go.dev/k8s.io/client-go/informers) API that streams pod and event data from the API server. - -`coder-logstream-kube` listens for pod creation events with containers that have the `CODER_AGENT_TOKEN` environment variable set. All pod events are streamed as logs to the Coder API using the agent token for authentication. - -## Custom Certificates - -- [`SSL_CERT_FILE`](https://go.dev/src/crypto/x509/root_unix.go#L19): Specifies the path to an SSL certificate. -- [`SSL_CERT_DIR`](https://go.dev/src/crypto/x509/root_unix.go#L25): Identifies which directory to check for SSL certificate files. 
diff --git a/helm/templates/service.yaml.bak b/helm/templates/service.yaml.bak deleted file mode 100644 index a414f2a..0000000 --- a/helm/templates/service.yaml.bak +++ /dev/null @@ -1,97 +0,0 @@ -apiVersion: rbac.authorization.k8s.io/v1 -kind: Role -metadata: - name: coder-logstream-kube-role -rules: -- apiGroups: [""] - resources: ["pods", "events"] - verbs: ["get", "watch", "list"] -- apiGroups: ["apps"] - resources: ["replicasets", "events"] - verbs: ["get", "watch", "list"] ---- -apiVersion: v1 -kind: ServiceAccount -metadata: - name: {{ .Values.serviceAccount.name | quote }} - annotations: {{ toYaml .Values.serviceAccount.annotations | nindent 4 }} - labels: {{ toYaml .Values.serviceAccount.labels | nindent 4 }} ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: RoleBinding -metadata: - name: coder-logstream-kube-rolebinding -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: Role - name: coder-logstream-kube-role -subjects: -- kind: ServiceAccount - name: {{ .Values.serviceAccount.name | quote }} ---- -apiVersion: apps/v1 -kind: Deployment -metadata: - name: coder-logstream-kube -spec: - # This must remain at 1 otherwise duplicate logs can occur! - replicas: 1 - selector: - matchLabels: - app.kubernetes.io/instance: {{ .Release.Name }} - template: - metadata: - labels: - app.kubernetes.io/instance: {{ .Release.Name }} - {{- with .Values.labels }} - {{- toYaml . | nindent 8 }} - {{- end }} - spec: - serviceAccountName: {{ .Values.serviceAccount.name | quote }} - restartPolicy: Always - {{- with .Values.image.pullSecrets }} - imagePullSecrets: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.affinity }} - affinity: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.tolerations }} - tolerations: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.nodeSelector }} - nodeSelector: - {{- toYaml . | nindent 8 }} - {{- end }} - containers: - - name: coder-logstream-kube - image: "{{ .Values.image.repo }}:{{ .Values.image.tag | default .Chart.AppVersion }}" - imagePullPolicy: {{ .Values.image.pullPolicy }} - command: - - /coder-logstream-kube - resources: {{ toYaml .Values.resources | nindent 12 }} - env: - - name: CODER_URL - value: {{ .Values.url }} - - name: CODER_NAMESPACE - value: {{ .Values.namespace | default .Release.Namespace }} - {{- if .Values.image.sslCertFile }} - - name: SSL_CERT_FILE - value: {{ .Values.image.sslCertFile }} - {{- end }} - {{- if .Values.image.sslCertDir }} - - name: SSL_CERT_DIR - value: {{ .Values.image.sslCertDir }} - {{- end }} - {{- with .Values.securityContext }} - securityContext: - {{- toYaml . | nindent 12 }} - {{- end }} - {{- if .Values.volumeMounts }} - volumeMounts: {{- toYaml .Values.volumeMounts | nindent 12 }} - {{- end }} - {{- if .Values.volumes }} - volumes: {{- toYaml .Values.volumes | nindent 8 }} - {{- end }} diff --git a/helm/values.yaml.bak b/helm/values.yaml.bak deleted file mode 100644 index 7ae1a1c..0000000 --- a/helm/values.yaml.bak +++ /dev/null @@ -1,103 +0,0 @@ -# url -- The URL of your Coder deployment. Must prefix with http or https -url: "" - -# namespace -- The namespace to searching for Pods within. -# If unspecified, this defaults to the Helm namespace. -namespace: "" - -# volumes -- A list of extra volumes to add to the coder-logstream pod. -volumes: - # emptyDir: {} - # - name: "my-volume" - -# volumeMounts -- A list of extra volume mounts to add to the coder-logstream pod. -volumeMounts: - # - name: "my-volume" - # mountPath: "/mnt/my-volume" - -# image -- The image to use. 
-image: - # image.repo -- The repository of the image. - repo: "ghcr.io/coder/coder-logstream-kube" - # image.tag -- The tag of the image, defaults to {{.Chart.AppVersion}} - # if not set. If you're using the chart directly from git, the default - # app version will not work and you'll need to set this value. The helm - # chart helpfully fails quickly in this case. - tag: "" - # image.pullPolicy -- The pull policy to use for the image. See: - # https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy - pullPolicy: IfNotPresent - # image.pullSecrets -- The secrets used for pulling the Coder image from - # a private registry. - pullSecrets: [] - # - name: "pull-secret" - # image.sslCertFile -- Location of the SSL certificate file. Sets the $SSL_CERT_FILE - # variable inside of the container. - sslCertFile: "" - # image.sslCertDir -- Directory to check for SSL certificate files. Sets the $SSL_CERT_DIR - # variable inside of the container. - sslCertDir: "" - -serviceAccount: - # serviceAccount.annotations -- The service account annotations. - annotations: {} - # serviceAccount.labels -- The service account labels. - labels: {} - # coder.serviceAccount.name -- The service account name - name: coder-logstream-kube - -# resources -- The resources to request for the Deployment. These are optional -# and are not set by default. -resources: - {} - # limits: - # cpu: 500m - # memory: 500Mi - # requests: - # cpu: 2000m - # memory: 2000Mi - -# nodeSelector -- Node labels for constraining the coder-logstream pod to specific nodes. -nodeSelector: {} - -# affinity -- Allows specifying an affinity rule for the Deployment. -# The default rule prefers to schedule coder pods on different -# nodes, which is only applicable if coder.replicaCount is greater than 1. -affinity: - {} - # podAntiAffinity: - # preferredDuringSchedulingIgnoredDuringExecution: - # - podAffinityTerm: - # labelSelector: - # matchExpressions: - # - key: app.kubernetes.io/instance: coder-logstream-kube - # operator: In - # values: - # - "true" - # topologyKey: kubernetes.io/hostname - # weight: 1 - -# tolerations -- Tolerations for tainted nodes. -# See: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/ -tolerations: - {} - # - key: "key" - # operator: "Equal" - # value: "value" - # effect: "NoSchedule" - -# labels -- The pod labels for coder-logstream-kube. See: -# https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ -labels: {} - -# securityContext -- Container-level security context -# See: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/ -securityContext: {} - # allowPrivilegeEscalation: false - # capabilities: - # drop: - # - ALL - # readOnlyRootFilesystem: true - # runAsNonRoot: true - # seccompProfile: - # type: RuntimeDefault diff --git a/logger.go.bak b/logger.go.bak deleted file mode 100644 index 231d01b..0000000 --- a/logger.go.bak +++ /dev/null @@ -1,562 +0,0 @@ -package main - -import ( - "context" - "fmt" - "net/url" - "sync" - "time" - - "github.com/fatih/color" - "github.com/google/uuid" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - - "cdr.dev/slog" - "github.com/coder/coder/v2/codersdk" - "github.com/coder/coder/v2/codersdk/agentsdk" - "github.com/coder/quartz" - - // *Never* remove this. 
Certificates are not bundled as part - // of the container, so this is necessary for all connections - // to not be insecure. - _ "github.com/breml/rootcerts" -) - -type podEventLoggerOptions struct { - client kubernetes.Interface - clock quartz.Clock - coderURL *url.URL - - logger slog.Logger - logDebounce time.Duration - - // The following fields are optional! - namespace string - fieldSelector string - labelSelector string -} - -// newPodEventLogger creates a set of Kubernetes informers that listen for -// pods with containers that have the `CODER_AGENT_TOKEN` environment variable. -// Pod events are then streamed as startup logs to that agent via the Coder API. -func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEventLogger, error) { - if opts.logDebounce == 0 { - opts.logDebounce = 30 * time.Second - } - if opts.clock == nil { - opts.clock = quartz.NewReal() - } - - logCh := make(chan agentLog, 512) - ctx, cancelFunc := context.WithCancel(ctx) - reporter := &podEventLogger{ - podEventLoggerOptions: &opts, - stopChan: make(chan struct{}), - errChan: make(chan error, 16), - ctx: ctx, - cancelFunc: cancelFunc, - logCh: logCh, - tc: &tokenCache{ - pods: map[string][]string{}, - replicaSets: map[string][]string{}, - }, - lq: &logQueuer{ - logger: opts.logger, - clock: opts.clock, - q: logCh, - coderURL: opts.coderURL, - loggerTTL: opts.logDebounce, - loggers: map[string]agentLoggerLifecycle{}, - logCache: logCache{ - logs: map[string][]agentsdk.Log{}, - }, - }, - } - - return reporter, reporter.init() -} - -type podEventLogger struct { - *podEventLoggerOptions - - stopChan chan struct{} - errChan chan error - - ctx context.Context - cancelFunc context.CancelFunc - tc *tokenCache - - logCh chan<- agentLog - lq *logQueuer -} - -// init starts the informer factory and registers event handlers. -func (p *podEventLogger) init() error { - // We only track events that happen after the reporter starts. - // This is to prevent us from sending duplicate events. - startTime := time.Now() - - go p.lq.work(p.ctx) - - podFactory := informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace), informers.WithTweakListOptions(func(lo *v1.ListOptions) { - lo.FieldSelector = p.fieldSelector - lo.LabelSelector = p.labelSelector - })) - eventFactory := podFactory - if p.fieldSelector != "" || p.labelSelector != "" { - // Events cannot filter on labels and fields! - eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace)) - } - - // We listen for Pods and Events in the informer factory. - // When a Pod is created, it's added to the map of Pods we're - // interested in. When a Pod is deleted, it's removed from the map. - podInformer := podFactory.Core().V1().Pods().Informer() - replicaInformer := podFactory.Apps().V1().ReplicaSets().Informer() - eventInformer := eventFactory.Core().V1().Events().Informer() - - _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - pod, ok := obj.(*corev1.Pod) - if !ok { - p.errChan <- fmt.Errorf("unexpected pod object type: %T", obj) - return - } - - var registered bool - for _, container := range pod.Spec.Containers { - for _, env := range container.Env { - if env.Name != "CODER_AGENT_TOKEN" { - continue - } - registered = true - p.tc.setPodToken(pod.Name, env.Value) - - // We don't want to add logs to workspaces that are already started! 
- if !pod.CreationTimestamp.After(startTime) { - continue - } - - p.sendLog(pod.Name, env.Value, agentsdk.Log{ - CreatedAt: time.Now(), - Output: fmt.Sprintf("🐳 %s: %s", newColor(color.Bold).Sprint("Created pod"), pod.Name), - Level: codersdk.LogLevelInfo, - }) - } - } - if registered { - p.logger.Info(p.ctx, "registered agent pod", slog.F("name", pod.Name), slog.F("namespace", pod.Namespace)) - } - }, - DeleteFunc: func(obj interface{}) { - pod, ok := obj.(*corev1.Pod) - if !ok { - p.errChan <- fmt.Errorf("unexpected pod delete object type: %T", obj) - return - } - - tokens := p.tc.deletePodToken(pod.Name) - for _, token := range tokens { - p.sendLog(pod.Name, token, agentsdk.Log{ - CreatedAt: time.Now(), - Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted pod"), pod.Name), - Level: codersdk.LogLevelError, - }) - p.sendDelete(token) - } - p.logger.Info(p.ctx, "unregistered agent pod", slog.F("name", pod.Name)) - }, - }) - if err != nil { - return fmt.Errorf("register pod handler: %w", err) - } - - _, err = replicaInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - replicaSet, ok := obj.(*appsv1.ReplicaSet) - if !ok { - p.errChan <- fmt.Errorf("unexpected replica object type: %T", obj) - return - } - - // We don't want to add logs to workspaces that are already started! - if !replicaSet.CreationTimestamp.After(startTime) { - return - } - - var registered bool - for _, container := range replicaSet.Spec.Template.Spec.Containers { - for _, env := range container.Env { - if env.Name != "CODER_AGENT_TOKEN" { - continue - } - registered = true - p.tc.setReplicaSetToken(replicaSet.Name, env.Value) - - p.sendLog(replicaSet.Name, env.Value, agentsdk.Log{ - CreatedAt: time.Now(), - Output: fmt.Sprintf("🐳 %s: %s", newColor(color.Bold).Sprint("Queued pod from ReplicaSet"), replicaSet.Name), - Level: codersdk.LogLevelInfo, - }) - } - } - if registered { - p.logger.Info(p.ctx, "registered agent pod from ReplicaSet", slog.F("name", replicaSet.Name)) - } - }, - DeleteFunc: func(obj interface{}) { - replicaSet, ok := obj.(*appsv1.ReplicaSet) - if !ok { - p.errChan <- fmt.Errorf("unexpected replica set delete object type: %T", obj) - return - } - - tokens := p.tc.deleteReplicaSetToken(replicaSet.Name) - if len(tokens) == 0 { - return - } - - for _, token := range tokens { - p.sendLog(replicaSet.Name, token, agentsdk.Log{ - CreatedAt: time.Now(), - Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted ReplicaSet"), replicaSet.Name), - Level: codersdk.LogLevelError, - }) - p.sendDelete(token) - } - - p.logger.Info(p.ctx, "unregistered ReplicaSet", slog.F("name", replicaSet.Name)) - }, - }) - if err != nil { - return fmt.Errorf("register replicaset handler: %w", err) - } - - _, err = eventInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - event, ok := obj.(*corev1.Event) - if !ok { - p.errChan <- fmt.Errorf("unexpected event object type: %T", obj) - return - } - - // We don't want to add logs to workspaces that are already started! 
- if !event.CreationTimestamp.After(startTime) { - return - } - - var tokens []string - switch event.InvolvedObject.Kind { - case "Pod": - tokens = p.tc.getPodTokens(event.InvolvedObject.Name) - case "ReplicaSet": - tokens = p.tc.getReplicaSetTokens(event.InvolvedObject.Name) - } - if len(tokens) == 0 { - return - } - - for _, token := range tokens { - p.sendLog(event.InvolvedObject.Name, token, agentsdk.Log{ - CreatedAt: time.Now(), - Output: newColor(color.FgWhite).Sprint(event.Message), - Level: codersdk.LogLevelInfo, - }) - p.logger.Info(p.ctx, "sending log", slog.F("pod", event.InvolvedObject.Name), slog.F("message", event.Message)) - } - }, - }) - if err != nil { - return fmt.Errorf("register event handler: %w", err) - } - - p.logger.Info(p.ctx, "listening for pod events", - slog.F("coder_url", p.coderURL.String()), - slog.F("namespace", p.namespace), - slog.F("field_selector", p.fieldSelector), - slog.F("label_selector", p.labelSelector), - ) - podFactory.Start(p.stopChan) - if podFactory != eventFactory { - eventFactory.Start(p.stopChan) - } - return nil -} - -var sourceUUID = uuid.MustParse("cabdacf8-7c90-425c-9815-cae3c75d1169") - -// loggerForToken returns a logger for the given pod name and agent token. -// If a logger already exists for the token, it's returned. Otherwise a new -// logger is created and returned. It assumes a lock to p.mutex is already being -// held. -func (p *podEventLogger) sendLog(resourceName, token string, log agentsdk.Log) { - p.logCh <- agentLog{ - op: opLog, - resourceName: resourceName, - agentToken: token, - log: log, - } -} - -func (p *podEventLogger) sendDelete(token string) { - p.logCh <- agentLog{ - op: opDelete, - agentToken: token, - } -} - -func (p *podEventLogger) Close() error { - p.cancelFunc() - close(p.stopChan) - close(p.errChan) - return nil -} - -type tokenCache struct { - mu sync.RWMutex - pods map[string][]string - replicaSets map[string][]string -} - -func (t *tokenCache) setPodToken(name, token string) []string { return t.set(t.pods, name, token) } -func (t *tokenCache) getPodTokens(name string) []string { return t.get(t.pods, name) } -func (t *tokenCache) deletePodToken(name string) []string { return t.delete(t.pods, name) } - -func (t *tokenCache) setReplicaSetToken(name, token string) []string { - return t.set(t.replicaSets, name, token) -} -func (t *tokenCache) getReplicaSetTokens(name string) []string { return t.get(t.replicaSets, name) } -func (t *tokenCache) deleteReplicaSetToken(name string) []string { - return t.delete(t.replicaSets, name) -} - -func (t *tokenCache) get(m map[string][]string, name string) []string { - t.mu.RLock() - tokens := m[name] - t.mu.RUnlock() - return tokens -} - -func (t *tokenCache) set(m map[string][]string, name, token string) []string { - t.mu.Lock() - tokens, ok := m[name] - if !ok { - tokens = []string{token} - } else { - tokens = append(tokens, token) - } - m[name] = tokens - t.mu.Unlock() - - return tokens -} - -func (t *tokenCache) delete(m map[string][]string, name string) []string { - t.mu.Lock() - tokens := m[name] - delete(m, name) - t.mu.Unlock() - return tokens -} - -func (t *tokenCache) isEmpty() bool { - t.mu.Lock() - defer t.mu.Unlock() - return len(t.pods)+len(t.replicaSets) == 0 -} - -type op int - -const ( - opLog op = iota - opDelete -) - -type agentLog struct { - op op - resourceName string - agentToken string - log agentsdk.Log -} - -// logQueuer is a single-threaded queue for dispatching logs. 
-type logQueuer struct { - mu sync.Mutex - logger slog.Logger - clock quartz.Clock - q chan agentLog - - coderURL *url.URL - loggerTTL time.Duration - loggers map[string]agentLoggerLifecycle - logCache logCache -} - -func (l *logQueuer) work(ctx context.Context) { - for ctx.Err() == nil { - select { - case log := <-l.q: - switch log.op { - case opLog: - l.processLog(ctx, log) - case opDelete: - l.processDelete(log) - } - - case <-ctx.Done(): - return - } - - } -} - -func (l *logQueuer) processLog(ctx context.Context, log agentLog) { - l.mu.Lock() - defer l.mu.Unlock() - queuedLogs := l.logCache.push(log) - lgr, ok := l.loggers[log.agentToken] - if !ok { - client := agentsdk.New(l.coderURL) - client.SetSessionToken(log.agentToken) - logger := l.logger.With(slog.F("resource_name", log.resourceName)) - client.SDK.SetLogger(logger) - - _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ - ID: sourceUUID, - Icon: "/icon/k8s.png", - DisplayName: "Kubernetes", - }) - if err != nil { - // This shouldn't fail sending the log, as it only affects how they - // appear. - logger.Error(ctx, "post log source", slog.Error(err)) - } - - ls := agentsdk.NewLogSender(logger) - sl := ls.GetScriptLogger(sourceUUID) - - gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) - - // connect to Agent v2.0 API, since we don't need features added later. - // This maximizes compatibility. - arpc, err := client.ConnectRPC20(gracefulCtx) - if err != nil { - logger.Error(ctx, "drpc connect", slog.Error(err)) - gracefulCancel() - return - } - go func() { - err := ls.SendLoop(gracefulCtx, arpc) - // if the send loop exits on its own without the context - // canceling, timeout the logger and force it to recreate. - if err != nil && ctx.Err() == nil { - l.loggerTimeout(log.agentToken) - } - }() - - closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { - logger.Info(ctx, "logger timeout firing") - l.loggerTimeout(log.agentToken) - }) - lifecycle := agentLoggerLifecycle{ - scriptLogger: sl, - close: func() { - // We could be stopping for reasons other than the timeout. If - // so, stop the timer. - closeTimer.Stop() - defer gracefulCancel() - timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) - defer timeout.Stop() - logger.Info(ctx, "logger closing") - - if err := sl.Flush(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while flushing") - return - } - - if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") - } - - _ = arpc.DRPCConn().Close() - client.SDK.HTTPClient.CloseIdleConnections() - }, - } - lifecycle.closeTimer = closeTimer - l.loggers[log.agentToken] = lifecycle - lgr = lifecycle - } - - lgr.resetCloseTimer(l.loggerTTL) - _ = lgr.scriptLogger.Send(ctx, queuedLogs...) 
- l.logCache.delete(log.agentToken) -} - -func (l *logQueuer) processDelete(log agentLog) { - l.mu.Lock() - lgr, ok := l.loggers[log.agentToken] - if ok { - delete(l.loggers, log.agentToken) - - } - l.mu.Unlock() - - if ok { - // close this async, no one else will have a handle to it since we've - // deleted from the map - go lgr.close() - } -} - -func (l *logQueuer) loggerTimeout(agentToken string) { - l.q <- agentLog{ - op: opDelete, - agentToken: agentToken, - } -} - -type agentLoggerLifecycle struct { - scriptLogger agentsdk.ScriptLogger - - closeTimer *quartz.Timer - close func() -} - -func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) { - if !l.closeTimer.Reset(ttl) { - // If the timer had already fired and we made it active again, stop the - // timer. We don't want it to run twice. - l.closeTimer.Stop() - } -} - -func newColor(value ...color.Attribute) *color.Color { - c := color.New(value...) - c.EnableColor() - return c -} - -type logCache struct { - logs map[string][]agentsdk.Log -} - -func (l *logCache) push(log agentLog) []agentsdk.Log { - logs, ok := l.logs[log.agentToken] - if !ok { - logs = make([]agentsdk.Log, 0, 1) - } - logs = append(logs, log.log) - l.logs[log.agentToken] = logs - return logs -} - -func (l *logCache) delete(token string) { - delete(l.logs, token) -} diff --git a/main.go.bak b/main.go.bak deleted file mode 100644 index 3d48cb9..0000000 --- a/main.go.bak +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "errors" - "fmt" - "net/url" - "os" - - "cdr.dev/slog" - "cdr.dev/slog/sloggers/sloghuman" - "github.com/spf13/cobra" - "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" -) - -func main() { - cmd := root() - err := cmd.Execute() - if err != nil { - os.Exit(1) - } -} - -func root() *cobra.Command { - var ( - coderURL string - fieldSelector string - kubeConfig string - namespace string - labelSelector string - ) - cmd := &cobra.Command{ - Use: "coder-logstream-kube", - Short: "Stream Kubernetes Pod events to the Coder startup logs.", - RunE: func(cmd *cobra.Command, args []string) error { - if coderURL == "" { - return fmt.Errorf("--coder-url is required") - } - parsedURL, err := url.Parse(coderURL) - if err != nil { - return fmt.Errorf("parse coder URL: %w", err) - } - - if len(kubeConfig) > 0 && kubeConfig[0] == '~' { - home, err := os.UserHomeDir() - if err != nil { - return fmt.Errorf("get user home dir: %w", err) - } - kubeConfig = home + kubeConfig[1:] - } - - config, err := restclient.InClusterConfig() - if errors.Is(err, restclient.ErrNotInCluster) { - config, err = clientcmd.BuildConfigFromFlags("", kubeConfig) - } - if err != nil { - return fmt.Errorf("build kubeconfig: %w", err) - } - - client, err := kubernetes.NewForConfig(config) - if err != nil { - return fmt.Errorf("create kubernetes client: %w", err) - } - - reporter, err := newPodEventLogger(cmd.Context(), podEventLoggerOptions{ - coderURL: parsedURL, - client: client, - namespace: namespace, - fieldSelector: fieldSelector, - labelSelector: labelSelector, - logger: slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug), - }) - if err != nil { - return fmt.Errorf("create pod event reporter: %w", err) - } - defer reporter.Close() - select { - case err := <-reporter.errChan: - return fmt.Errorf("pod event reporter: %w", err) - case <-cmd.Context().Done(): - } - return nil - }, - } - cmd.Flags().StringVarP(&coderURL, "coder-url", "u", os.Getenv("CODER_URL"), "URL of the Coder instance") - 
cmd.Flags().StringVarP(&kubeConfig, "kubeconfig", "k", "~/.kube/config", "Path to the kubeconfig file") - cmd.Flags().StringVarP(&namespace, "namespace", "n", os.Getenv("CODER_NAMESPACE"), "Namespace to use when listing pods") - cmd.Flags().StringVarP(&fieldSelector, "field-selector", "f", "", "Field selector to use when listing pods") - cmd.Flags().StringVarP(&labelSelector, "label-selector", "l", "", "Label selector to use when listing pods") - - return cmd -}
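The second patch only removes the backup copies, so the multi-namespace wiring that replaces the single `informers.WithNamespace(p.namespace)` factory shown in the deleted logger code is not visible in this series. Below is a rough sketch of that approach: one shared informer factory per configured namespace, and a single factory with no namespace option when the list is empty. The `newInformerFactories` helper and its exact shape are assumptions for illustration; only the client-go calls mirror the deleted code.

```go
package main

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// newInformerFactories builds one shared informer factory per namespace.
// With no namespaces configured it falls back to a single factory without
// a namespace option; client-go then defaults to metav1.NamespaceAll (""),
// i.e. a cluster-wide watch.
func newInformerFactories(client kubernetes.Interface, namespaces []string) []informers.SharedInformerFactory {
	if len(namespaces) == 0 {
		return []informers.SharedInformerFactory{
			informers.NewSharedInformerFactoryWithOptions(client, 0),
		}
	}
	factories := make([]informers.SharedInformerFactory, 0, len(namespaces))
	for _, ns := range namespaces {
		factories = append(factories, informers.NewSharedInformerFactoryWithOptions(
			client, 0, informers.WithNamespace(ns),
		))
	}
	return factories
}
```

Each factory would then have the same pod, ReplicaSet, and event handlers registered against it that the single-namespace logger wires up, and each would be started on the same stop channel.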