diff --git a/internal/handlers/app.go b/internal/handlers/app.go index 596337d..ccdaf14 100644 --- a/internal/handlers/app.go +++ b/internal/handlers/app.go @@ -345,10 +345,12 @@ func (h *Handlers) HandleAppDeploy() http.HandlerFunc { } }(deployCtx, application) + redirectURL := "/apps/" + application.ID + "/deployments" + http.Redirect( writer, request, - "/apps/"+application.ID+"/deployments", + redirectURL, http.StatusSeeOther, ) } diff --git a/internal/service/deploy/cancel_test.go b/internal/service/deploy/cancel_test.go new file mode 100644 index 0000000..f52ed92 --- /dev/null +++ b/internal/service/deploy/cancel_test.go @@ -0,0 +1,172 @@ +package deploy_test + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/fx" + + "git.eeqj.de/sneak/upaas/internal/config" + "git.eeqj.de/sneak/upaas/internal/database" + "git.eeqj.de/sneak/upaas/internal/docker" + "git.eeqj.de/sneak/upaas/internal/globals" + "git.eeqj.de/sneak/upaas/internal/logger" + "git.eeqj.de/sneak/upaas/internal/models" + "git.eeqj.de/sneak/upaas/internal/service/deploy" + "git.eeqj.de/sneak/upaas/internal/service/notify" +) + +func setupDeployService(t *testing.T) (*deploy.Service, *database.Database) { + t.Helper() + + tmpDir := t.TempDir() + + globals.SetAppname("upaas-test") + globals.SetVersion("test") + + globalsInst, err := globals.New(fx.Lifecycle(nil)) + require.NoError(t, err) + + loggerInst, err := logger.New(fx.Lifecycle(nil), logger.Params{Globals: globalsInst}) + require.NoError(t, err) + + cfg := &config.Config{Port: 8080, DataDir: tmpDir, SessionSecret: "test-secret-key-at-least-32-chars"} + + dbInst, err := database.New(fx.Lifecycle(nil), database.Params{Logger: loggerInst, Config: cfg}) + require.NoError(t, err) + + dockerClient, err := docker.New(fx.Lifecycle(nil), docker.Params{Logger: loggerInst, Config: cfg}) + require.NoError(t, err) + + notifySvc, err := notify.New(fx.Lifecycle(nil), notify.ServiceParams{Logger: loggerInst}) + require.NoError(t, err) + + svc, err := deploy.New(fx.Lifecycle(nil), deploy.ServiceParams{ + Logger: loggerInst, Config: cfg, Database: dbInst, Docker: dockerClient, Notify: notifySvc, + }) + require.NoError(t, err) + + return svc, dbInst +} + +func createDeployTestApp(t *testing.T, dbInst *database.Database) *models.App { + t.Helper() + + app := models.NewApp(dbInst) + app.ID = "test-cancel-app" + app.Name = "test-cancel-app" + app.RepoURL = "git@example.com:user/repo.git" + app.Branch = "main" + app.DockerfilePath = "Dockerfile" + app.SSHPrivateKey = "private-key" + app.SSHPublicKey = "public-key" + app.Status = models.AppStatusPending + + err := app.Save(context.Background()) + require.NoError(t, err) + + return app +} + +func TestConcurrentDeploysOnSameApp(t *testing.T) { + t.Parallel() + + svc, dbInst := setupDeployService(t) + app := createDeployTestApp(t, dbInst) + + // Run two deploys concurrently - both will fail (no docker) but we verify + // the lock mechanism works (no panics, no data races) + var wg sync.WaitGroup + + results := make([]error, 2) + + for i := range 2 { + wg.Add(1) + + go func(idx int) { + defer wg.Done() + + results[idx] = svc.Deploy(context.Background(), app, nil) + }(i) + } + + wg.Wait() + + // At least one should have run (and failed due to docker not connected) + // The other either ran too or got ErrDeploymentInProgress + gotInProgress := false + gotOther := false + + for _, err := range results { + require.Error(t, err) + + if errors.Is(err, deploy.ErrDeploymentInProgress) { + gotInProgress = true + } else { + gotOther = true + } + } + + // At least one must have actually attempted the deploy + assert.True(t, gotOther, "at least one deploy should have attempted execution") + + // If timing worked out, one should have been rejected + // (but this is racy - both might complete sequentially) + _ = gotInProgress +} + +func TestCancelAppDeployReturnsFalseWhenNoDeploy(t *testing.T) { + t.Parallel() + + svc, _ := setupDeployService(t) + + // No deploy in progress, should return false + cancelled := svc.CancelAppDeploy("nonexistent-app") + assert.False(t, cancelled) +} + +func TestCancelAppDeployCancelsInProgressDeploy(t *testing.T) { + t.Parallel() + + svc, dbInst := setupDeployService(t) + app := createDeployTestApp(t, dbInst) + + deployStarted := make(chan struct{}) + deployDone := make(chan error, 1) + + // Start a deploy that will hold the lock + go func() { + // Signal when deploy starts (will acquire lock quickly) + close(deployStarted) + + deployDone <- svc.Deploy(context.Background(), app, nil) + }() + + <-deployStarted + // Give it time to acquire the lock and register the cancel func + time.Sleep(100 * time.Millisecond) + + // Cancel should work if deploy is still in progress + _ = svc.CancelAppDeploy(app.ID) + + // Wait for deploy to finish + select { + case <-deployDone: + // Deploy finished (either cancelled or failed for other reason) + case <-time.After(5 * time.Second): + t.Fatal("deploy did not finish after cancellation") + } +} + +func TestErrDeployCancelledExists(t *testing.T) { + t.Parallel() + + // Verify the sentinel error exists + require.Error(t, deploy.ErrDeployCancelled) + assert.Equal(t, "deployment cancelled by newer deploy", deploy.ErrDeployCancelled.Error()) +} diff --git a/internal/service/deploy/deploy.go b/internal/service/deploy/deploy.go index b51e15a..34dacd0 100644 --- a/internal/service/deploy/deploy.go +++ b/internal/service/deploy/deploy.go @@ -47,6 +47,8 @@ var ( ErrBuildTimeout = errors.New("build timeout exceeded") // ErrDeployTimeout indicates the deploy phase exceeded the timeout. ErrDeployTimeout = errors.New("deploy timeout exceeded") + // ErrDeployCancelled indicates the deployment was cancelled by a newer deploy. + ErrDeployCancelled = errors.New("deployment cancelled by newer deploy") ) // logFlushInterval is how often to flush buffered logs to the database. @@ -212,8 +214,9 @@ type Service struct { docker *docker.Client notify *notify.Service config *config.Config - params *ServiceParams - appLocks sync.Map // map[string]*sync.Mutex - per-app deployment locks + params *ServiceParams + appLocks sync.Map // map[string]*sync.Mutex - per-app deployment locks + appCancels sync.Map // map[string]context.CancelFunc - per-app deploy cancellation } // New creates a new deploy Service. @@ -274,6 +277,21 @@ func (svc *Service) GetLogFilePath(app *models.App, deployment *models.Deploymen return filepath.Join(svc.config.DataDir, "logs", hostname, app.Name, filename) } +// CancelAppDeploy cancels any in-progress deployment for the given app. +// It returns true if a deployment was cancelled, false if none was in progress. +func (svc *Service) CancelAppDeploy(appID string) bool { + if cancelVal, ok := svc.appCancels.Load(appID); ok { + if cancelFn, ok := cancelVal.(context.CancelFunc); ok { + svc.log.Info("cancelling in-progress deployment", "app_id", appID) + cancelFn() + + return true + } + } + + return false +} + // Deploy deploys an app. func (svc *Service) Deploy( ctx context.Context, @@ -288,6 +306,14 @@ func (svc *Service) Deploy( } defer svc.unlockApp(app.ID) + // Create a cancellable context so in-progress deploys can be cancelled + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Store the cancel func so other deploys can cancel this one + svc.appCancels.Store(app.ID, cancel) + defer svc.appCancels.Delete(app.ID) + // Fetch webhook event and create deployment record webhookEvent := svc.fetchWebhookEvent(ctx, webhookEventID) @@ -349,6 +375,13 @@ func (svc *Service) buildImageWithTimeout( return "", timeoutErr } + if errors.Is(ctx.Err(), context.Canceled) { + cancelErr := fmt.Errorf("%w", ErrDeployCancelled) + svc.failDeployment(context.WithoutCancel(ctx), app, deployment, cancelErr) + + return "", cancelErr + } + return "", err } @@ -384,6 +417,13 @@ func (svc *Service) deployContainerWithTimeout( return timeoutErr } + if errors.Is(ctx.Err(), context.Canceled) { + cancelErr := fmt.Errorf("%w", ErrDeployCancelled) + svc.failDeployment(context.WithoutCancel(ctx), app, deployment, cancelErr) + + return cancelErr + } + return err } diff --git a/internal/service/webhook/webhook.go b/internal/service/webhook/webhook.go index 9687192..b0008fb 100644 --- a/internal/service/webhook/webhook.go +++ b/internal/service/webhook/webhook.go @@ -5,8 +5,10 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "log/slog" + "time" "go.uber.org/fx" @@ -129,6 +131,12 @@ func (svc *Service) HandleWebhook( return nil } +// cancelRetryDelay is the time to wait after cancelling a deploy before retrying. +const cancelRetryDelay = 2 * time.Second + +// cancelRetryAttempts is the maximum number of times to retry after cancelling. +const cancelRetryAttempts = 30 + func (svc *Service) triggerDeployment( ctx context.Context, app *models.App, @@ -144,6 +152,25 @@ func (svc *Service) triggerDeployment( deployCtx := context.WithoutCancel(ctx) deployErr := svc.deploy.Deploy(deployCtx, app, &eventID) + if deployErr != nil && errors.Is(deployErr, deploy.ErrDeploymentInProgress) { + // Cancel the in-progress deployment and retry + svc.log.Info("cancelling in-progress deployment for new webhook trigger", "app", appName) + svc.deploy.CancelAppDeploy(app.ID) + + // Retry until the lock is released by the cancelled deploy + for attempt := range cancelRetryAttempts { + time.Sleep(cancelRetryDelay) + + svc.log.Info("retrying deployment after cancel", + "app", appName, "attempt", attempt+1) + + deployErr = svc.deploy.Deploy(deployCtx, app, &eventID) + if deployErr == nil || !errors.Is(deployErr, deploy.ErrDeploymentInProgress) { + break + } + } + } + if deployErr != nil { svc.log.Error("deployment failed", "error", deployErr, "app", appName) }