diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 020659c82b..ef06d8862a 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -204,12 +204,13 @@ func getIssueIndexerQueueHandler(ctx context.Context) func(items ...*IndexerMeta func populateIssueIndexer(ctx context.Context) { ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: PopulateIssueIndexer", process.SystemProcessType, true) defer finished() - if err := PopulateIssueIndexer(ctx, true); err != nil { + ctx = contextWithKeepRetry(ctx) // keep retrying since it's a background task + if err := PopulateIssueIndexer(ctx); err != nil { log.Error("Issue indexer population failed: %v", err) } } -func PopulateIssueIndexer(ctx context.Context, keepRetrying bool) error { +func PopulateIssueIndexer(ctx context.Context) error { for page := 1; ; page++ { select { case <-ctx.Done(): @@ -232,20 +233,8 @@ func PopulateIssueIndexer(ctx context.Context, keepRetrying bool) error { } for _, repo := range repos { - for { - select { - case <-ctx.Done(): - return fmt.Errorf("shutdown before completion: %w", ctx.Err()) - default: - } - if err := updateRepoIndexer(ctx, repo.ID); err != nil { - if keepRetrying && ctx.Err() == nil { - log.Warn("Retry to populate issue indexer for repo %d: %v", repo.ID, err) - continue - } - return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err) - } - break + if err := updateRepoIndexer(ctx, repo.ID); err != nil { + return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err) } } } @@ -259,8 +248,8 @@ func UpdateRepoIndexer(ctx context.Context, repoID int64) { } // UpdateIssueIndexer add/update an issue to the issue indexer -func UpdateIssueIndexer(issueID int64) { - if err := updateIssueIndexer(issueID); err != nil { +func UpdateIssueIndexer(ctx context.Context, issueID int64) { + if err := updateIssueIndexer(ctx, issueID); err != nil { log.Error("Unable to push issue %d to issue indexer: %v", issueID, err) } } diff --git a/modules/indexer/issues/util.go b/modules/indexer/issues/util.go index ca4ff6d42f..510b4060b2 100644 --- a/modules/indexer/issues/util.go +++ b/modules/indexer/issues/util.go @@ -127,15 +127,15 @@ func updateRepoIndexer(ctx context.Context, repoID int64) error { return fmt.Errorf("issue_model.GetIssueIDsByRepoID: %w", err) } for _, id := range ids { - if err := updateIssueIndexer(id); err != nil { + if err := updateIssueIndexer(ctx, id); err != nil { return err } } return nil } -func updateIssueIndexer(issueID int64) error { - return pushIssueIndexerQueue(&IndexerMetadata{ID: issueID}) +func updateIssueIndexer(ctx context.Context, issueID int64) error { + return pushIssueIndexerQueue(ctx, &IndexerMetadata{ID: issueID}) } func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error { @@ -148,13 +148,21 @@ func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error { if len(ids) == 0 { return nil } - return pushIssueIndexerQueue(&IndexerMetadata{ + return pushIssueIndexerQueue(ctx, &IndexerMetadata{ IDs: ids, IsDelete: true, }) } -func pushIssueIndexerQueue(data *IndexerMetadata) error { +type keepRetryKey struct{} + +// contextWithKeepRetry returns a context with a key indicating that the indexer should keep retrying. +// Please note that it's for background tasks only, and it should not be used for user requests, or it may cause blocking. +func contextWithKeepRetry(ctx context.Context) context.Context { + return context.WithValue(ctx, keepRetryKey{}, true) +} + +func pushIssueIndexerQueue(ctx context.Context, data *IndexerMetadata) error { if issueIndexerQueue == nil { // Some unit tests will trigger indexing, but the queue is not initialized. // It's OK to ignore it, but log a warning message in case it's not a unit test. @@ -162,12 +170,26 @@ func pushIssueIndexerQueue(data *IndexerMetadata) error { return nil } - err := issueIndexerQueue.Push(data) - if errors.Is(err, queue.ErrAlreadyInQueue) { - return nil + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + err := issueIndexerQueue.Push(data) + if errors.Is(err, queue.ErrAlreadyInQueue) { + return nil + } + if errors.Is(err, context.DeadlineExceeded) { // the queue is full + log.Warn("It seems that issue indexer is slow and the queue is full. Please check the issue indexer or increase the queue size.") + if ctx.Value(keepRetryKey{}) == nil { + return err + } + // It will be better to increase the queue size instead of retrying, but users may ignore the previous warning message. + // However, even it retries, it may still cause index loss when there's a deadline in the context. + log.Debug("Retry to push %+v to issue indexer queue", data) + continue + } + return err } - if errors.Is(err, context.DeadlineExceeded) { - log.Warn("It seems that issue indexer is slow and the queue is full. Please check the issue indexer or increase the queue size.") - } - return err } diff --git a/services/cron/tasks_extended.go b/services/cron/tasks_extended.go index b9fd1dfcff..1dd5d70a38 100644 --- a/services/cron/tasks_extended.go +++ b/services/cron/tasks_extended.go @@ -219,7 +219,7 @@ func registerRebuildIssueIndexer() { RunAtStart: false, Schedule: "@annually", }, func(ctx context.Context, _ *user_model.User, config Config) error { - return issue_indexer.PopulateIssueIndexer(ctx, false) + return issue_indexer.PopulateIssueIndexer(ctx) }) } diff --git a/services/indexer/notify.go b/services/indexer/notify.go index a07bf38b06..e0b87faedb 100644 --- a/services/indexer/notify.go +++ b/services/indexer/notify.go @@ -36,11 +36,11 @@ func (r *indexerNotifier) AdoptRepository(ctx context.Context, doer, u *user_mod func (r *indexerNotifier) CreateIssueComment(ctx context.Context, doer *user_model.User, repo *repo_model.Repository, issue *issues_model.Issue, comment *issues_model.Comment, mentions []*user_model.User, ) { - issue_indexer.UpdateIssueIndexer(issue.ID) + issue_indexer.UpdateIssueIndexer(ctx, issue.ID) } func (r *indexerNotifier) NewIssue(ctx context.Context, issue *issues_model.Issue, mentions []*user_model.User) { - issue_indexer.UpdateIssueIndexer(issue.ID) + issue_indexer.UpdateIssueIndexer(ctx, issue.ID) } func (r *indexerNotifier) NewPullRequest(ctx context.Context, pr *issues_model.PullRequest, mentions []*user_model.User) { @@ -48,7 +48,7 @@ func (r *indexerNotifier) NewPullRequest(ctx context.Context, pr *issues_model.P log.Error("LoadIssue: %v", err) return } - issue_indexer.UpdateIssueIndexer(pr.Issue.ID) + issue_indexer.UpdateIssueIndexer(ctx, pr.Issue.ID) } func (r *indexerNotifier) UpdateComment(ctx context.Context, doer *user_model.User, c *issues_model.Comment, oldContent string) { @@ -56,7 +56,7 @@ func (r *indexerNotifier) UpdateComment(ctx context.Context, doer *user_model.Us log.Error("LoadIssue: %v", err) return } - issue_indexer.UpdateIssueIndexer(c.Issue.ID) + issue_indexer.UpdateIssueIndexer(ctx, c.Issue.ID) } func (r *indexerNotifier) DeleteComment(ctx context.Context, doer *user_model.User, comment *issues_model.Comment) { @@ -64,7 +64,7 @@ func (r *indexerNotifier) DeleteComment(ctx context.Context, doer *user_model.Us log.Error("LoadIssue: %v", err) return } - issue_indexer.UpdateIssueIndexer(comment.Issue.ID) + issue_indexer.UpdateIssueIndexer(ctx, comment.Issue.ID) } func (r *indexerNotifier) DeleteRepository(ctx context.Context, doer *user_model.User, repo *repo_model.Repository) { @@ -120,13 +120,13 @@ func (r *indexerNotifier) ChangeDefaultBranch(ctx context.Context, repo *repo_mo } func (r *indexerNotifier) IssueChangeContent(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldContent string) { - issue_indexer.UpdateIssueIndexer(issue.ID) + issue_indexer.UpdateIssueIndexer(ctx, issue.ID) } func (r *indexerNotifier) IssueChangeTitle(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldTitle string) { - issue_indexer.UpdateIssueIndexer(issue.ID) + issue_indexer.UpdateIssueIndexer(ctx, issue.ID) } func (r *indexerNotifier) IssueChangeRef(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldRef string) { - issue_indexer.UpdateIssueIndexer(issue.ID) + issue_indexer.UpdateIssueIndexer(ctx, issue.ID) } diff --git a/tests/integration/issue_test.go b/tests/integration/issue_test.go index 853e565b0f..ac06b487db 100644 --- a/tests/integration/issue_test.go +++ b/tests/integration/issue_test.go @@ -4,6 +4,7 @@ package integration import ( + "context" "fmt" "net/http" "net/url" @@ -99,7 +100,7 @@ func TestViewIssuesKeyword(t *testing.T) { RepoID: repo.ID, Index: 1, }) - issues.UpdateIssueIndexer(issue.ID) + issues.UpdateIssueIndexer(context.Background(), issue.ID) time.Sleep(time.Second * 1) const keyword = "first" req := NewRequestf(t, "GET", "%s/issues?q=%s", repo.Link(), keyword)