Skip to content

Commit 356de34

Browse files
committed
Prevent deadlock in indexer initialisation during graceful restart
1 parent bcbc9f3 commit 356de34

File tree

4 files changed

+90
-78
lines changed

4 files changed

+90
-78
lines changed

models/repo_indexer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,10 @@ func InitRepoIndexer() {
7070
return
7171
}
7272
repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength)
73-
indexer.InitRepoIndexer(populateRepoIndexerAsynchronously)
74-
go processRepoIndexerOperationQueue()
73+
go func() {
74+
indexer.InitRepoIndexer(populateRepoIndexerAsynchronously)
75+
go processRepoIndexerOperationQueue()
76+
}()
7577
}
7678

7779
// populateRepoIndexerAsynchronously asynchronously populates the repo indexer

modules/indexer/issues/indexer.go

Lines changed: 83 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package issues
66

77
import (
8-
"fmt"
8+
"sync"
99

1010
"code.gitea.io/gitea/models"
1111
"code.gitea.io/gitea/modules/log"
@@ -46,77 +46,98 @@ type Indexer interface {
4646
}
4747

4848
var (
49+
issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength)
4950
// issueIndexerQueue queue of issue ids to be updated
5051
issueIndexerQueue Queue
5152
issueIndexer Indexer
53+
wg sync.WaitGroup
5254
)
5355

5456
// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
5557
// all issue index done.
56-
func InitIssueIndexer(syncReindex bool) error {
57-
var populate bool
58-
var dummyQueue bool
59-
switch setting.Indexer.IssueType {
60-
case "bleve":
61-
issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath)
62-
exist, err := issueIndexer.Init()
63-
if err != nil {
64-
return err
58+
func InitIssueIndexer(syncReindex bool) {
59+
wg.Add(1)
60+
go func() {
61+
var populate bool
62+
var dummyQueue bool
63+
switch setting.Indexer.IssueType {
64+
case "bleve":
65+
issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath)
66+
exist, err := issueIndexer.Init()
67+
if err != nil {
68+
log.Fatal("Unable to initialise issueIndexer: %v", err)
69+
}
70+
populate = !exist
71+
case "db":
72+
issueIndexer = &DBIndexer{}
73+
dummyQueue = true
74+
default:
75+
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
6576
}
66-
populate = !exist
67-
case "db":
68-
issueIndexer = &DBIndexer{}
69-
dummyQueue = true
70-
default:
71-
return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType)
72-
}
7377

74-
if dummyQueue {
75-
issueIndexerQueue = &DummyQueue{}
76-
return nil
77-
}
78+
if dummyQueue {
79+
issueIndexerQueue = &DummyQueue{}
80+
} else {
81+
var err error
82+
switch setting.Indexer.IssueQueueType {
83+
case setting.LevelQueueType:
84+
issueIndexerQueue, err = NewLevelQueue(
85+
issueIndexer,
86+
setting.Indexer.IssueQueueDir,
87+
setting.Indexer.IssueQueueBatchNumber)
88+
if err != nil {
89+
log.Fatal(
90+
"Unable create level queue for issue queue dir: %s batch number: %d : %v",
91+
setting.Indexer.IssueQueueDir,
92+
setting.Indexer.IssueQueueBatchNumber,
93+
err)
94+
}
95+
case setting.ChannelQueueType:
96+
issueIndexerQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueQueueBatchNumber)
97+
case setting.RedisQueueType:
98+
addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr)
99+
if err != nil {
100+
log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v",
101+
setting.Indexer.IssueQueueConnStr,
102+
err)
103+
}
104+
issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, issueIndexer, setting.Indexer.IssueQueueBatchNumber)
105+
if err != nil {
106+
log.Fatal("Unable to create RedisQueue: %s : %v",
107+
setting.Indexer.IssueQueueConnStr,
108+
err)
109+
}
110+
default:
111+
log.Fatal("Unsupported indexer queue type: %v",
112+
setting.Indexer.IssueQueueType)
113+
}
78114

79-
var err error
80-
switch setting.Indexer.IssueQueueType {
81-
case setting.LevelQueueType:
82-
issueIndexerQueue, err = NewLevelQueue(
83-
issueIndexer,
84-
setting.Indexer.IssueQueueDir,
85-
setting.Indexer.IssueQueueBatchNumber)
86-
if err != nil {
87-
return err
88-
}
89-
case setting.ChannelQueueType:
90-
issueIndexerQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueQueueBatchNumber)
91-
case setting.RedisQueueType:
92-
addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr)
93-
if err != nil {
94-
return err
95-
}
96-
issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, issueIndexer, setting.Indexer.IssueQueueBatchNumber)
97-
if err != nil {
98-
return err
115+
go func() {
116+
err = issueIndexerQueue.Run()
117+
if err != nil {
118+
log.Error("issueIndexerQueue.Run: %v", err)
119+
}
120+
}()
99121
}
100-
default:
101-
return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueQueueType)
102-
}
103122

