Fix auto concurrency cancellation skips commit status updates (#33764)

* add missing commit status
* conflicts with concurrency support

Closes #33763

Co-authored-by: Giteabot <teabot@gitea.io>
pull/33845/head
ChristopherHX 3 days ago committed by GitHub
parent e47bba046c
commit a92d5f65ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -194,7 +194,7 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event. // CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore. // It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error { func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) ([]*ActionRunJob, error) {
// Find all runs in the specified repository, reference, and workflow with non-final status // Find all runs in the specified repository, reference, and workflow with non-final status
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{ runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
RepoID: repoID, RepoID: repoID,
@ -204,14 +204,16 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked}, Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
}) })
if err != nil { if err != nil {
return err return nil, err
} }
// If there are no runs found, there's no need to proceed with cancellation, so return nil. // If there are no runs found, there's no need to proceed with cancellation, so return nil.
if total == 0 { if total == 0 {
return nil return nil, nil
} }
cancelledJobs := make([]*ActionRunJob, 0, total)
// Iterate over each found run and cancel its associated jobs. // Iterate over each found run and cancel its associated jobs.
for _, run := range runs { for _, run := range runs {
// Find all jobs associated with the current run. // Find all jobs associated with the current run.
@ -219,7 +221,7 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
RunID: run.ID, RunID: run.ID,
}) })
if err != nil { if err != nil {
return err return cancelledJobs, err
} }
// Iterate over each job and attempt to cancel it. // Iterate over each job and attempt to cancel it.
@ -238,27 +240,29 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
// Update the job's status and stopped time in the database. // Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil { if err != nil {
return err return cancelledJobs, err
} }
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 { if n == 0 {
return fmt.Errorf("job has changed, try again") return cancelledJobs, fmt.Errorf("job has changed, try again")
} }
cancelledJobs = append(cancelledJobs, job)
// Continue with the next job. // Continue with the next job.
continue continue
} }
// If the job has an associated task, try to stop the task, effectively cancelling the job. // If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return err return cancelledJobs, err
} }
cancelledJobs = append(cancelledJobs, job)
} }
} }
// Return nil to indicate successful cancellation of all running and waiting jobs. // Return nil to indicate successful cancellation of all running and waiting jobs.
return nil return cancelledJobs, nil
} }
// InsertRun inserts a run // InsertRun inserts a run

@ -117,21 +117,22 @@ func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error {
return committer.Commit() return committer.Commit()
} }
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error { func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) ([]*ActionRunJob, error) {
// If actions disabled when there is schedule task, this will remove the outdated schedule tasks // If actions disabled when there is schedule task, this will remove the outdated schedule tasks
// There is no other place we can do this because the app.ini will be changed manually // There is no other place we can do this because the app.ini will be changed manually
if err := DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil { if err := DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
return fmt.Errorf("DeleteCronTaskByRepo: %v", err) return nil, fmt.Errorf("DeleteCronTaskByRepo: %v", err)
} }
// cancel running cron jobs of this repository and delete old schedules // cancel running cron jobs of this repository and delete old schedules
if err := CancelPreviousJobs( jobs, err := CancelPreviousJobs(
ctx, ctx,
repo.ID, repo.ID,
repo.DefaultBranch, repo.DefaultBranch,
"", "",
webhook_module.HookEventSchedule, webhook_module.HookEventSchedule,
); err != nil { )
return fmt.Errorf("CancelPreviousJobs: %v", err) if err != nil {
return jobs, fmt.Errorf("CancelPreviousJobs: %v", err)
} }
return nil return jobs, nil
} }

