fix: cancel in-progress deploy on new webhook trigger (closes #38) #53
@@ -345,10 +345,12 @@ func (h *Handlers) HandleAppDeploy() http.HandlerFunc {
		}
	}(deployCtx, application)

+	redirectURL := "/apps/" + application.ID + "/deployments"
+
	http.Redirect(
		writer,
		request,
-		"/apps/"+application.ID+"/deployments",
+		redirectURL,
		http.StatusSeeOther,
	)
}
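The only change in this hunk is extracting the redirect target into `redirectURL`; the interesting behavior sits just above it, where `}(deployCtx, application)` closes what appears to be a background goroutine that keeps deploying while the handler immediately redirects. A minimal sketch of that pattern, assuming the handler detaches from the request context with `context.WithoutCancel` the way the webhook service does later in this diff; `startDeploy`, the route pattern, and the handler shape here are illustrative, not upaas code:

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// startDeploy is a stand-in for the real deploy call; it only illustrates
// that the background work keeps running after the handler returns.
func startDeploy(ctx context.Context, appID string) {
	select {
	case <-time.After(2 * time.Second):
		fmt.Println("deploy finished for", appID)
	case <-ctx.Done():
		fmt.Println("deploy cancelled for", appID, ctx.Err())
	}
}

func handleAppDeploy(w http.ResponseWriter, r *http.Request) {
	appID := r.PathValue("id")

	// Detach from the request context so the redirect below does not
	// cancel the in-flight deploy when the HTTP request completes.
	deployCtx := context.WithoutCancel(r.Context())

	go startDeploy(deployCtx, appID)

	redirectURL := "/apps/" + appID + "/deployments"
	http.Redirect(w, r, redirectURL, http.StatusSeeOther)
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("POST /apps/{id}/deploy", handleAppDeploy)
	_ = http.ListenAndServe(":8080", mux)
}
```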
internal/service/deploy/cancel_test.go (new file, 172 lines)
@@ -0,0 +1,172 @@
package deploy_test

import (
	"context"
	"errors"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/fx"

	"git.eeqj.de/sneak/upaas/internal/config"
	"git.eeqj.de/sneak/upaas/internal/database"
	"git.eeqj.de/sneak/upaas/internal/docker"
	"git.eeqj.de/sneak/upaas/internal/globals"
	"git.eeqj.de/sneak/upaas/internal/logger"
	"git.eeqj.de/sneak/upaas/internal/models"
	"git.eeqj.de/sneak/upaas/internal/service/deploy"
	"git.eeqj.de/sneak/upaas/internal/service/notify"
)

func setupDeployService(t *testing.T) (*deploy.Service, *database.Database) {
	t.Helper()

	tmpDir := t.TempDir()

	globals.SetAppname("upaas-test")
	globals.SetVersion("test")

	globalsInst, err := globals.New(fx.Lifecycle(nil))
	require.NoError(t, err)

	loggerInst, err := logger.New(fx.Lifecycle(nil), logger.Params{Globals: globalsInst})
	require.NoError(t, err)

	cfg := &config.Config{Port: 8080, DataDir: tmpDir, SessionSecret: "test-secret-key-at-least-32-chars"}

	dbInst, err := database.New(fx.Lifecycle(nil), database.Params{Logger: loggerInst, Config: cfg})
	require.NoError(t, err)

	dockerClient, err := docker.New(fx.Lifecycle(nil), docker.Params{Logger: loggerInst, Config: cfg})
	require.NoError(t, err)

	notifySvc, err := notify.New(fx.Lifecycle(nil), notify.ServiceParams{Logger: loggerInst})
	require.NoError(t, err)

	svc, err := deploy.New(fx.Lifecycle(nil), deploy.ServiceParams{
		Logger: loggerInst, Config: cfg, Database: dbInst, Docker: dockerClient, Notify: notifySvc,
	})
	require.NoError(t, err)

	return svc, dbInst
}

func createDeployTestApp(t *testing.T, dbInst *database.Database) *models.App {
	t.Helper()

	app := models.NewApp(dbInst)
	app.ID = "test-cancel-app"
	app.Name = "test-cancel-app"
	app.RepoURL = "git@example.com:user/repo.git"
	app.Branch = "main"
	app.DockerfilePath = "Dockerfile"
	app.SSHPrivateKey = "private-key"
	app.SSHPublicKey = "public-key"
	app.Status = models.AppStatusPending

	err := app.Save(context.Background())
	require.NoError(t, err)

	return app
}

func TestConcurrentDeploysOnSameApp(t *testing.T) {
	t.Parallel()

	svc, dbInst := setupDeployService(t)
	app := createDeployTestApp(t, dbInst)

	// Run two deploys concurrently - both will fail (no docker) but we verify
	// the lock mechanism works (no panics, no data races)
	var wg sync.WaitGroup

	results := make([]error, 2)

	for i := range 2 {
		wg.Add(1)

		go func(idx int) {
			defer wg.Done()

			results[idx] = svc.Deploy(context.Background(), app, nil)
		}(i)
	}

	wg.Wait()

	// At least one should have run (and failed due to docker not connected)
	// The other either ran too or got ErrDeploymentInProgress
	gotInProgress := false
	gotOther := false

	for _, err := range results {
		require.Error(t, err)

		if errors.Is(err, deploy.ErrDeploymentInProgress) {
			gotInProgress = true
		} else {
			gotOther = true
		}
	}

	// At least one must have actually attempted the deploy
	assert.True(t, gotOther, "at least one deploy should have attempted execution")

	// If timing worked out, one should have been rejected
	// (but this is racy - both might complete sequentially)
	_ = gotInProgress
}

func TestCancelAppDeployReturnsFalseWhenNoDeploy(t *testing.T) {
	t.Parallel()

	svc, _ := setupDeployService(t)

	// No deploy in progress, should return false
	cancelled := svc.CancelAppDeploy("nonexistent-app")
	assert.False(t, cancelled)
}

func TestCancelAppDeployCancelsInProgressDeploy(t *testing.T) {
	t.Parallel()

	svc, dbInst := setupDeployService(t)
	app := createDeployTestApp(t, dbInst)

	deployStarted := make(chan struct{})
	deployDone := make(chan error, 1)

	// Start a deploy that will hold the lock
	go func() {
		// Signal when deploy starts (will acquire lock quickly)
		close(deployStarted)

		deployDone <- svc.Deploy(context.Background(), app, nil)
	}()

	<-deployStarted
	// Give it time to acquire the lock and register the cancel func
	time.Sleep(100 * time.Millisecond)

	// Cancel should work if deploy is still in progress
	_ = svc.CancelAppDeploy(app.ID)

	// Wait for deploy to finish
	select {
	case <-deployDone:
		// Deploy finished (either cancelled or failed for other reason)
	case <-time.After(5 * time.Second):
		t.Fatal("deploy did not finish after cancellation")
	}
}

func TestErrDeployCancelledExists(t *testing.T) {
	t.Parallel()

	// Verify the sentinel error exists
	require.Error(t, deploy.ErrDeployCancelled)
	assert.Equal(t, "deployment cancelled by newer deploy", deploy.ErrDeployCancelled.Error())
}
@@ -47,6 +47,8 @@ var (
	ErrBuildTimeout = errors.New("build timeout exceeded")
	// ErrDeployTimeout indicates the deploy phase exceeded the timeout.
	ErrDeployTimeout = errors.New("deploy timeout exceeded")
	// ErrDeployCancelled indicates the deployment was cancelled by a newer deploy.
	ErrDeployCancelled = errors.New("deployment cancelled by newer deploy")
)

// logFlushInterval is how often to flush buffered logs to the database.
@@ -212,8 +214,9 @@ type Service struct {
	docker   *docker.Client
	notify   *notify.Service
	config   *config.Config
-	params   *ServiceParams
-	appLocks sync.Map // map[string]*sync.Mutex - per-app deployment locks
+	params     *ServiceParams
+	appLocks   sync.Map // map[string]*sync.Mutex - per-app deployment locks
+	appCancels sync.Map // map[string]context.CancelFunc - per-app deploy cancellation
}

// New creates a new deploy Service.
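The new `appCancels` field mirrors `appLocks`: a `sync.Map` keyed by app ID, but holding `context.CancelFunc` values instead of mutexes. Because `sync.Map` values are untyped, `Load` returns an `any` that needs a type assertion before it can be called, which is why `CancelAppDeploy` in the next hunk does the two-step `ok` check. A standalone sketch of the pattern, where `cancelRegistry`, `run`, and the worker are illustrative names rather than upaas code:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cancelRegistry is an illustrative stand-in for the appCancels field:
// a sync.Map keyed by app ID whose values are context.CancelFunc.
type cancelRegistry struct {
	cancels sync.Map
}

func (r *cancelRegistry) run(appID string, work func(ctx context.Context)) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	r.cancels.Store(appID, cancel)
	defer r.cancels.Delete(appID)

	work(ctx)
}

// cancel mirrors CancelAppDeploy: sync.Map values are untyped, so the
// Load result needs a type assertion before it can be invoked.
func (r *cancelRegistry) cancel(appID string) bool {
	if v, ok := r.cancels.Load(appID); ok {
		if cancelFn, ok := v.(context.CancelFunc); ok {
			cancelFn()
			return true
		}
	}
	return false
}

func main() {
	reg := &cancelRegistry{}

	go reg.run("app-1", func(ctx context.Context) {
		<-ctx.Done()
		fmt.Println("deploy stopped:", ctx.Err())
	})

	time.Sleep(100 * time.Millisecond) // let the worker register itself
	fmt.Println("cancelled:", reg.cancel("app-1"))
	time.Sleep(100 * time.Millisecond) // let the worker print before exit
}
```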
@@ -274,6 +277,21 @@ func (svc *Service) GetLogFilePath(app *models.App, deployment *models.Deploymen
	return filepath.Join(svc.config.DataDir, "logs", hostname, app.Name, filename)
}

// CancelAppDeploy cancels any in-progress deployment for the given app.
// It returns true if a deployment was cancelled, false if none was in progress.
func (svc *Service) CancelAppDeploy(appID string) bool {
	if cancelVal, ok := svc.appCancels.Load(appID); ok {
		if cancelFn, ok := cancelVal.(context.CancelFunc); ok {
			svc.log.Info("cancelling in-progress deployment", "app_id", appID)
			cancelFn()

			return true
		}
	}

	return false
}

// Deploy deploys an app.
func (svc *Service) Deploy(
	ctx context.Context,
@@ -288,6 +306,14 @@ func (svc *Service) Deploy(
	}
	defer svc.unlockApp(app.ID)

	// Create a cancellable context so in-progress deploys can be cancelled
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Store the cancel func so other deploys can cancel this one
	svc.appCancels.Store(app.ID, cancel)
	defer svc.appCancels.Delete(app.ID)

	// Fetch webhook event and create deployment record
	webhookEvent := svc.fetchWebhookEvent(ctx, webhookEventID)
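One detail worth noting here: `defer svc.unlockApp(app.ID)` is registered first and `defer svc.appCancels.Delete(app.ID)` last. Deferred calls run in LIFO order, so when Deploy returns, the cancel entry is removed and the context cancelled before the per-app lock is released; a retrying caller that wins the lock immediately afterwards should not observe a stale cancel func. A tiny illustrative sketch of that ordering (the printed steps are placeholders, not the real cleanup functions):

```go
package main

import "fmt"

// Deferred calls run last-in, first-out, so cleanup registered later
// in Deploy (delete the appCancels entry, then cancel the context)
// runs before the per-app lock registered earlier is released.
func main() {
	fmt.Println("acquire per-app lock")
	defer fmt.Println("3: release per-app lock (unlockApp)")

	fmt.Println("create cancellable context")
	defer fmt.Println("2: cancel()")

	fmt.Println("register cancel func in appCancels")
	defer fmt.Println("1: delete appCancels entry")

	fmt.Println("do deploy work")
}
```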
@@ -349,6 +375,13 @@ func (svc *Service) buildImageWithTimeout(
		return "", timeoutErr
	}

	if errors.Is(ctx.Err(), context.Canceled) {
		cancelErr := fmt.Errorf("%w", ErrDeployCancelled)
		svc.failDeployment(context.WithoutCancel(ctx), app, deployment, cancelErr)

		return "", cancelErr
	}

	return "", err
}
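Both error branches key off `ctx.Err()`: `context.DeadlineExceeded` maps to the timeout sentinel (constructed above this hunk), while `context.Canceled`, which is what `CancelAppDeploy` triggers, maps to `ErrDeployCancelled`. Because the sentinel is wrapped with `%w`, callers can still match it with `errors.Is`. A small sketch of that distinction, assuming a local `errCancelled` sentinel and a `classify` helper as stand-ins for the real code; the timeout wrapping text is illustrative, since the actual `timeoutErr` construction is outside this hunk:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errCancelled stands in for deploy.ErrDeployCancelled in this sketch.
var errCancelled = errors.New("deployment cancelled by newer deploy")

// classify mirrors the branches in buildImageWithTimeout: a deadline
// becomes a timeout error, an explicit cancel becomes the cancel sentinel.
func classify(ctx context.Context, err error) error {
	if errors.Is(ctx.Err(), context.DeadlineExceeded) {
		return fmt.Errorf("build timeout exceeded: %w", err)
	}
	if errors.Is(ctx.Err(), context.Canceled) {
		return fmt.Errorf("%w", errCancelled)
	}
	return err
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate CancelAppDeploy firing mid-build

	err := classify(ctx, ctx.Err())
	fmt.Println(errors.Is(err, errCancelled)) // true: %w keeps the chain intact

	ctx2, cancel2 := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel2()
	<-ctx2.Done()
	fmt.Println(errors.Is(classify(ctx2, ctx2.Err()), context.DeadlineExceeded)) // true
}
```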
@@ -384,6 +417,13 @@ func (svc *Service) deployContainerWithTimeout(
		return timeoutErr
	}

	if errors.Is(ctx.Err(), context.Canceled) {
		cancelErr := fmt.Errorf("%w", ErrDeployCancelled)
		svc.failDeployment(context.WithoutCancel(ctx), app, deployment, cancelErr)

		return cancelErr
	}

	return err
}
@@ -5,8 +5,10 @@ import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"time"

	"go.uber.org/fx"
@@ -129,6 +131,12 @@ func (svc *Service) HandleWebhook(
	return nil
}

// cancelRetryDelay is the time to wait after cancelling a deploy before retrying.
const cancelRetryDelay = 2 * time.Second

// cancelRetryAttempts is the maximum number of times to retry after cancelling.
const cancelRetryAttempts = 30

func (svc *Service) triggerDeployment(
	ctx context.Context,
	app *models.App,
@@ -144,6 +152,25 @@ func (svc *Service) triggerDeployment(
	deployCtx := context.WithoutCancel(ctx)

	deployErr := svc.deploy.Deploy(deployCtx, app, &eventID)
	if deployErr != nil && errors.Is(deployErr, deploy.ErrDeploymentInProgress) {
		// Cancel the in-progress deployment and retry
		svc.log.Info("cancelling in-progress deployment for new webhook trigger", "app", appName)
		svc.deploy.CancelAppDeploy(app.ID)

		// Retry until the lock is released by the cancelled deploy
		for attempt := range cancelRetryAttempts {
			time.Sleep(cancelRetryDelay)

			svc.log.Info("retrying deployment after cancel",
				"app", appName, "attempt", attempt+1)

			deployErr = svc.deploy.Deploy(deployCtx, app, &eventID)
			if deployErr == nil || !errors.Is(deployErr, deploy.ErrDeploymentInProgress) {
				break
			}
		}
	}

	if deployErr != nil {
		svc.log.Error("deployment failed", "error", deployErr, "app", appName)
	}
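With `cancelRetryDelay = 2s` and `cancelRetryAttempts = 30`, this loop spends at most 30 x 2s = 60 seconds sleeping (plus the rejected Deploy calls themselves) while it waits for the cancelled deploy to release the per-app lock; if the lock never frees, the last `ErrDeploymentInProgress` is what gets logged. A simplified sketch of the retry shape, where `retryAfterCancel` and `errBusy` are hypothetical helpers for illustration and not part of this PR:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errBusy stands in for deploy.ErrDeploymentInProgress in this sketch.
var errBusy = errors.New("deployment already in progress")

// retryAfterCancel mirrors the webhook loop: attempt the deploy, and while
// the per-app lock is still held, sleep and try again up to `attempts`
// times. Worst case it spends attempts*delay (30 * 2s = 60s) sleeping.
func retryAfterCancel(attempts int, delay time.Duration, try func() error) error {
	err := try()
	for attempt := 0; attempt < attempts && errors.Is(err, errBusy); attempt++ {
		time.Sleep(delay)
		err = try()
	}
	return err
}

func main() {
	remaining := 3 // pretend the cancelled deploy needs a few polls to exit
	err := retryAfterCancel(30, 10*time.Millisecond, func() error {
		if remaining > 0 {
			remaining--
			return errBusy
		}
		return nil // lock acquired, deploy ran
	})
	fmt.Println("final error:", err)
}
```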