[GITEA] avoid superfluous synchronized pull_request run when opening a PR (#2236)

* Split TestPullRequest out of AddTestPullRequestTask
* Before scheduling the task, AddTestPullRequestTask stores the max
  index of the repository
* When the task runs, it does not take into account pull requests that
  have an index higher than the recorded max index

When AddTestPullRequestTask is called with isSync == true, it is the
direct consequence of a new commit being pushed. Forgejo knows nothing
of this new commit yet. If a PR is created later and its head
references the new commit, it will have an index that is higher and
must not be taken into account. It would be acting and triggering a
notification for a PR based on an event that happened before it
existed.

Refs: https://codeberg.org/forgejo/forgejo/issues/2009
Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/2236
Reviewed-by: Gusted <gusted@noreply.codeberg.org>
Co-authored-by: Earl Warren <contact@earl-warren.org>
Co-committed-by: Earl Warren <contact@earl-warren.org>
This commit is contained in:
Earl Warren 2024-02-04 22:28:18 +00:00 committed by Earl Warren
parent f36726b922
commit b3be895a30
9 changed files with 333 additions and 91 deletions

View file

@ -9,6 +9,14 @@ import (
"code.gitea.io/gitea/models/db" "code.gitea.io/gitea/models/db"
) )
func GetMaxIssueIndexForRepo(ctx context.Context, repoID int64) (int64, error) {
var max int64
if _, err := db.GetEngine(ctx).Select("MAX(`index`)").Table("issue").Where("repo_id=?", repoID).Get(&max); err != nil {
return 0, err
}
return max, nil
}
// RecalculateIssueIndexForRepo create issue_index for repo if not exist and // RecalculateIssueIndexForRepo create issue_index for repo if not exist and
// update it based on highest index of existing issues assigned to a repo // update it based on highest index of existing issues assigned to a repo
func RecalculateIssueIndexForRepo(ctx context.Context, repoID int64) error { func RecalculateIssueIndexForRepo(ctx context.Context, repoID int64) error {
@ -18,8 +26,8 @@ func RecalculateIssueIndexForRepo(ctx context.Context, repoID int64) error {
} }
defer committer.Close() defer committer.Close()
var max int64 max, err := GetMaxIssueIndexForRepo(ctx, repoID)
if _, err = db.GetEngine(ctx).Select(" MAX(`index`)").Table("issue").Where("repo_id=?", repoID).Get(&max); err != nil { if err != nil {
return err return err
} }

View file

@ -0,0 +1,38 @@
// Copyright 2024 The Forgejo Authors
// SPDX-License-Identifier: MIT
package issues_test
import (
"testing"
"code.gitea.io/gitea/models/db"
issues_model "code.gitea.io/gitea/models/issues"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
func TestGetMaxIssueIndexForRepo(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1})
maxPR, err := issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repo.ID)
assert.NoError(t, err)
issue := testCreateIssue(t, repo.ID, repo.OwnerID, "title1", "content1", false)
assert.Greater(t, issue.Index, maxPR)
maxPR, err = issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repo.ID)
assert.NoError(t, err)
pull := testCreateIssue(t, repo.ID, repo.OwnerID, "title2", "content2", true)
assert.Greater(t, pull.Index, maxPR)
maxPR, err = issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repo.ID)
assert.NoError(t, err)
assert.Equal(t, maxPR, pull.Index)
}

View file

