fix: cancel in-progress deploy on new webhook trigger (closes #38) #53
@ -345,10 +345,12 @@ func (h *Handlers) HandleAppDeploy() http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
}(deployCtx, application)
|
}(deployCtx, application)
|
||||||
|
|
||||||
|
redirectURL := "/apps/" + application.ID + "/deployments"
|
||||||
|
|
||||||
http.Redirect(
|
http.Redirect(
|
||||||
writer,
|
writer,
|
||||||
request,
|
request,
|
||||||
"/apps/"+application.ID+"/deployments",
|
redirectURL,
|
||||||
http.StatusSeeOther,
|
http.StatusSeeOther,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
172
internal/service/deploy/cancel_test.go
Normal file
172
internal/service/deploy/cancel_test.go
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
package deploy_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/fx"
|
||||||
|
|
||||||
|
"git.eeqj.de/sneak/upaas/internal/config"
|
||||||
|
"git.eeqj.de/sneak/upaas/internal/database"
|
||||||
|
"git.eeqj.de/sneak/upaas/internal/docker"
|
||||||
|
"git.eeqj.de/sneak/upaas/internal/globals"
|
||||||
|
"git.eeqj.de/sneak/upaas/internal/logger"
|
||||||
|
"git.eeqj.de/sneak/upaas/internal/models"
|
||||||
|
"git.eeqj.de/sneak/upaas/internal/service/deploy"
|
||||||
|
"git.eeqj.de/sneak/upaas/internal/service/notify"
|
||||||
|
)
|
||||||
|
|
||||||
|
func setupDeployService(t *testing.T) (*deploy.Service, *database.Database) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
tmpDir := t.TempDir()
|
||||||
|
|
||||||
|
globals.SetAppname("upaas-test")
|
||||||
|
globals.SetVersion("test")
|
||||||
|
|
||||||
|
globalsInst, err := globals.New(fx.Lifecycle(nil))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
loggerInst, err := logger.New(fx.Lifecycle(nil), logger.Params{Globals: globalsInst})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
cfg := &config.Config{Port: 8080, DataDir: tmpDir, SessionSecret: "test-secret-key-at-least-32-chars"}
|
||||||
|
|
||||||
|
dbInst, err := database.New(fx.Lifecycle(nil), database.Params{Logger: loggerInst, Config: cfg})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
dockerClient, err := docker.New(fx.Lifecycle(nil), docker.Params{Logger: loggerInst, Config: cfg})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
notifySvc, err := notify.New(fx.Lifecycle(nil), notify.ServiceParams{Logger: loggerInst})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
svc, err := deploy.New(fx.Lifecycle(nil), deploy.ServiceParams{
|
||||||
|
Logger: loggerInst, Config: cfg, Database: dbInst, Docker: dockerClient, Notify: notifySvc,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return svc, dbInst
|
||||||
|
}
|
||||||
|
|
||||||
|
func createDeployTestApp(t *testing.T, dbInst *database.Database) *models.App {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
app := models.NewApp(dbInst)
|
||||||
|
app.ID = "test-cancel-app"
|
||||||
|
app.Name = "test-cancel-app"
|
||||||
|
app.RepoURL = "git@example.com:user/repo.git"
|
||||||
|
app.Branch = "main"
|
||||||
|
app.DockerfilePath = "Dockerfile"
|
||||||
|
app.SSHPrivateKey = "private-key"
|
||||||
|
app.SSHPublicKey = "public-key"
|
||||||
|
app.Status = models.AppStatusPending
|
||||||
|
|
||||||
|
err := app.Save(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return app
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConcurrentDeploysOnSameApp(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
svc, dbInst := setupDeployService(t)
|
||||||
|
app := createDeployTestApp(t, dbInst)
|
||||||
|
|
||||||
|
// Run two deploys concurrently - both will fail (no docker) but we verify
|
||||||
|
// the lock mechanism works (no panics, no data races)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
results := make([]error, 2)
|
||||||
|
|
||||||
|
for i := range 2 {
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
go func(idx int) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
results[idx] = svc.Deploy(context.Background(), app, nil)
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// At least one should have run (and failed due to docker not connected)
|
||||||
|
// The other either ran too or got ErrDeploymentInProgress
|
||||||
|
gotInProgress := false
|
||||||
|
gotOther := false
|
||||||
|
|
||||||
|
for _, err := range results {
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
if errors.Is(err, deploy.ErrDeploymentInProgress) {
|
||||||
|
gotInProgress = true
|
||||||
|
} else {
|
||||||
|
gotOther = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// At least one must have actually attempted the deploy
|
||||||
|
assert.True(t, gotOther, "at least one deploy should have attempted execution")
|
||||||
|
|
||||||
|
// If timing worked out, one should have been rejected
|
||||||
|
// (but this is racy - both might complete sequentially)
|
||||||
|
_ = gotInProgress
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCancelAppDeployReturnsFalseWhenNoDeploy(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
svc, _ := setupDeployService(t)
|
||||||
|
|
||||||
|
// No deploy in progress, should return false
|
||||||
|
cancelled := svc.CancelAppDeploy("nonexistent-app")
|
||||||
|
assert.False(t, cancelled)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCancelAppDeployCancelsInProgressDeploy(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
svc, dbInst := setupDeployService(t)
|
||||||
|
app := createDeployTestApp(t, dbInst)
|
||||||
|
|
||||||
|
deployStarted := make(chan struct{})
|
||||||
|
deployDone := make(chan error, 1)
|
||||||
|
|
||||||
|
// Start a deploy that will hold the lock
|
||||||
|
go func() {
|
||||||
|
// Signal when deploy starts (will acquire lock quickly)
|
||||||
|
close(deployStarted)
|
||||||
|
|
||||||
|
deployDone <- svc.Deploy(context.Background(), app, nil)
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-deployStarted
|
||||||
|
// Give it time to acquire the lock and register the cancel func
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// Cancel should work if deploy is still in progress
|
||||||
|
_ = svc.CancelAppDeploy(app.ID)
|
||||||
|
|
||||||
|
// Wait for deploy to finish
|
||||||
|
select {
|
||||||
|
case <-deployDone:
|
||||||
|
// Deploy finished (either cancelled or failed for other reason)
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("deploy did not finish after cancellation")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestErrDeployCancelledExists(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// Verify the sentinel error exists
|
||||||
|
require.Error(t, deploy.ErrDeployCancelled)
|
||||||
|
assert.Equal(t, "deployment cancelled by newer deploy", deploy.ErrDeployCancelled.Error())
|
||||||
|
}
|
||||||
@ -47,6 +47,8 @@ var (
|
|||||||
ErrBuildTimeout = errors.New("build timeout exceeded")
|
ErrBuildTimeout = errors.New("build timeout exceeded")
|
||||||
// ErrDeployTimeout indicates the deploy phase exceeded the timeout.
|
// ErrDeployTimeout indicates the deploy phase exceeded the timeout.
|
||||||
ErrDeployTimeout = errors.New("deploy timeout exceeded")
|
ErrDeployTimeout = errors.New("deploy timeout exceeded")
|
||||||
|
// ErrDeployCancelled indicates the deployment was cancelled by a newer deploy.
|
||||||
|
ErrDeployCancelled = errors.New("deployment cancelled by newer deploy")
|
||||||
)
|
)
|
||||||
|
|
||||||
// logFlushInterval is how often to flush buffered logs to the database.
|
// logFlushInterval is how often to flush buffered logs to the database.
|
||||||
@ -214,6 +216,7 @@ type Service struct {
|
|||||||
config *config.Config
|
config *config.Config
|
||||||
params *ServiceParams
|
params *ServiceParams
|
||||||
appLocks sync.Map // map[string]*sync.Mutex - per-app deployment locks
|
appLocks sync.Map // map[string]*sync.Mutex - per-app deployment locks
|
||||||
|
appCancels sync.Map // map[string]context.CancelFunc - per-app deploy cancellation
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new deploy Service.
|
// New creates a new deploy Service.
|
||||||
@ -274,6 +277,21 @@ func (svc *Service) GetLogFilePath(app *models.App, deployment *models.Deploymen
|
|||||||
return filepath.Join(svc.config.DataDir, "logs", hostname, app.Name, filename)
|
return filepath.Join(svc.config.DataDir, "logs", hostname, app.Name, filename)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CancelAppDeploy cancels any in-progress deployment for the given app.
|
||||||
|
// It returns true if a deployment was cancelled, false if none was in progress.
|
||||||
|
func (svc *Service) CancelAppDeploy(appID string) bool {
|
||||||
|
if cancelVal, ok := svc.appCancels.Load(appID); ok {
|
||||||
|
if cancelFn, ok := cancelVal.(context.CancelFunc); ok {
|
||||||
|
svc.log.Info("cancelling in-progress deployment", "app_id", appID)
|
||||||
|
cancelFn()
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// Deploy deploys an app.
|
// Deploy deploys an app.
|
||||||
func (svc *Service) Deploy(
|
func (svc *Service) Deploy(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
@ -288,6 +306,14 @@ func (svc *Service) Deploy(
|
|||||||
}
|
}
|
||||||
defer svc.unlockApp(app.ID)
|
defer svc.unlockApp(app.ID)
|
||||||
|
|
||||||
|
// Create a cancellable context so in-progress deploys can be cancelled
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Store the cancel func so other deploys can cancel this one
|
||||||
|
svc.appCancels.Store(app.ID, cancel)
|
||||||
|
defer svc.appCancels.Delete(app.ID)
|
||||||
|
|
||||||
// Fetch webhook event and create deployment record
|
// Fetch webhook event and create deployment record
|
||||||
webhookEvent := svc.fetchWebhookEvent(ctx, webhookEventID)
|
webhookEvent := svc.fetchWebhookEvent(ctx, webhookEventID)
|
||||||
|
|
||||||
@ -349,6 +375,13 @@ func (svc *Service) buildImageWithTimeout(
|
|||||||
return "", timeoutErr
|
return "", timeoutErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if errors.Is(ctx.Err(), context.Canceled) {
|
||||||
|
cancelErr := fmt.Errorf("%w", ErrDeployCancelled)
|
||||||
|
svc.failDeployment(context.WithoutCancel(ctx), app, deployment, cancelErr)
|
||||||
|
|
||||||
|
return "", cancelErr
|
||||||
|
}
|
||||||
|
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -384,6 +417,13 @@ func (svc *Service) deployContainerWithTimeout(
|
|||||||
return timeoutErr
|
return timeoutErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if errors.Is(ctx.Err(), context.Canceled) {
|
||||||
|
cancelErr := fmt.Errorf("%w", ErrDeployCancelled)
|
||||||
|
svc.failDeployment(context.WithoutCancel(ctx), app, deployment, cancelErr)
|
||||||
|
|
||||||
|
return cancelErr
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -5,8 +5,10 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
|
|
||||||
@ -129,6 +131,12 @@ func (svc *Service) HandleWebhook(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Retry policy applied after an in-progress deploy has been cancelled.
const (
	// cancelRetryDelay is the pause before each retry of the deploy.
	cancelRetryDelay = 2 * time.Second

	// cancelRetryAttempts caps how many retries are made after a cancel.
	cancelRetryAttempts = 30
)
|
||||||
|
|
||||||
func (svc *Service) triggerDeployment(
|
func (svc *Service) triggerDeployment(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
app *models.App,
|
app *models.App,
|
||||||
@ -144,6 +152,25 @@ func (svc *Service) triggerDeployment(
|
|||||||
deployCtx := context.WithoutCancel(ctx)
|
deployCtx := context.WithoutCancel(ctx)
|
||||||
|
|
||||||
deployErr := svc.deploy.Deploy(deployCtx, app, &eventID)
|
deployErr := svc.deploy.Deploy(deployCtx, app, &eventID)
|
||||||
|
if deployErr != nil && errors.Is(deployErr, deploy.ErrDeploymentInProgress) {
|
||||||
|
// Cancel the in-progress deployment and retry
|
||||||
|
svc.log.Info("cancelling in-progress deployment for new webhook trigger", "app", appName)
|
||||||
|
svc.deploy.CancelAppDeploy(app.ID)
|
||||||
|
|
||||||
|
// Retry until the lock is released by the cancelled deploy
|
||||||
|
for attempt := range cancelRetryAttempts {
|
||||||
|
time.Sleep(cancelRetryDelay)
|
||||||
|
|
||||||
|
svc.log.Info("retrying deployment after cancel",
|
||||||
|
"app", appName, "attempt", attempt+1)
|
||||||
|
|
||||||
|
deployErr = svc.deploy.Deploy(deployCtx, app, &eventID)
|
||||||
|
if deployErr == nil || !errors.Is(deployErr, deploy.ErrDeploymentInProgress) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if deployErr != nil {
|
if deployErr != nil {
|
||||||
svc.log.Error("deployment failed", "error", deployErr, "app", appName)
|
svc.log.Error("deployment failed", "error", deployErr, "app", appName)
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user