@@ -84,25 +84,25 @@ func (a *App) buildDeps() {
84
84
}
85
85
}
86
86
87
- func (a App ) buildMultiplexer () * consumers.Multiplexer {
87
+ func (a App ) buildMultiplexer (log logutil. Log ) * consumers.Multiplexer {
88
88
rpf := processors .NewRepoProcessorFactory (& processors.StaticRepoConfig {})
89
89
90
- // it's important to use a.log, not a.trackedLog
91
- repoAnalyzer := analyzesConsumers .NewAnalyzeRepo (rpf , a . log , a .errTracker , a .cfg , a .ec )
90
+ // it's important to use log ( a.log.Child) , not a.trackedLog
91
+ repoAnalyzer := analyzesConsumers .NewAnalyzeRepo (rpf , log , a .errTracker , a .cfg , a .ec )
92
92
repoAnalyzesRunner := repoanalyzesqueue .NewConsumer (repoAnalyzer )
93
93
94
- // it's important to use a.log, not a.trackedLog
95
- pullAnalyzer := analyzesConsumers .NewAnalyzePR (a .ppf , a . log , a .errTracker , a .cfg , a .ec )
94
+ // it's important to use log ( a.log.Child) , not a.trackedLog
95
+ pullAnalyzer := analyzesConsumers .NewAnalyzePR (a .ppf , log , a .errTracker , a .cfg , a .ec )
96
96
pullAnalyzesRunner := pullanalyzesqueue .NewConsumer (pullAnalyzer )
97
97
98
98
multiplexer := consumers .NewMultiplexer ()
99
99
multiplexer .SetResultLogger (func (error ) {}) // already logged, no double logging
100
100
101
101
if err := repoAnalyzesRunner .Register (multiplexer , a .distLockFactory ); err != nil {
102
- a . log .Fatalf ("Failed to register repo analyzer consumer: %s" , err )
102
+ log .Fatalf ("Failed to register repo analyzer consumer: %s" , err )
103
103
}
104
104
if err := pullAnalyzesRunner .Register (multiplexer , a .distLockFactory ); err != nil {
105
- a . log .Fatalf ("Failed to register pull analyzer consumer: %s" , err )
105
+ log .Fatalf ("Failed to register pull analyzer consumer: %s" , err )
106
106
}
107
107
108
108
return multiplexer
@@ -144,8 +144,6 @@ func (a App) BuildTestDeps() *TestDeps {
144
144
}
145
145
146
146
func (a App ) Run () {
147
- consumerMultiplexer := a .buildMultiplexer ()
148
-
149
147
consumersCount := a .cfg .GetInt ("CONSUMERS_COUNT" , 1 )
150
148
a .log .Infof ("Starting %d consumers..." , consumersCount )
151
149
@@ -155,7 +153,9 @@ func (a App) Run() {
155
153
go func (i int ) {
156
154
defer wg .Done ()
157
155
158
- trackedLog := a .trackedLog .Child (fmt .Sprintf ("consumer #%d" , i ))
156
+ logName := fmt .Sprintf ("consumer #%d" , i )
157
+ consumerMultiplexer := a .buildMultiplexer (a .log .Child (logName ))
158
+ trackedLog := a .trackedLog .Child (logName )
159
159
analyzesSQS := sqs .NewQueue (a .cfg .GetString ("SQS_ANALYZES_QUEUE_URL" ),
160
160
a .awsSess , trackedLog , analyzesqueue .VisibilityTimeoutSec )
161
161
consumer := consumer .NewSQS (trackedLog , a .cfg , analyzesSQS ,
0 commit comments