@ -12,7 +12,6 @@ import (
"strings" "strings"
"time" "time"
actions_model "code.gitea.io/gitea/models/actions"
activities_model "code.gitea.io/gitea/models/activities" activities_model "code.gitea.io/gitea/models/activities"
"code.gitea.io/gitea/models/db" "code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/organization" "code.gitea.io/gitea/models/organization"
@ -1049,7 +1048,7 @@ func updateRepoArchivedState(ctx *context.APIContext, opts api.EditRepoOption) e
ctx.APIErrorInternal(err) ctx.APIErrorInternal(err)
return err return err
} }
if err := actions_model.CleanRepoScheduleTasks(ctx, repo); err != nil { if err := actions_service.CleanRepoScheduleTasks(ctx, repo); err != nil {
log.Error("CleanRepoScheduleTasks for archived repo %s/%s: %v", ctx.Repo.Owner.Name, repo.Name, err) log.Error("CleanRepoScheduleTasks for archived repo %s/%s: %v", ctx.Repo.Owner.Name, repo.Name, err)
} }
log.Trace("Repository was archived: %s/%s", ctx.Repo.Owner.Name, repo.Name) log.Trace("Repository was archived: %s/%s", ctx.Repo.Owner.Name, repo.Name)

@ -11,7 +11,6 @@ import (
"strings" "strings"
"time" "time"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db" "code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/organization" "code.gitea.io/gitea/models/organization"
"code.gitea.io/gitea/models/perm" "code.gitea.io/gitea/models/perm"
@ -906,7 +905,7 @@ func SettingsPost(ctx *context.Context) {
return return
} }
if err := actions_model.CleanRepoScheduleTasks(ctx, repo); err != nil { if err := actions_service.CleanRepoScheduleTasks(ctx, repo); err != nil {
log.Error("CleanRepoScheduleTasks for archived repo %s/%s: %v", ctx.Repo.Owner.Name, repo.Name, err) log.Error("CleanRepoScheduleTasks for archived repo %s/%s: %v", ctx.Repo.Owner.Name, repo.Name, err)
} }

@ -10,10 +10,12 @@ import (
actions_model "code.gitea.io/gitea/models/actions" actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db" "code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/actions" "code.gitea.io/gitea/modules/actions"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/timeutil"
webhook_module "code.gitea.io/gitea/modules/webhook"
) )
// StopZombieTasks stops the task which have running status, but haven't been updated for a long time // StopZombieTasks stops the task which have running status, but haven't been updated for a long time
@ -32,6 +34,24 @@ func StopEndlessTasks(ctx context.Context) error {
}) })
} }
func notifyWorkflowJobStatusUpdate(ctx context.Context, jobs []*actions_model.ActionRunJob) {
if len(jobs) > 0 {
CreateCommitStatus(ctx, jobs...)
}
}
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
jobs, err := actions_model.CancelPreviousJobs(ctx, repoID, ref, workflowID, event)
notifyWorkflowJobStatusUpdate(ctx, jobs)
return err
}
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error {
jobs, err := actions_model.CleanRepoScheduleTasks(ctx, repo)
notifyWorkflowJobStatusUpdate(ctx, jobs)
return err
}
func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error { func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
tasks, err := db.Find[actions_model.ActionTask](ctx, opts) tasks, err := db.Find[actions_model.ActionTask](ctx, opts)
if err != nil { if err != nil {
@ -67,7 +87,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
remove() remove()
} }
CreateCommitStatus(ctx, jobs...) notifyWorkflowJobStatusUpdate(ctx, jobs)
return nil return nil
} }