104-
go func() {
105-
err = issueIndexerQueue.Run()
106-
if err != nil {
107-
log.Error("issueIndexerQueue.Run: %v", err)
108-
}
109-
}()
123+
go func() {
124+
for data := range issueIndexerChannel {
125+
_ = issueIndexerQueue.Push(data)
126+
}
127+
}()
110128

111-
if populate {
112-
if syncReindex {
113-
populateIssueIndexer()
114-
} else {
115-
go populateIssueIndexer()
129+
if populate {
130+
if syncReindex {
131+
populateIssueIndexer()
132+
} else {
133+
go populateIssueIndexer()
134+
}
116135
}
136+
wg.Done()
137+
}()
138+
if syncReindex {
139+
wg.Wait()
117140
}
118-
119-
return nil
120141
}
121142

122143
// populateIssueIndexer populate the issue indexer with issue data
@@ -166,13 +187,13 @@ func UpdateIssueIndexer(issue *models.Issue) {
166187
comments = append(comments, comment.Content)
167188
}
168189
}
169-
_ = issueIndexerQueue.Push(&IndexerData{
190+
issueIndexerChannel <- &IndexerData{
170191
ID: issue.ID,
171192
RepoID: issue.RepoID,
172193
Title: issue.Title,
173194
Content: issue.Content,
174195
Comments: comments,
175-
})
196+
}
176197
}
177198

178199
// DeleteRepoIssueIndexer deletes repo's all issues indexes
@@ -188,14 +209,15 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
188209
return
189210
}
190211

191-
_ = issueIndexerQueue.Push(&IndexerData{
212+
issueIndexerChannel <- &IndexerData{
192213
IDs: ids,
193214
IsDelete: true,
194-
})
215+
}
195216
}
196217

197218
// SearchIssuesByKeyword search issue ids by keywords and repo id
198219
func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) {
220+
wg.Wait()
199221
var issueIDs []int64
200222
res, err := issueIndexer.Search(keyword, repoID, 1000, 0)
201223
if err != nil {

modules/indexer/issues/indexer_test.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package issues
66

77
import (
8-
"fmt"
98
"os"
109
"path/filepath"
1110
"testing"
@@ -17,11 +16,6 @@ import (
1716
"github.com/stretchr/testify/assert"
1817
)
1918

20-
func fatalTestError(fmtStr string, args ...interface{}) {
21-
fmt.Fprintf(os.Stderr, fmtStr, args...)
22-
os.Exit(1)
23-
}
24-
2519
func TestMain(m *testing.M) {
2620
models.MainTest(m, filepath.Join("..", "..", ".."))
2721
}
@@ -32,9 +26,7 @@ func TestBleveSearchIssues(t *testing.T) {
3226
os.RemoveAll(setting.Indexer.IssueQueueDir)
3327
os.RemoveAll(setting.Indexer.IssuePath)
3428
setting.Indexer.IssueType = "bleve"
35-
if err := InitIssueIndexer(true); err != nil {
36-
fatalTestError("Error InitIssueIndexer: %v\n", err)
37-
}
29+
InitIssueIndexer(true)
3830

3931
time.Sleep(5 * time.Second)
4032

@@ -59,9 +51,7 @@ func TestDBSearchIssues(t *testing.T) {
5951
assert.NoError(t, models.PrepareTestDatabase())
6052

6153
setting.Indexer.IssueType = "db"
62-
if err := InitIssueIndexer(true); err != nil {
63-
fatalTestError("Error InitIssueIndexer: %v\n", err)
64-
}
54+
InitIssueIndexer(true)
6555

6656
ids, err := SearchIssuesByKeyword(1, "issue2")
6757
assert.NoError(t, err)

routers/init.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,7 @@ func GlobalInit() {
9494

9595
// Booting long running goroutines.
9696
cron.NewContext()
97-
if err := issue_indexer.InitIssueIndexer(false); err != nil {
98-
log.Fatal("Failed to initialize issue indexer: %v", err)
99-
}
97+
issue_indexer.InitIssueIndexer(false)
10098
models.InitRepoIndexer()
10199
models.InitSyncMirrors()
102100
models.InitDeliverHooks()

0 commit comments

Comments
 (0)