Skip to content

Commit 3adca0d

Browse files
authored
Add concurrency to backfill-redis (#1504)
* Add concurrency to backfill-redis Signed-off-by: Cody Soyland <[email protected]> * Add result summary Signed-off-by: Cody Soyland <[email protected]> --------- Signed-off-by: Cody Soyland <[email protected]>
1 parent 795a236 commit 3adca0d

File tree

1 file changed

+99
-33
lines changed

1 file changed

+99
-33
lines changed

cmd/backfill-redis/main.go

Lines changed: 99 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
2121
To run:
2222
go run cmd/backfill-redis/main.go --rekor-address <address> \
23-
--hostname <redis-hostname> --port <redis-port>
24-
--start <first index to backfill> --end <last index to backfill>
23+
--hostname <redis-hostname> --port <redis-port> --concurrency <num-of-workers> \
24+
--start <first index to backfill> --end <last index to backfill> [--dry-run]
2525
*/
2626

2727
package main
@@ -30,13 +30,17 @@ import (
3030
"bytes"
3131
"context"
3232
"encoding/base64"
33+
"errors"
3334
"flag"
3435
"fmt"
3536
"log"
3637
"os"
38+
"os/signal"
39+
"syscall"
3740

3841
"github.com/go-openapi/runtime"
3942
"github.com/redis/go-redis/v9"
43+
"golang.org/x/sync/errgroup"
4044
"sigs.k8s.io/release-utils/version"
4145

4246
"github.com/sigstore/rekor/pkg/client"
@@ -66,6 +70,8 @@ var (
6670
endIndex = flag.Int("end", -1, "Last index to backfill")
6771
rekorAddress = flag.String("rekor-address", "", "Address for Rekor, e.g. https://rekor.sigstore.dev")
6872
versionFlag = flag.Bool("version", false, "Print the current version of Backfill Redis")
73+
concurrency = flag.Int("concurrency", 1, "Number of workers to use for backfill")
74+
dryRun = flag.Bool("dry-run", false, "Dry run - don't actually insert into Redis")
6975
)
7076

7177
func main() {
@@ -106,45 +112,99 @@ func main() {
106112
log.Fatalf("creating rekor client: %v", err)
107113
}
108114

109-
for i := *startIndex; i <= *endIndex; i++ {
110-
params := entries.NewGetLogEntryByIndexParamsWithContext(context.Background())
111-
params.SetLogIndex(int64(i))
112-
resp, err := rekorClient.Entries.GetLogEntryByIndex(params)
113-
if err != nil {
114-
log.Fatalf("retrieving log uuid by index: %v", err)
115+
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
116+
group, ctx := errgroup.WithContext(ctx)
117+
group.SetLimit(*concurrency)
118+
119+
type result struct {
120+
index int
121+
parseErrs []error
122+
insertErrs []error
123+
}
124+
var resultChan = make(chan result)
125+
parseErrs := make([]int, 0)
126+
insertErrs := make([]int, 0)
127+
128+
go func() {
129+
for r := range resultChan {
130+
if len(r.parseErrs) > 0 {
131+
parseErrs = append(parseErrs, r.index)
132+
}
133+
if len(r.insertErrs) > 0 {
134+
insertErrs = append(insertErrs, r.index)
135+
}
115136
}
116-
var insertErrs []error
117-
for uuid, entry := range resp.Payload {
118-
// uuid is the global UUID - tree ID and entry UUID
119-
e, _, _, err := unmarshalEntryImpl(entry.Body.(string))
137+
}()
138+
139+
for i := *startIndex; i <= *endIndex; i++ {
140+
index := i // capture loop variable for closure
141+
group.Go(func() error {
142+
params := entries.NewGetLogEntryByIndexParamsWithContext(ctx)
143+
params.SetLogIndex(int64(index))
144+
resp, err := rekorClient.Entries.GetLogEntryByIndex(params)
120145
if err != nil {
121-
insertErrs = append(insertErrs, fmt.Errorf("error unmarshalling entry for %s: %v", uuid, err))
122-
continue
146+
// in case of sigterm, just return to exit gracefully
147+
if errors.Is(err, context.Canceled) {
148+
return nil
149+
}
150+
log.Fatalf("retrieving log uuid by index: %v", err)
123151
}
124-
keys, err := e.IndexKeys()
125-
if err != nil {
126-
insertErrs = append(insertErrs, fmt.Errorf("error building index keys for %s: %v", uuid, err))
127-
continue
152+
var parseErrs []error
153+
var insertErrs []error
154+
for uuid, entry := range resp.Payload {
155+
// uuid is the global UUID - tree ID and entry UUID
156+
e, _, _, err := unmarshalEntryImpl(entry.Body.(string))
157+
if err != nil {
158+
parseErrs = append(parseErrs, fmt.Errorf("error unmarshalling entry for %s: %v", uuid, err))
159+
continue
160+
}
161+
keys, err := e.IndexKeys()
162+
if err != nil {
163+
parseErrs = append(parseErrs, fmt.Errorf("error building index keys for %s: %v", uuid, err))
164+
continue
165+
}
166+
for _, key := range keys {
167+
// remove the key-value pair from the index in case it already exists
168+
if err := removeFromIndex(ctx, redisClient, key, uuid); err != nil {
169+
insertErrs = append(insertErrs, fmt.Errorf("error removing UUID %s with key %s: %v", uuid, key, err))
170+
}
171+
if err := addToIndex(ctx, redisClient, key, uuid); err != nil {
172+
insertErrs = append(insertErrs, fmt.Errorf("error inserting UUID %s with key %s: %v", uuid, key, err))
173+
}
174+
fmt.Printf("Uploaded Redis entry %s, index %d, key %s\n", uuid, index, key)
175+
}
128176
}
129-
for _, key := range keys {
130-
// remove the key-value pair from the index in case it already exists
131-
if err := removeFromIndex(context.Background(), redisClient, key, uuid); err != nil {
132-
insertErrs = append(insertErrs, fmt.Errorf("error removing UUID %s with key %s: %v", uuid, key, err))
177+
if len(insertErrs) != 0 || len(parseErrs) != 0 {
178+
fmt.Printf("Errors with log index %d:\n", index)
179+
for _, e := range insertErrs {
180+
fmt.Println(e)
133181
}
134-
if err := addToIndex(context.Background(), redisClient, key, uuid); err != nil {
135-
insertErrs = append(insertErrs, fmt.Errorf("error inserting UUID %s with key %s: %v", uuid, key, err))
182+
for _, e := range parseErrs {
183+
fmt.Println(e)
136184
}
137-
fmt.Printf("Uploaded Redis entry %s, index %d, key %s\n", uuid, i, key)
185+
} else {
186+
fmt.Printf("Completed log index %d\n", index)
138187
}
139-
}
140-
if len(insertErrs) != 0 {
141-
fmt.Printf("Errors with log index %d:\n", i)
142-
for _, e := range insertErrs {
143-
fmt.Println(e)
188+
resultChan <- result{
189+
index: index,
190+
parseErrs: parseErrs,
191+
insertErrs: insertErrs,
144192
}
145-
} else {
146-
fmt.Printf("Completed log index %d\n", i)
147-
}
193+
194+
return nil
195+
})
196+
}
197+
err = group.Wait()
198+
if err != nil {
199+
log.Fatalf("error running backfill: %v", err)
200+
}
201+
close(resultChan)
202+
fmt.Println("Backfill complete")
203+
if len(parseErrs) > 0 {
204+
fmt.Printf("Failed to parse %d entries: %v\n", len(parseErrs), parseErrs)
205+
}
206+
if len(insertErrs) > 0 {
207+
fmt.Printf("Failed to insert/remove %d entries: %v\n", len(insertErrs), insertErrs)
148208
}
149209
}
150210

@@ -171,12 +231,18 @@ func unmarshalEntryImpl(e string) (types.EntryImpl, string, string, error) {
171231
// removeFromIndex removes all occurrences of a value from a given key. This guards against
172232
// multiple invocations of backfilling creating duplicates.
173233
func removeFromIndex(ctx context.Context, redisClient *redis.Client, key, value string) error {
234+
if *dryRun {
235+
return nil
236+
}
174237
_, err := redisClient.LRem(ctx, key, 0, value).Result()
175238
return err
176239
}
177240

178241
// addToIndex pushes a value onto a key of type list.
179242
func addToIndex(ctx context.Context, redisClient *redis.Client, key, value string) error {
243+
if *dryRun {
244+
return nil
245+
}
180246
_, err := redisClient.LPush(ctx, key, value).Result()
181247
return err
182248
}

0 commit comments

Comments
 (0)