Skip to content

revert: "fix: refactor coder logger to allow flush without deadlock (#375)" #376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions cmd/envbuilder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,6 @@ func envbuilderCmd() serpent.Command {
Options: o.CLI(),
Handler: func(inv *serpent.Invocation) error {
o.SetDefaults()
var preExecs []func()
preExec := func() {
for _, fn := range preExecs {
fn()
}
preExecs = nil
}
defer preExec() // Ensure cleanup in case of error.

o.Logger = log.New(os.Stderr, o.Verbose)
if o.CoderAgentURL != "" {
if o.CoderAgentToken == "" {
Expand All @@ -58,10 +49,7 @@ func envbuilderCmd() serpent.Command {
coderLog, closeLogs, err := log.Coder(inv.Context(), u, o.CoderAgentToken)
if err == nil {
o.Logger = log.Wrap(o.Logger, coderLog)
preExecs = append(preExecs, func() {
o.Logger(log.LevelInfo, "Closing logs")
closeLogs()
})
defer closeLogs()
// This adds the envbuilder subsystem.
// If telemetry is enabled in a Coder deployment,
// this will be reported and help us understand
Expand Down Expand Up @@ -90,7 +78,7 @@ func envbuilderCmd() serpent.Command {
return nil
}

err := envbuilder.Run(inv.Context(), o, preExec)
err := envbuilder.Run(inv.Context(), o)
if err != nil {
o.Logger(log.LevelError, "error: %s", err)
}
Expand Down
7 changes: 1 addition & 6 deletions envbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ type execArgsInfo struct {
// Logger is the logf to use for all operations.
// Filesystem is the filesystem to use for all operations.
// Defaults to the host filesystem.
// preExec are any functions that should be called before exec'ing the init
// command. This is useful for ensuring that defers get run.
func Run(ctx context.Context, opts options.Options, preExec ...func()) error {
func Run(ctx context.Context, opts options.Options) error {
var args execArgsInfo
// Run in a separate function to ensure all defers run before we
// setuid or exec.
Expand All @@ -105,9 +103,6 @@ func Run(ctx context.Context, opts options.Options, preExec ...func()) error {
}

opts.Logger(log.LevelInfo, "=== Running the init command %s %+v as the %q user...", opts.InitCommand, args.InitArgs, args.UserInfo.user.Username)
for _, fn := range preExec {
fn()
}

err = syscall.Exec(args.InitCommand, append([]string{args.InitCommand}, args.InitArgs...), args.Environ)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ require (
github.com/gliderlabs/ssh v0.3.7
github.com/go-git/go-billy/v5 v5.5.0
github.com/go-git/go-git/v5 v5.12.0
github.com/google/go-cmp v0.6.0
github.com/google/go-containerregistry v0.20.1
github.com/google/uuid v1.6.0
github.com/hashicorp/go-multierror v1.1.1
Expand Down Expand Up @@ -150,6 +149,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/nftables v0.2.0 // indirect
github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b // indirect
github.com/gorilla/handlers v1.5.1 // indirect
Expand Down
67 changes: 0 additions & 67 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"testing"
"time"

"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/envbuilder"
"github.com/coder/envbuilder/devcontainer/features"
"github.com/coder/envbuilder/internal/magicdir"
Expand Down Expand Up @@ -60,71 +58,6 @@ const (
testImageUbuntu = "localhost:5000/envbuilder-test-ubuntu:latest"
)

// TestLogs runs envbuilder against a fake Coder agent HTTP endpoint and
// checks that a log line containing "Closing logs" reaches that endpoint
// within one minute of envbuilder returning.
//
// The fake endpoint serves two paths: /api/v2/buildinfo, which reports
// version "v2.8.9", and /api/v2/workspaceagents/me/logs, which decodes the
// request body as agentsdk.PatchLogs. Any other path fails the test.
// NOTE(review): the reported version is presumably chosen so that envbuilder
// uses the PatchLogs endpoint rather than the dRPC agent API — confirm
// against the version check in log/coder.go.
func TestLogs(t *testing.T) {
t.Parallel()

// token is the agent token handed to envbuilder; the handler asserts it is
// echoed back in the session token header.
token := uuid.NewString()
// logsDone is closed by the handler once the expected log line is seen.
logsDone := make(chan struct{})

logHandler := func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v2/buildinfo":
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"version": "v2.8.9"}`))
return
case "/api/v2/workspaceagents/me/logs":
// NOTE(review): the 200 status is written before the body is decoded, so
// the http.Error call below cannot change the response status to 400.
w.WriteHeader(http.StatusOK)
tokHdr := r.Header.Get(codersdk.SessionTokenHeader)
assert.Equal(t, token, tokHdr)
var req agentsdk.PatchLogs
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
for _, log := range req.Logs {
t.Logf("got log: %+v", log)
if strings.Contains(log.Output, "Closing logs") {
// NOTE(review): closing an already-closed channel panics; this assumes
// only one request ever carries a matching line — confirm, or guard
// the close.
close(logsDone)
return
}
}
return
default:
t.Errorf("unexpected request to %s", r.URL.Path)
w.WriteHeader(http.StatusNotFound)
return
}
}
logSrv := httptest.NewServer(http.HandlerFunc(logHandler))
defer logSrv.Close()

// Serve a Git repository containing a devcontainer.json and a Dockerfile
// based on the Ubuntu test image, so that envbuilder has something to clone
// and build.
srv := gittest.CreateGitServer(t, gittest.Options{
Files: map[string]string{
"devcontainer.json": `{
"build": {
"dockerfile": "Dockerfile"
},
}`,
"Dockerfile": fmt.Sprintf(`FROM %s`, testImageUbuntu),
},
})
// Point envbuilder at the Git server and at the fake agent endpoint.
_, err := runEnvbuilder(t, runOpts{env: []string{
envbuilderEnv("GIT_URL", srv.URL),
"CODER_AGENT_URL=" + logSrv.URL,
"CODER_AGENT_TOKEN=" + token,
}})
require.NoError(t, err)
// Wait up to one minute for the handler to observe the "Closing logs" line.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
select {
case <-ctx.Done():
t.Fatal("timed out waiting for logs")
case <-logsDone:
}
}

func TestInitScriptInitCommand(t *testing.T) {
t.Parallel()

Expand Down
102 changes: 45 additions & 57 deletions log/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net/url"
"os"
"sync"
"time"

"cdr.dev/slog"
Expand All @@ -28,14 +27,13 @@ var (
minAgentAPIV2 = "v2.9"
)

// Coder establishes a connection to the Coder instance located at coderURL and
// authenticates using token. It then establishes a dRPC connection to the Agent
// API and begins sending logs. If the version of Coder does not support the
// Agent API, it will fall back to using the PatchLogs endpoint. The closer is
// used to close the logger and to wait at most logSendGracePeriod for logs to
// be sent. Cancelling the context will close the logs immediately without
// waiting for logs to be sent.
func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, closer func(), err error) {
// Coder establishes a connection to the Coder instance located at
// coderURL and authenticates using token. It then establishes a
// dRPC connection to the Agent API and begins sending logs.
// If the version of Coder does not support the Agent API, it will
// fall back to using the PatchLogs endpoint.
// The returned function is used to block until all logs are sent.
func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(), error) {
// To troubleshoot issues, we need some way of logging.
metaLogger := slog.Make(sloghuman.Sink(os.Stderr))
defer metaLogger.Sync()
Expand All @@ -46,26 +44,18 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, c
}
if semver.Compare(semver.MajorMinor(bi.Version), minAgentAPIV2) < 0 {
metaLogger.Warn(ctx, "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API", slog.F("coder_version", bi.Version))
logger, closer = sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
return logger, closer, nil
sendLogs, flushLogs := sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
return sendLogs, flushLogs, nil
}
// Note that ctx passed to initRPC will be inherited by the
// underlying connection, nothing we can do about that here.
dac, err := initRPC(ctx, client, metaLogger.Named("init_rpc"))
if err != nil {
// Logged externally
return nil, nil, fmt.Errorf("init coder rpc client: %w", err)
}
ls := agentsdk.NewLogSender(metaLogger.Named("coder_log_sender"))
metaLogger.Warn(ctx, "Sending logs via AgentAPI v2", slog.F("coder_version", bi.Version))
logger, closer = sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
var closeOnce sync.Once
return logger, func() {
closer()
closeOnce.Do(func() {
_ = dac.DRPCConn().Close()
})
}, nil
sendLogs, doneFunc := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
return sendLogs, doneFunc, nil
}

type coderLogSender interface {
Expand All @@ -84,7 +74,7 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto.DRPCAgentClient20, error) {
var c proto.DRPCAgentClient20
var err error
retryCtx, retryCancel := context.WithTimeout(ctx, rpcConnectTimeout)
retryCtx, retryCancel := context.WithTimeout(context.Background(), rpcConnectTimeout)
defer retryCancel()
attempts := 0
for r := retry.New(100*time.Millisecond, time.Second); r.Wait(retryCtx); {
Expand All @@ -105,67 +95,65 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto

// sendLogsV1 uses the PatchLogs endpoint to send logs.
// This is deprecated, but required for backward compatibility with older versions of Coder.
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (logger Func, closer func()) {
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Func, func()) {
// nolint: staticcheck // required for backwards compatibility
sendLog, flushAndClose := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
var mu sync.Mutex
sendLogs, flushLogs := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
return func(lvl Level, msg string, args ...any) {
log := agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(lvl),
}
mu.Lock()
defer mu.Unlock()
if err := sendLog(ctx, log); err != nil {
if err := sendLogs(ctx, log); err != nil {
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
}
}, func() {
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
defer cancel()
if err := flushAndClose(ctx); err != nil {
if err := flushLogs(ctx); err != nil {
l.Warn(ctx, "failed to flush logs", slog.Error(err))
}
}
}

// sendLogsV2 uses the v2 agent API to send logs. Only compatible with Coder versions >= 2.9.
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (logger Func, closer func()) {
sendCtx, sendCancel := context.WithCancel(ctx)
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) {
done := make(chan struct{})
uid := uuid.New()
go func() {
defer close(done)
if err := ls.SendLoop(sendCtx, dest); err != nil {
if err := ls.SendLoop(ctx, dest); err != nil {
if !errors.Is(err, context.Canceled) {
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
}
}

// Wait for up to 10 seconds for logs to finish sending.
sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod)
defer sendCancel()
// Try once more to send any pending logs
if err := ls.SendLoop(sendCtx, dest); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
l.Warn(ctx, "failed to send remaining logs to Coder", slog.Error(err))
}
}
ls.Flush(uid)
if err := ls.WaitUntilEmpty(sendCtx); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
l.Warn(ctx, "log sender did not empty", slog.Error(err))
}
}
}()

var closeOnce sync.Once
return func(l Level, msg string, args ...any) {
ls.Enqueue(uid, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(l),
})
}, func() {
closeOnce.Do(func() {
// Trigger a flush and wait for logs to be sent.
ls.Flush(uid)
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
defer cancel()
err := ls.WaitUntilEmpty(ctx)
if err != nil {
l.Warn(ctx, "log sender did not empty", slog.Error(err))
}
logFunc := func(l Level, msg string, args ...any) {
ls.Enqueue(uid, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(l),
})
}

// Stop the send loop.
sendCancel()
})
doneFunc := func() {
<-done
}

// Wait for the send loop to finish.
<-done
}
return logFunc, doneFunc
}
Loading