@ -50,6 +50,14 @@ func listPullRequestStatement(ctx context.Context, baseRepoID int64, opts *PullR
return sess, nil return sess, nil
} }
func GetUnmergedPullRequestsByHeadInfoMax(ctx context.Context, repoID, maxIndex int64, branch string) ([]*PullRequest, error) {
prs := make([]*PullRequest, 0, 2)
sess := db.GetEngine(ctx).
Join("INNER", "issue", "issue.id = `pull_request`.issue_id").
Where("`pull_request`.head_repo_id = ? AND `pull_request`.head_branch = ? AND `pull_request`.has_merged = ? AND `issue`.is_closed = ? AND `pull_request`.flow = ? AND `issue`.`index` <= ?", repoID, branch, false, false, PullRequestFlowGithub, maxIndex)
return prs, sess.Find(&prs)
}
// GetUnmergedPullRequestsByHeadInfo returns all pull requests that are open and has not been merged // GetUnmergedPullRequestsByHeadInfo returns all pull requests that are open and has not been merged
func GetUnmergedPullRequestsByHeadInfo(ctx context.Context, repoID int64, branch string) ([]*PullRequest, error) { func GetUnmergedPullRequestsByHeadInfo(ctx context.Context, repoID int64, branch string) ([]*PullRequest, error) {
prs := make([]*PullRequest, 0, 2) prs := make([]*PullRequest, 0, 2)

View file

@ -4,6 +4,7 @@
package issues_test package issues_test
import ( import (
"fmt"
"testing" "testing"
"code.gitea.io/gitea/models/db" "code.gitea.io/gitea/models/db"
@ -158,6 +159,91 @@ func TestGetUnmergedPullRequestsByHeadInfo(t *testing.T) {
} }
} }
func TestGetUnmergedPullRequestsByHeadInfoMax(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
repoID := int64(1)
maxPR := int64(0)
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
assert.NoError(t, err)
assert.Len(t, prs, 0)
maxPR, err = issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repoID)
assert.NoError(t, err)
prs, err = issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
assert.NoError(t, err)
assert.Len(t, prs, 1)
for _, pr := range prs {
assert.Equal(t, int64(1), pr.HeadRepoID)
assert.Equal(t, "branch2", pr.HeadBranch)
}
pr := prs[0]
for _, testCase := range []struct {
table string
field string
id int64
match any
nomatch any
}{
{
table: "issue",
field: "is_closed",
id: pr.IssueID,
match: false,
nomatch: true,
},
{
table: "pull_request",
field: "flow",
id: pr.ID,
match: issues_model.PullRequestFlowGithub,
nomatch: issues_model.PullRequestFlowAGit,
},
{
table: "pull_request",
field: "head_repo_id",
id: pr.ID,
match: pr.HeadRepoID,
nomatch: 0,
},
{
table: "pull_request",
field: "head_branch",
id: pr.ID,
match: pr.HeadBranch,
nomatch: "something else",
},
{
table: "pull_request",
field: "has_merged",
id: pr.ID,
match: false,
nomatch: true,
},
} {
t.Run(testCase.field, func(t *testing.T) {
update := fmt.Sprintf("UPDATE `%s` SET `%s` = ? WHERE `id` = ?", testCase.table, testCase.field)
// expect no match
_, err = db.GetEngine(db.DefaultContext).Exec(update, testCase.nomatch, testCase.id)
assert.NoError(t, err)
prs, err = issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
assert.NoError(t, err)
assert.Len(t, prs, 0)
// expect one match
_, err = db.GetEngine(db.DefaultContext).Exec(update, testCase.match, testCase.id)
assert.NoError(t, err)
prs, err = issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
assert.NoError(t, err)
assert.Len(t, prs, 1)
// identical to the known PR
assert.Equal(t, pr.ID, prs[0].ID)
})
}
}
func TestGetUnmergedPullRequestsByBaseInfo(t *testing.T) { func TestGetUnmergedPullRequestsByBaseInfo(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase()) assert.NoError(t, unittest.PrepareTestDatabase())
prs, err := issues_model.GetUnmergedPullRequestsByBaseInfo(db.DefaultContext, 1, "master") prs, err := issues_model.GetUnmergedPullRequestsByBaseInfo(db.DefaultContext, 1, "master")

View file

@ -181,7 +181,7 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
} }
defer func() { defer func() {
go AddTestPullRequestTask(doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "") AddTestPullRequestTask(ctx, doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
}() }()
pr.MergedCommitID, err = doMergeAndPush(ctx, pr, doer, mergeStyle, expectedHeadCommitID, message) pr.MergedCommitID, err = doMergeAndPush(ctx, pr, doer, mergeStyle, expectedHeadCommitID, message)

View file

@ -287,113 +287,129 @@ func checkForInvalidation(ctx context.Context, requests issues_model.PullRequest
// AddTestPullRequestTask adds new test tasks by given head/base repository and head/base branch, // AddTestPullRequestTask adds new test tasks by given head/base repository and head/base branch,
// and generate new patch for testing as needed. // and generate new patch for testing as needed.
func AddTestPullRequestTask(doer *user_model.User, repoID int64, branch string, isSync bool, oldCommitID, newCommitID string) { func AddTestPullRequestTask(ctx context.Context, doer *user_model.User, repoID int64, branch string, isSync bool, oldCommitID, newCommitID string) {
log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: finding pull requests", repoID, branch) // When TestPullRequest runs it must ignore any PR with an index > maxPR because they
graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) { // would have been created after the goroutine started. They are in the future.
// This guards the following race:
// * commit A is pushed
// * goroutine starts but does not run TestPullRequest yet
// * a pull request with commit A as the head is created
// * goroutine continues and runs TestPullRequest
maxPR, err := issues_model.GetMaxIssueIndexForRepo(ctx, repoID)
if err != nil {
log.Error("AddTestPullRequestTask GetMaxIssueIndexForRepo(%d): %v", repoID, err)
return
}
log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: only pull requests with index <= %d will be considered", repoID, branch, maxPR)
go graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) {
// There is no sensible way to shut this down ":-(" // There is no sensible way to shut this down ":-("
// If you don't let it run all the way then you will lose data // If you don't let it run all the way then you will lose data
// TODO: graceful: AddTestPullRequestTask needs to become a queue! // TODO: graceful: TestPullRequest needs to become a queue!
// GetUnmergedPullRequestsByHeadInfo() only return open and unmerged PR. TestPullRequest(ctx, doer, repoID, maxPR, branch, isSync, oldCommitID, newCommitID)
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfo(ctx, repoID, branch) })
if err != nil { }
log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err)
return
}
for _, pr := range prs { func TestPullRequest(ctx context.Context, doer *user_model.User, repoID, maxPR int64, branch string, isSync bool, oldCommitID, newCommitID string) {
log.Trace("Updating PR[%d]: composing new test task", pr.ID) // GetUnmergedPullRequestsByHeadInfo() only return open and unmerged PR.
if pr.Flow == issues_model.PullRequestFlowGithub { prs, err := issues_model.GetUnmergedPullRequestsByHeadInfoMax(ctx, repoID, maxPR, branch)
if err := PushToBaseRepo(ctx, pr); err != nil { if err != nil {
log.Error("PushToBaseRepo: %v", err) log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err)
continue return
} }
} else {
for _, pr := range prs {
log.Trace("Updating PR[id=%d,index=%d]: composing new test task", pr.ID, pr.Index)
if pr.Flow == issues_model.PullRequestFlowGithub {
if err := PushToBaseRepo(ctx, pr); err != nil {
log.Error("PushToBaseRepo: %v", err)
continue continue
} }
} else {
AddToTaskQueue(ctx, pr) continue
comment, err := CreatePushPullComment(ctx, doer, pr, oldCommitID, newCommitID)
if err == nil && comment != nil {
notify_service.PullRequestPushCommits(ctx, doer, pr, comment)
}
} }
if isSync { AddToTaskQueue(ctx, pr)
requests := issues_model.PullRequestList(prs) comment, err := CreatePushPullComment(ctx, doer, pr, oldCommitID, newCommitID)
if err = requests.LoadAttributes(ctx); err != nil { if err == nil && comment != nil {
log.Error("PullRequestList.LoadAttributes: %v", err) notify_service.PullRequestPushCommits(ctx, doer, pr, comment)
} }
if invalidationErr := checkForInvalidation(ctx, requests, repoID, doer, branch); invalidationErr != nil { }
log.Error("checkForInvalidation: %v", invalidationErr)
}
if err == nil {
for _, pr := range prs {
objectFormat, _ := git.GetObjectFormatOfRepo(ctx, pr.BaseRepo.RepoPath())
if newCommitID != "" && newCommitID != objectFormat.EmptyObjectID().String() {
changed, err := checkIfPRContentChanged(ctx, pr, oldCommitID, newCommitID)
if err != nil {
log.Error("checkIfPRContentChanged: %v", err)
}
if changed {
// Mark old reviews as stale if diff to mergebase has changed
if err := issues_model.MarkReviewsAsStale(ctx, pr.IssueID); err != nil {
log.Error("MarkReviewsAsStale: %v", err)
}
// dismiss all approval reviews if protected branch rule item enabled. if isSync {
pb, err := git_model.GetFirstMatchProtectedBranchRule(ctx, pr.BaseRepoID, pr.BaseBranch) requests := issues_model.PullRequestList(prs)
if err != nil { if err = requests.LoadAttributes(ctx); err != nil {
log.Error("GetFirstMatchProtectedBranchRule: %v", err) log.Error("PullRequestList.LoadAttributes: %v", err)
} }
if pb != nil && pb.DismissStaleApprovals { if invalidationErr := checkForInvalidation(ctx, requests, repoID, doer, branch); invalidationErr != nil {
if err := DismissApprovalReviews(ctx, doer, pr); err != nil { log.Error("checkForInvalidation: %v", invalidationErr)
log.Error("DismissApprovalReviews: %v", err) }
} if err == nil {
} for _, pr := range prs {
objectFormat, _ := git.GetObjectFormatOfRepo(ctx, pr.BaseRepo.RepoPath())
if newCommitID != "" && newCommitID != objectFormat.EmptyObjectID().String() {
changed, err := checkIfPRContentChanged(ctx, pr, oldCommitID, newCommitID)
if err != nil {
log.Error("checkIfPRContentChanged: %v", err)
}
if changed {
// Mark old reviews as stale if diff to mergebase has changed
if err := issues_model.MarkReviewsAsStale(ctx, pr.IssueID); err != nil {
log.Error("MarkReviewsAsStale: %v", err)
} }
if err := issues_model.MarkReviewsAsNotStale(ctx, pr.IssueID, newCommitID); err != nil {
log.Error("MarkReviewsAsNotStale: %v", err) // dismiss all approval reviews if protected branch rule item enabled.
} pb, err := git_model.GetFirstMatchProtectedBranchRule(ctx, pr.BaseRepoID, pr.BaseBranch)
divergence, err := GetDiverging(ctx, pr)
if err != nil { if err != nil {
log.Error("GetDiverging: %v", err) log.Error("GetFirstMatchProtectedBranchRule: %v", err)
} else { }
err = pr.UpdateCommitDivergence(ctx, divergence.Ahead, divergence.Behind) if pb != nil && pb.DismissStaleApprovals {
if err != nil { if err := DismissApprovalReviews(ctx, doer, pr); err != nil {
log.Error("UpdateCommitDivergence: %v", err) log.Error("DismissApprovalReviews: %v", err)
} }
} }
} }
if err := issues_model.MarkReviewsAsNotStale(ctx, pr.IssueID, newCommitID); err != nil {
notify_service.PullRequestSynchronized(ctx, doer, pr) log.Error("MarkReviewsAsNotStale: %v", err)
}
divergence, err := GetDiverging(ctx, pr)
if err != nil {
log.Error("GetDiverging: %v", err)
} else {
err = pr.UpdateCommitDivergence(ctx, divergence.Ahead, divergence.Behind)
if err != nil {
log.Error("UpdateCommitDivergence: %v", err)
}
}
} }
notify_service.PullRequestSynchronized(ctx, doer, pr)
} }
} }
}
log.Trace("AddTestPullRequestTask [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch) log.Trace("TestPullRequest [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch)
prs, err = issues_model.GetUnmergedPullRequestsByBaseInfo(ctx, repoID, branch) prs, err = issues_model.GetUnmergedPullRequestsByBaseInfo(ctx, repoID, branch)
if err != nil {
log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err)
return
}
for _, pr := range prs {
divergence, err := GetDiverging(ctx, pr)
if err != nil { if err != nil {
log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err) if git_model.IsErrBranchNotExist(err) && !git.IsBranchExist(ctx, pr.HeadRepo.RepoPath(), pr.HeadBranch) {
return log.Warn("Cannot test PR %s/%d: head_branch %s no longer exists", pr.BaseRepo.Name, pr.IssueID, pr.HeadBranch)
}
for _, pr := range prs {
divergence, err := GetDiverging(ctx, pr)
if err != nil {
if git_model.IsErrBranchNotExist(err) && !git.IsBranchExist(ctx, pr.HeadRepo.RepoPath(), pr.HeadBranch) {
log.Warn("Cannot test PR %s/%d: head_branch %s no longer exists", pr.BaseRepo.Name, pr.IssueID, pr.HeadBranch)
} else {
log.Error("GetDiverging: %v", err)
}
} else { } else {
err = pr.UpdateCommitDivergence(ctx, divergence.Ahead, divergence.Behind) log.Error("GetDiverging: %v", err)
if err != nil { }
log.Error("UpdateCommitDivergence: %v", err) } else {
} err = pr.UpdateCommitDivergence(ctx, divergence.Ahead, divergence.Behind)
if err != nil {
log.Error("UpdateCommitDivergence: %v", err)
} }
AddToTaskQueue(ctx, pr)
} }
}) AddToTaskQueue(ctx, pr)
}
} }
// checkIfPRContentChanged checks if diff to target branch has changed by push // checkIfPRContentChanged checks if diff to target branch has changed by push