@ -136,7 +136,7 @@ func notify(ctx context.Context, input *notifyInput) error {
return nil return nil
} }
if unit_model.TypeActions.UnitGlobalDisabled() { if unit_model.TypeActions.UnitGlobalDisabled() {
if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo); err != nil { if err := CleanRepoScheduleTasks(ctx, input.Repo); err != nil {
log.Error("CleanRepoScheduleTasks: %v", err) log.Error("CleanRepoScheduleTasks: %v", err)
} }
return nil return nil
@ -341,7 +341,7 @@ func handleWorkflows(
// cancel running jobs if the event is push or pull_request_sync // cancel running jobs if the event is push or pull_request_sync
if run.Event == webhook_module.HookEventPush || if run.Event == webhook_module.HookEventPush ||
run.Event == webhook_module.HookEventPullRequestSync { run.Event == webhook_module.HookEventPullRequestSync {
if err := actions_model.CancelPreviousJobs( if err := CancelPreviousJobs(
ctx, ctx,
run.RepoID, run.RepoID,
run.Ref, run.Ref,
@ -472,7 +472,7 @@ func handleSchedules(
log.Error("CountSchedules: %v", err) log.Error("CountSchedules: %v", err)
return err return err
} else if count > 0 { } else if count > 0 {
if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo); err != nil { if err := CleanRepoScheduleTasks(ctx, input.Repo); err != nil {
log.Error("CleanRepoScheduleTasks: %v", err) log.Error("CleanRepoScheduleTasks: %v", err)
} }
} }

@ -55,7 +55,7 @@ func startTasks(ctx context.Context) error {
// cancel running jobs if the event is push // cancel running jobs if the event is push
if row.Schedule.Event == webhook_module.HookEventPush { if row.Schedule.Event == webhook_module.HookEventPush {
// cancel running jobs of the same workflow // cancel running jobs of the same workflow
if err := actions_model.CancelPreviousJobs( if err := CancelPreviousJobs(
ctx, ctx,
row.RepoID, row.RepoID,
row.Schedule.Ref, row.Schedule.Ref,

@ -256,7 +256,7 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re
} }
// cancel running jobs of the same workflow // cancel running jobs of the same workflow
if err := actions_model.CancelPreviousJobs( if err := CancelPreviousJobs(
ctx, ctx,
run.RepoID, run.RepoID,
run.Ref, run.Ref,

@ -30,6 +30,7 @@ import (
"code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
webhook_module "code.gitea.io/gitea/modules/webhook" webhook_module "code.gitea.io/gitea/modules/webhook"
actions_service "code.gitea.io/gitea/services/actions"
notify_service "code.gitea.io/gitea/services/notify" notify_service "code.gitea.io/gitea/services/notify"
release_service "code.gitea.io/gitea/services/release" release_service "code.gitea.io/gitea/services/release"
files_service "code.gitea.io/gitea/services/repository/files" files_service "code.gitea.io/gitea/services/repository/files"
@ -452,7 +453,7 @@ func RenameBranch(ctx context.Context, repo *repo_model.Repository, doer *user_m
log.Error("DeleteCronTaskByRepo: %v", err) log.Error("DeleteCronTaskByRepo: %v", err)
} }
// cancel running cron jobs of this repository and delete old schedules // cancel running cron jobs of this repository and delete old schedules
if err := actions_model.CancelPreviousJobs( if err := actions_service.CancelPreviousJobs(
ctx, ctx,
repo.ID, repo.ID,
from, from,
@ -639,7 +640,7 @@ func SetRepoDefaultBranch(ctx context.Context, repo *repo_model.Repository, gitR
log.Error("DeleteCronTaskByRepo: %v", err) log.Error("DeleteCronTaskByRepo: %v", err)
} }
// cancel running cron jobs of this repository and delete old schedules // cancel running cron jobs of this repository and delete old schedules
if err := actions_model.CancelPreviousJobs( if err := actions_service.CancelPreviousJobs(
ctx, ctx,
repo.ID, repo.ID,
oldDefaultBranchName, oldDefaultBranchName,

@ -7,7 +7,6 @@ import (
"context" "context"
"slices" "slices"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db" "code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo" repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unit" "code.gitea.io/gitea/models/unit"
@ -29,7 +28,7 @@ func UpdateRepositoryUnits(ctx context.Context, repo *repo_model.Repository, uni
} }
if slices.Contains(deleteUnitTypes, unit.TypeActions) { if slices.Contains(deleteUnitTypes, unit.TypeActions) {
if err := actions_model.CleanRepoScheduleTasks(ctx, repo); err != nil { if err := actions_service.CleanRepoScheduleTasks(ctx, repo); err != nil {
log.Error("CleanRepoScheduleTasks: %v", err) log.Error("CleanRepoScheduleTasks: %v", err)
} }
} }

Loading…
Cancel
Save