diff --git a/internal/handlers/app.go b/internal/handlers/app.go index 6e2ac71..7c0c4cf 100644 --- a/internal/handlers/app.go +++ b/internal/handlers/app.go @@ -314,7 +314,7 @@ func (h *Handlers) HandleAppDeploy() http.HandlerFunc { deployCtx := context.WithoutCancel(request.Context()) go func(ctx context.Context, appToDeploy *models.App) { - deployErr := h.deploy.Deploy(ctx, appToDeploy, nil) + deployErr := h.deploy.Deploy(ctx, appToDeploy, nil, false) if deployErr != nil { h.log.Error( "deployment failed", diff --git a/internal/models/deployment.go b/internal/models/deployment.go index b830e9e..492ba82 100644 --- a/internal/models/deployment.go +++ b/internal/models/deployment.go @@ -19,6 +19,7 @@ const ( DeploymentStatusDeploying DeploymentStatus = "deploying" DeploymentStatusSuccess DeploymentStatus = "success" DeploymentStatusFailed DeploymentStatus = "failed" + DeploymentStatusCancelled DeploymentStatus = "cancelled" ) // Display constants. diff --git a/internal/service/deploy/deploy.go b/internal/service/deploy/deploy.go index b51e15a..dc260e4 100644 --- a/internal/service/deploy/deploy.go +++ b/internal/service/deploy/deploy.go @@ -43,6 +43,8 @@ var ( ErrContainerUnhealthy = errors.New("container unhealthy after 60 seconds") // ErrDeploymentInProgress indicates another deployment is already running. ErrDeploymentInProgress = errors.New("deployment already in progress for this app") + // ErrDeployCancelled indicates the deployment was cancelled by a newer deploy. + ErrDeployCancelled = errors.New("deployment cancelled by newer deploy") // ErrBuildTimeout indicates the build phase exceeded the timeout. ErrBuildTimeout = errors.New("build timeout exceeded") // ErrDeployTimeout indicates the deploy phase exceeded the timeout. @@ -205,15 +207,22 @@ type ServiceParams struct { Notify *notify.Service } +// activeDeploy tracks a running deployment so it can be cancelled. +type activeDeploy struct { + cancel context.CancelFunc + done chan struct{} +} + // Service provides deployment functionality. type Service struct { - log *slog.Logger - db *database.Database - docker *docker.Client - notify *notify.Service - config *config.Config - params *ServiceParams - appLocks sync.Map // map[string]*sync.Mutex - per-app deployment locks + log *slog.Logger + db *database.Database + docker *docker.Client + notify *notify.Service + config *config.Config + params *ServiceParams + activeDeploys sync.Map // map[string]*activeDeploy - per-app active deployment tracking + appLocks sync.Map // map[string]*sync.Mutex - per-app deployment locks } // New creates a new deploy Service. @@ -274,12 +283,19 @@ func (svc *Service) GetLogFilePath(app *models.App, deployment *models.Deploymen return filepath.Join(svc.config.DataDir, "logs", hostname, app.Name, filename) } -// Deploy deploys an app. +// Deploy deploys an app. If cancelExisting is true (e.g. webhook-triggered), +// any in-progress deploy for the same app will be cancelled before starting. +// If cancelExisting is false and a deploy is in progress, ErrDeploymentInProgress is returned. func (svc *Service) Deploy( ctx context.Context, app *models.App, webhookEventID *int64, + cancelExisting bool, ) error { + if cancelExisting { + svc.cancelActiveDeploy(app.ID) + } + // Try to acquire per-app deployment lock if !svc.tryLockApp(app.ID) { svc.log.Warn("deployment already in progress", "app", app.Name) @@ -288,45 +304,80 @@ func (svc *Service) Deploy( } defer svc.unlockApp(app.ID) + // Set up cancellable context and register as active deploy + deployCtx, cancel := context.WithCancel(ctx) + done := make(chan struct{}) + ad := &activeDeploy{cancel: cancel, done: done} + svc.activeDeploys.Store(app.ID, ad) + + defer func() { + cancel() + close(done) + svc.activeDeploys.Delete(app.ID) + }() + // Fetch webhook event and create deployment record - webhookEvent := svc.fetchWebhookEvent(ctx, webhookEventID) + webhookEvent := svc.fetchWebhookEvent(deployCtx, webhookEventID) - deployment, err := svc.createDeploymentRecord(ctx, app, webhookEventID, webhookEvent) + // Use a background context for DB operations that must complete regardless of cancellation + bgCtx := context.WithoutCancel(deployCtx) + + deployment, err := svc.createDeploymentRecord(bgCtx, app, webhookEventID, webhookEvent) if err != nil { return err } - svc.logWebhookPayload(ctx, deployment, webhookEvent) + svc.logWebhookPayload(bgCtx, deployment, webhookEvent) - err = svc.updateAppStatusBuilding(ctx, app) + err = svc.updateAppStatusBuilding(bgCtx, app) if err != nil { return err } - svc.notify.NotifyBuildStart(ctx, app, deployment) + svc.notify.NotifyBuildStart(bgCtx, app, deployment) + return svc.runBuildAndDeploy(deployCtx, bgCtx, app, deployment) +} + +// runBuildAndDeploy executes the build and deploy phases, handling cancellation. +func (svc *Service) runBuildAndDeploy( + deployCtx context.Context, + bgCtx context.Context, + app *models.App, + deployment *models.Deployment, +) error { // Build phase with timeout - imageID, err := svc.buildImageWithTimeout(ctx, app, deployment) + imageID, err := svc.buildImageWithTimeout(deployCtx, app, deployment) if err != nil { + cancelErr := svc.checkCancelled(deployCtx, bgCtx, app, deployment) + if cancelErr != nil { + return cancelErr + } + return err } - svc.notify.NotifyBuildSuccess(ctx, app, deployment) + svc.notify.NotifyBuildSuccess(bgCtx, app, deployment) // Deploy phase with timeout - err = svc.deployContainerWithTimeout(ctx, app, deployment, imageID) + err = svc.deployContainerWithTimeout(deployCtx, app, deployment, imageID) if err != nil { + cancelErr := svc.checkCancelled(deployCtx, bgCtx, app, deployment) + if cancelErr != nil { + return cancelErr + } + return err } - err = svc.updateAppRunning(ctx, app, imageID) + err = svc.updateAppRunning(bgCtx, app, imageID) if err != nil { return err } // Use context.WithoutCancel to ensure health check completes even if // the parent context is cancelled (e.g., HTTP request ends). - go svc.checkHealthAfterDelay(context.WithoutCancel(ctx), app, deployment) + go svc.checkHealthAfterDelay(bgCtx, app, deployment) return nil } @@ -463,6 +514,43 @@ func (svc *Service) unlockApp(appID string) { svc.getAppLock(appID).Unlock() } +// cancelActiveDeploy cancels any in-progress deployment for the given app +// and waits for it to finish before returning. +func (svc *Service) cancelActiveDeploy(appID string) { + val, ok := svc.activeDeploys.Load(appID) + if !ok { + return + } + + ad, ok := val.(*activeDeploy) + if !ok { + return + } + + svc.log.Info("cancelling in-progress deployment", "app_id", appID) + ad.cancel() + <-ad.done +} + +// checkCancelled checks if the deploy context was cancelled (by a newer deploy) +// and if so, marks the deployment as cancelled. Returns ErrDeployCancelled or nil. +func (svc *Service) checkCancelled( + deployCtx context.Context, + bgCtx context.Context, + app *models.App, + deployment *models.Deployment, +) error { + if !errors.Is(deployCtx.Err(), context.Canceled) { + return nil + } + + svc.log.Info("deployment cancelled by newer deploy", "app", app.Name) + + _ = deployment.MarkFinished(bgCtx, models.DeploymentStatusCancelled) + + return ErrDeployCancelled +} + func (svc *Service) fetchWebhookEvent( ctx context.Context, webhookEventID *int64, diff --git a/internal/service/deploy/deploy_cancel_test.go b/internal/service/deploy/deploy_cancel_test.go new file mode 100644 index 0000000..21d3dae --- /dev/null +++ b/internal/service/deploy/deploy_cancel_test.go @@ -0,0 +1,133 @@ +package deploy_test + +import ( + "context" + "log/slog" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "git.eeqj.de/sneak/upaas/internal/service/deploy" +) + +func TestCancelActiveDeploy_NoExisting(t *testing.T) { + t.Parallel() + + svc := deploy.NewTestService(slog.Default()) + + // Should not panic or block when no active deploy exists + svc.CancelActiveDeploy("nonexistent-app") +} + +func TestCancelActiveDeploy_CancelsAndWaits(t *testing.T) { + t.Parallel() + + svc := deploy.NewTestService(slog.Default()) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + + svc.RegisterActiveDeploy("app-1", cancel, done) + + // Simulate a running deploy that respects cancellation + var deployFinished bool + + go func() { + <-ctx.Done() + + deployFinished = true + + close(done) + }() + + svc.CancelActiveDeploy("app-1") + assert.True(t, deployFinished, "deploy should have finished after cancellation") +} + +func TestCancelActiveDeploy_BlocksUntilDone(t *testing.T) { + t.Parallel() + + svc := deploy.NewTestService(slog.Default()) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + + svc.RegisterActiveDeploy("app-2", cancel, done) + + // Simulate slow cleanup after cancellation + go func() { + <-ctx.Done() + time.Sleep(50 * time.Millisecond) + close(done) + }() + + start := time.Now() + + svc.CancelActiveDeploy("app-2") + + elapsed := time.Since(start) + + assert.GreaterOrEqual(t, elapsed, 50*time.Millisecond, + "cancelActiveDeploy should block until the deploy finishes") +} + +func TestTryLockApp_PreventsConcurrent(t *testing.T) { + t.Parallel() + + svc := deploy.NewTestService(slog.Default()) + + assert.True(t, svc.TryLockApp("app-1"), "first lock should succeed") + assert.False(t, svc.TryLockApp("app-1"), "second lock should fail") + + svc.UnlockApp("app-1") + + assert.True(t, svc.TryLockApp("app-1"), "lock after unlock should succeed") + + svc.UnlockApp("app-1") +} + +func TestCancelActiveDeploy_AllowsNewDeploy(t *testing.T) { + t.Parallel() + + svc := deploy.NewTestService(slog.Default()) + + // Simulate an active deploy holding the lock + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + + svc.RegisterActiveDeploy("app-3", cancel, done) + + // Lock the app as if a deploy is in progress + assert.True(t, svc.TryLockApp("app-3")) + + // Simulate deploy goroutine: release lock on cancellation + var mu sync.Mutex + + released := false + + go func() { + <-ctx.Done() + + svc.UnlockApp("app-3") + + mu.Lock() + released = true + mu.Unlock() + + close(done) + }() + + // Cancel should cause the old deploy to release its lock + svc.CancelActiveDeploy("app-3") + + mu.Lock() + assert.True(t, released) + mu.Unlock() + + // Now a new deploy should be able to acquire the lock + assert.True(t, svc.TryLockApp("app-3"), "should be able to lock after cancellation") + + svc.UnlockApp("app-3") +} diff --git a/internal/service/deploy/export_test.go b/internal/service/deploy/export_test.go new file mode 100644 index 0000000..1f3a73a --- /dev/null +++ b/internal/service/deploy/export_test.go @@ -0,0 +1,33 @@ +package deploy + +import ( + "context" + "log/slog" +) + +// NewTestService creates a Service with minimal dependencies for testing. +func NewTestService(log *slog.Logger) *Service { + return &Service{ + log: log, + } +} + +// CancelActiveDeploy exposes cancelActiveDeploy for testing. +func (svc *Service) CancelActiveDeploy(appID string) { + svc.cancelActiveDeploy(appID) +} + +// RegisterActiveDeploy registers an active deploy for testing. +func (svc *Service) RegisterActiveDeploy(appID string, cancel context.CancelFunc, done chan struct{}) { + svc.activeDeploys.Store(appID, &activeDeploy{cancel: cancel, done: done}) +} + +// TryLockApp exposes tryLockApp for testing. +func (svc *Service) TryLockApp(appID string) bool { + return svc.tryLockApp(appID) +} + +// UnlockApp exposes unlockApp for testing. +func (svc *Service) UnlockApp(appID string) { + svc.unlockApp(appID) +} diff --git a/internal/service/webhook/webhook.go b/internal/service/webhook/webhook.go index 9687192..dbdd23d 100644 --- a/internal/service/webhook/webhook.go +++ b/internal/service/webhook/webhook.go @@ -143,7 +143,7 @@ func (svc *Service) triggerDeployment( // even if the HTTP request context is cancelled. deployCtx := context.WithoutCancel(ctx) - deployErr := svc.deploy.Deploy(deployCtx, app, &eventID) + deployErr := svc.deploy.Deploy(deployCtx, app, &eventID, true) if deployErr != nil { svc.log.Error("deployment failed", "error", deployErr, "app", appName) }