View file

@ -36,7 +36,7 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
if rebase { if rebase {
defer func() { defer func() {
go AddTestPullRequestTask(doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "") AddTestPullRequestTask(ctx, doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
}() }()
return updateHeadByRebaseOnToBase(ctx, pr, doer, message) return updateHeadByRebaseOnToBase(ctx, pr, doer, message)
@ -75,7 +75,7 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
_, err = doMergeAndPush(ctx, reversePR, doer, repo_model.MergeStyleMerge, "", message) _, err = doMergeAndPush(ctx, reversePR, doer, repo_model.MergeStyleMerge, "", message)
defer func() { defer func() {
go AddTestPullRequestTask(doer, reversePR.HeadRepo.ID, reversePR.HeadBranch, false, "", "") AddTestPullRequestTask(ctx, doer, reversePR.HeadRepo.ID, reversePR.HeadBranch, false, "", "")
}() }()
return err return err

View file

@ -171,7 +171,7 @@ func pushUpdates(optsList []*repo_module.PushUpdateOptions) error {
branch := opts.RefFullName.BranchName() branch := opts.RefFullName.BranchName()
if !opts.IsDelRef() { if !opts.IsDelRef() {
log.Trace("TriggerTask '%s/%s' by %s", repo.Name, branch, pusher.Name) log.Trace("TriggerTask '%s/%s' by %s", repo.Name, branch, pusher.Name)
go pull_service.AddTestPullRequestTask(pusher, repo.ID, branch, true, opts.OldCommitID, opts.NewCommitID) pull_service.AddTestPullRequestTask(ctx, pusher, repo.ID, branch, true, opts.OldCommitID, opts.NewCommitID)
newCommit, err := gitRepo.GetCommit(opts.NewCommitID) newCommit, err := gitRepo.GetCommit(opts.NewCommitID)
if err != nil { if err != nil {

View file

@ -0,0 +1,86 @@
// Copyright 2024 The Forgejo Authors
// SPDX-License-Identifier: MIT
package integration
import (
"context"
"testing"
"time"
issues_model "code.gitea.io/gitea/models/issues"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unittest"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/test"
pull_service "code.gitea.io/gitea/services/pull"
repo_service "code.gitea.io/gitea/services/repository"
"code.gitea.io/gitea/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPullRequestSynchronized(t *testing.T) {
defer tests.PrepareTestEnv(t)()
// unmerged pull request of user2/repo1 from branch2 to master
pull := unittest.AssertExistsAndLoadBean(t, &issues_model.PullRequest{ID: 2})
// tip of tests/gitea-repositories-meta/user2/repo1 branch2
pull.HeadCommitID = "985f0301dba5e7b34be866819cd15ad3d8f508ee"
require.Equal(t, pull.HeadRepoID, pull.BaseRepoID)
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: pull.HeadRepoID})
owner := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: repo.OwnerID})
t.Run("AddTestPullRequestTask", func(t *testing.T) {
logChecker, cleanup := test.NewLogChecker(log.DEFAULT, log.TRACE)
logChecker.Filter("Updating PR").StopMark("TestPullRequest ")
defer cleanup()
opt := &repo_module.PushUpdateOptions{
PusherID: owner.ID,
PusherName: owner.Name,
RepoUserName: owner.Name,
RepoName: repo.Name,
RefFullName: git.RefName("refs/heads/branch2"),
OldCommitID: pull.HeadCommitID,
NewCommitID: pull.HeadCommitID,
}
require.NoError(t, repo_service.PushUpdate(opt))
logFiltered, logStopped := logChecker.Check(5 * time.Second)
assert.True(t, logStopped)
assert.True(t, logFiltered[0])
})
for _, testCase := range []struct {
name string
maxPR int64
expected bool
}{
{
name: "TestPullRequest process PR",
maxPR: pull.Index,
expected: true,
},
{
name: "TestPullRequest skip PR",
maxPR: pull.Index - 1,
expected: false,
},
} {
t.Run(testCase.name, func(t *testing.T) {
logChecker, cleanup := test.NewLogChecker(log.DEFAULT, log.TRACE)
logChecker.Filter("Updating PR").StopMark("TestPullRequest ")
defer cleanup()
pull_service.TestPullRequest(context.Background(), owner, repo.ID, testCase.maxPR, "branch2", true, pull.HeadCommitID, pull.HeadCommitID)
logFiltered, logStopped := logChecker.Check(5 * time.Second)
assert.True(t, logStopped)
assert.Equal(t, testCase.expected, logFiltered[0])
})
}
}