Skip to content
This repository was archived by the owner on Jun 2, 2023. It is now read-only.

Commit 5ec28b3

Browse files
committed
consumers count and experiments for logging
1 parent feea50d commit 5ec28b3

File tree

5 files changed

+51
-20
lines changed

5 files changed

+51
-20
lines changed

pkg/worker/analyze/analyzequeue/consumers/analyze_pr.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/golangci/golangci-api/pkg/worker/analytics"
1717
"github.com/golangci/golangci-api/pkg/worker/analyze/processors"
18+
"github.com/golangci/golangci-api/pkg/worker/lib/experiments"
1819
"github.com/golangci/golangci-api/pkg/worker/lib/github"
1920
)
2021

@@ -26,11 +27,12 @@ type AnalyzePR struct {
2627
log logutil.Log
2728
}
2829

29-
func NewAnalyzePR(pf processors.PullProcessorFactory, log logutil.Log, errTracker apperrors.Tracker, cfg config.Config) *AnalyzePR {
30+
func NewAnalyzePR(pf processors.PullProcessorFactory, log logutil.Log, errTracker apperrors.Tracker, cfg config.Config, ec *experiments.Checker) *AnalyzePR {
3031
return &AnalyzePR{
3132
baseConsumer: baseConsumer{
3233
eventName: analytics.EventPRChecked,
3334
cfg: cfg,
35+
ec: ec,
3436
},
3537
pf: pf,
3638
errTracker: errTracker,

pkg/worker/analyze/analyzequeue/consumers/analyze_pr_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"strings"
88
"testing"
99

10+
"github.com/golangci/golangci-api/pkg/worker/lib/experiments"
11+
1012
"github.com/golangci/golangci-api/internal/shared/apperrors"
1113

1214
"github.com/golangci/golangci-api/internal/shared/config"
@@ -41,8 +43,9 @@ func TestAnalyzeRepo(t *testing.T) {
4143
log := logutil.NewStderrLog("")
4244
cfg := config.NewEnvConfig(log)
4345
errTracker := apperrors.NewNopTracker()
46+
ec := experiments.NewChecker(cfg, log)
4447

45-
err := NewAnalyzePR(pf, log, errTracker, cfg).Consume(context.Background(), repoOwner, repoName,
48+
err := NewAnalyzePR(pf, log, errTracker, cfg, ec).Consume(context.Background(), repoOwner, repoName,
4649
false, cfg.GetString("TEST_GITHUB_TOKEN"), prNumber, "", userID, "test-guid", "commit-sha")
4750
assert.NoError(t, err)
4851
}

pkg/worker/analyze/analyzequeue/consumers/analyze_repo.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/golangci/golangci-api/pkg/worker/analytics"
1515
"github.com/golangci/golangci-api/pkg/worker/analyze/processors"
16+
"github.com/golangci/golangci-api/pkg/worker/lib/experiments"
1617
"github.com/golangci/golangci-api/pkg/worker/lib/github"
1718
"github.com/pkg/errors"
1819
)
@@ -26,11 +27,12 @@ type AnalyzeRepo struct {
2627
cfg config.Config
2728
}
2829

29-
func NewAnalyzeRepo(rpf *processors.RepoProcessorFactory, log logutil.Log, errTracker apperrors.Tracker, cfg config.Config) *AnalyzeRepo {
30+
func NewAnalyzeRepo(rpf *processors.RepoProcessorFactory, log logutil.Log, errTracker apperrors.Tracker, cfg config.Config, ec *experiments.Checker) *AnalyzeRepo {
3031
return &AnalyzeRepo{
3132
baseConsumer: baseConsumer{
3233
eventName: analytics.EventRepoAnalyzed,
3334
cfg: cfg,
35+
ec: ec,
3436
},
3537
rpf: rpf,
3638
log: log,

pkg/worker/analyze/analyzequeue/consumers/base_consumer.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"strings"
88
"time"
99

10+
"github.com/golangci/golangci-api/pkg/worker/lib/experiments"
11+
1012
"github.com/golangci/golangci-api/internal/shared/config"
1113

1214
"github.com/golangci/golangci-api/internal/shared/logutil"
@@ -16,6 +18,7 @@ import (
1618
type baseConsumer struct {
1719
eventName analytics.EventName
1820
cfg config.Config
21+
ec *experiments.Checker
1922
}
2023

2124
const statusOk = "ok"
@@ -28,22 +31,18 @@ func (c baseConsumer) prepareContext(ctx context.Context, trackingProps map[stri
2831
}
2932

3033
func (c baseConsumer) getUnrecoverableErrorLogger(log logutil.Log, repoFullName string) logutil.Func {
31-
ignoredRepos := c.cfg.GetStringList("LOG_AS_INFO_UNRECOVERABLE_ERRORS_FOR_REPOS")
32-
for _, ignoredRepo := range ignoredRepos {
33-
if strings.EqualFold(ignoredRepo, repoFullName) {
34-
return log.Infof
35-
}
34+
repoParts := strings.SplitN(repoFullName, "/", 2)
35+
if c.ec.IsActiveForRepo("LOG_AS_INFO_UNRECOVERABLE_ERRORS_FOR", repoParts[0], repoParts[1]) {
36+
return log.Infof
3637
}
3738

3839
return log.Warnf
3940
}
4041

4142
func (c baseConsumer) getRecoverableErrorLogger(log logutil.Log, repoFullName string) logutil.Func {
42-
ignoredRepos := c.cfg.GetStringList("LOG_AS_INFO_RECOVERABLE_ERRORS_FOR_REPOS")
43-
for _, ignoredRepo := range ignoredRepos {
44-
if strings.EqualFold(ignoredRepo, repoFullName) {
45-
return log.Infof
46-
}
43+
repoParts := strings.SplitN(repoFullName, "/", 2)
44+
if c.ec.IsActiveForRepo("LOG_AS_INFO_RECOVERABLE_ERRORS_FOR", repoParts[0], repoParts[1]) {
45+
return log.Infof
4746
}
4847

4948
return log.Errorf

pkg/worker/app/app.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package app
22

33
import (
4+
"fmt"
5+
"sync"
6+
7+
"github.com/golangci/golangci-api/pkg/worker/lib/experiments"
8+
49
"github.com/aws/aws-sdk-go/aws"
510
"github.com/aws/aws-sdk-go/aws/session"
611
redigo "github.com/garyburd/redigo/redis"
@@ -25,6 +30,7 @@ type App struct {
2530
trackedLog logutil.Log
2631
errTracker apperrors.Tracker
2732
cfg config.Config
33+
ec *experiments.Checker
2834
redisPool *redigo.Pool
2935
distLockFactory *redsync.Redsync
3036
awsSess *session.Session
@@ -73,17 +79,20 @@ func (a *App) buildDeps() {
7379
if a.ppf == nil {
7480
a.ppf = processors.NewBasicPullProcessorFactory(&processors.BasicPullConfig{})
7581
}
82+
if a.ec == nil {
83+
a.ec = experiments.NewChecker(a.cfg, a.trackedLog)
84+
}
7685
}
7786

7887
func (a App) buildMultiplexer() *consumers.Multiplexer {
7988
rpf := processors.NewRepoProcessorFactory(&processors.StaticRepoConfig{})
8089

8190
// it's important to use a.log, not a.trackedLog
82-
repoAnalyzer := analyzesConsumers.NewAnalyzeRepo(rpf, a.log, a.errTracker, a.cfg)
91+
repoAnalyzer := analyzesConsumers.NewAnalyzeRepo(rpf, a.log, a.errTracker, a.cfg, a.ec)
8392
repoAnalyzesRunner := repoanalyzesqueue.NewConsumer(repoAnalyzer)
8493

8594
// it's important to use a.log, not a.trackedLog
86-
pullAnalyzer := analyzesConsumers.NewAnalyzePR(a.ppf, a.log, a.errTracker, a.cfg)
95+
pullAnalyzer := analyzesConsumers.NewAnalyzePR(a.ppf, a.log, a.errTracker, a.cfg, a.ec)
8796
pullAnalyzesRunner := pullanalyzesqueue.NewConsumer(pullAnalyzer)
8897

8998
multiplexer := consumers.NewMultiplexer()
@@ -136,10 +145,26 @@ func (a App) BuildTestDeps() *TestDeps {
136145

137146
func (a App) Run() {
138147
consumerMultiplexer := a.buildMultiplexer()
139-
analyzesSQS := sqs.NewQueue(a.cfg.GetString("SQS_ANALYZES_QUEUE_URL"),
140-
a.awsSess, a.trackedLog, analyzesqueue.VisibilityTimeoutSec)
141-
consumer := consumer.NewSQS(a.trackedLog, a.cfg, analyzesSQS,
142-
consumerMultiplexer, "analyzes", analyzesqueue.VisibilityTimeoutSec)
143148

144-
consumer.Run()
149+
consumersCount := a.cfg.GetInt("CONSUMERS_COUNT", 1)
150+
a.log.Infof("Starting %d consumers...", consumersCount)
151+
152+
var wg sync.WaitGroup
153+
wg.Add(consumersCount)
154+
for i := 0; i < consumersCount; i++ {
155+
go func(i int) {
156+
defer wg.Done()
157+
158+
trackedLog := a.trackedLog.Child(fmt.Sprintf("consumer #%d", i))
159+
analyzesSQS := sqs.NewQueue(a.cfg.GetString("SQS_ANALYZES_QUEUE_URL"),
160+
a.awsSess, trackedLog, analyzesqueue.VisibilityTimeoutSec)
161+
consumer := consumer.NewSQS(trackedLog, a.cfg, analyzesSQS,
162+
consumerMultiplexer, "analyzes", analyzesqueue.VisibilityTimeoutSec)
163+
164+
consumer.Run()
165+
}(i)
166+
}
167+
168+
a.log.Infof("Started %d consumers", consumersCount)
169+
wg.Wait()
145170
}

0 commit comments

Comments
 (0)