// Package deploy provides deployment services.
package deploy

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
	"time"

	"go.uber.org/fx"

	"git.eeqj.de/sneak/upaas/internal/config"
	"git.eeqj.de/sneak/upaas/internal/database"
	"git.eeqj.de/sneak/upaas/internal/docker"
	"git.eeqj.de/sneak/upaas/internal/logger"
	"git.eeqj.de/sneak/upaas/internal/models"
	"git.eeqj.de/sneak/upaas/internal/service/notify"
)

// Time constants.
const (
	healthCheckDelaySeconds = 60

	// upaasLabelCount is the number of upaas-specific labels added to containers.
	upaasLabelCount = 1

	// buildsDirPermissions is the permission mode for the builds directory.
	buildsDirPermissions = 0o750

	// buildTimeout is the maximum duration for the build phase.
	buildTimeout = 30 * time.Minute

	// deployTimeout is the maximum duration for the deploy phase (container swap).
	deployTimeout = 5 * time.Minute
)

// Sentinel errors for deployment failures.
var (
	// ErrContainerUnhealthy indicates the container failed its health check.
	ErrContainerUnhealthy = errors.New("container unhealthy after 60 seconds")

	// ErrDeploymentInProgress indicates another deployment is already running.
	ErrDeploymentInProgress = errors.New("deployment already in progress for this app")

	// ErrBuildTimeout indicates the build phase exceeded the timeout.
	ErrBuildTimeout = errors.New("build timeout exceeded")

	// ErrDeployTimeout indicates the deploy phase exceeded the timeout.
	ErrDeployTimeout = errors.New("deploy timeout exceeded")
)

// logFlushInterval is how often to flush buffered logs to the database.
const logFlushInterval = time.Second

// dockerLogMessage represents a Docker build log message.
type dockerLogMessage struct {
	Stream string `json:"stream"`
	Error  string `json:"error"`
}

// deploymentLogWriter implements io.Writer and periodically flushes to deployment logs.
// It parses Docker JSON log format and extracts the stream content.
type deploymentLogWriter struct {
	deployment *models.Deployment
	buffer     bytes.Buffer
	lineBuffer bytes.Buffer // buffer for incomplete lines
	mu         sync.Mutex
	done       chan struct{}
	flushCtx   context.Context //nolint:containedctx // needed for async flush goroutine
}

func newDeploymentLogWriter(ctx context.Context, deployment *models.Deployment) *deploymentLogWriter {
	w := &deploymentLogWriter{
		deployment: deployment,
		done:       make(chan struct{}),
		flushCtx:   ctx,
	}

	go w.runFlushLoop()

	return w
}

// Write implements io.Writer.
// It parses Docker JSON log format and extracts the stream content.
func (w *deploymentLogWriter) Write(p []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Add incoming data to line buffer
	w.lineBuffer.Write(p)

	// Process complete lines
	data := w.lineBuffer.Bytes()

	lastNewline := bytes.LastIndexByte(data, '\n')
	if lastNewline == -1 {
		// No complete lines yet
		return len(p), nil
	}

	// Process all complete lines
	completeData := data[:lastNewline+1]
	remaining := data[lastNewline+1:]

	for line := range bytes.SplitSeq(completeData, []byte{'\n'}) {
		w.processLine(line)
	}

	// Keep any remaining incomplete line data
	w.lineBuffer.Reset()

	if len(remaining) > 0 {
		w.lineBuffer.Write(remaining)
	}

	return len(p), nil
}
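
// A sketch of the Docker JSON build stream this writer consumes (values
// illustrative; actual daemon output varies):
//
//	{"stream":"Step 1/4 : FROM golang:1.22\n"}
//	{"stream":" ---\u003e a1b2c3d4e5f6\n"}
//	{"error":"The command '/bin/sh -c go build' returned a non-zero code: 1"}
//
// processLine copies "stream" content through verbatim and prefixes "error"
// values with "ERROR: ", so the flushed deployment log reads like plain
// docker build output.
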
// Close stops the flush loop; the loop performs a final flush before exiting.
func (w *deploymentLogWriter) Close() {
	close(w.done)
}

func (w *deploymentLogWriter) runFlushLoop() {
	ticker := time.NewTicker(logFlushInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			w.doFlush()
		case <-w.done:
			w.doFlush()

			return
		}
	}
}

func (w *deploymentLogWriter) doFlush() {
	w.mu.Lock()
	data := w.buffer.String()
	w.buffer.Reset()
	w.mu.Unlock()

	if data != "" {
		_ = w.deployment.AppendLog(w.flushCtx, data)
	}
}

// processLine parses a JSON log line and extracts the stream content.
func (w *deploymentLogWriter) processLine(line []byte) {
	if len(line) == 0 {
		return
	}

	var msg dockerLogMessage

	err := json.Unmarshal(line, &msg)
	if err != nil {
		// Not valid JSON, write as-is
		w.buffer.Write(line)
		w.buffer.WriteByte('\n')

		return
	}

	if msg.Error != "" {
		w.buffer.WriteString("ERROR: ")
		w.buffer.WriteString(msg.Error)
		w.buffer.WriteByte('\n')
	}

	if msg.Stream != "" {
		w.buffer.WriteString(msg.Stream)
	}
}

// ServiceParams contains dependencies for Service.
type ServiceParams struct {
	fx.In

	Logger   *logger.Logger
	Config   *config.Config
	Database *database.Database
	Docker   *docker.Client
	Notify   *notify.Service
}

// Service provides deployment functionality.
type Service struct {
	log      *slog.Logger
	db       *database.Database
	docker   *docker.Client
	notify   *notify.Service
	config   *config.Config
	params   *ServiceParams
	appLocks sync.Map // map[string]*sync.Mutex - per-app deployment locks
}

// New creates a new deploy Service.
func New(lc fx.Lifecycle, params ServiceParams) (*Service, error) {
	svc := &Service{
		log:    params.Logger.Get(),
		db:     params.Database,
		docker: params.Docker,
		notify: params.Notify,
		config: params.Config,
		params: &params,
	}

	if lc != nil {
		lc.Append(fx.Hook{
			OnStart: func(ctx context.Context) error {
				return svc.cleanupStuckDeployments(ctx)
			},
		})
	}

	return svc, nil
}

// GetBuildDir returns the build directory path for an app.
func (svc *Service) GetBuildDir(appID string) string {
	return filepath.Join(svc.config.DataDir, "builds", appID)
}

// Deploy deploys an app.
func (svc *Service) Deploy(
	ctx context.Context,
	app *models.App,
	webhookEventID *int64,
) error {
	// Try to acquire per-app deployment lock
	if !svc.tryLockApp(app.ID) {
		svc.log.Warn("deployment already in progress", "app", app.Name)

		return ErrDeploymentInProgress
	}
	defer svc.unlockApp(app.ID)

	// Fetch webhook event and create deployment record
	webhookEvent := svc.fetchWebhookEvent(ctx, webhookEventID)

	deployment, err := svc.createDeploymentRecord(ctx, app, webhookEventID, webhookEvent)
	if err != nil {
		return err
	}

	svc.logWebhookPayload(ctx, deployment, webhookEvent)

	err = svc.updateAppStatusBuilding(ctx, app)
	if err != nil {
		return err
	}

	svc.notify.NotifyBuildStart(ctx, app, deployment)

	// Build phase with timeout
	imageID, err := svc.buildImageWithTimeout(ctx, app, deployment)
	if err != nil {
		return err
	}

	svc.notify.NotifyBuildSuccess(ctx, app, deployment)

	// Deploy phase with timeout
	err = svc.deployContainerWithTimeout(ctx, app, deployment, imageID)
	if err != nil {
		return err
	}

	err = svc.updateAppRunning(ctx, app, imageID)
	if err != nil {
		return err
	}

	// Use context.WithoutCancel to ensure health check completes even if
	// the parent context is cancelled (e.g., HTTP request ends).
	go svc.checkHealthAfterDelay(context.WithoutCancel(ctx), app, deployment)

	return nil
}
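
// Hypothetical caller sketch (deploySvc, app, and eventID are illustrative,
// not part of this package): Deploy blocks for the build and swap phases, so
// a webhook handler would typically run it in a goroutine and treat
// ErrDeploymentInProgress as a benign "already running" signal rather than a
// failure.
//
//	go func() {
//		err := deploySvc.Deploy(context.Background(), app, &eventID)
//		if errors.Is(err, ErrDeploymentInProgress) {
//			// Another deployment holds this app's lock; drop or re-queue.
//		}
//	}()
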
// buildImageWithTimeout runs the build phase with a timeout.
func (svc *Service) buildImageWithTimeout(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) (string, error) {
	buildCtx, cancel := context.WithTimeout(ctx, buildTimeout)
	defer cancel()

	imageID, err := svc.buildImage(buildCtx, app, deployment)
	if err != nil {
		if errors.Is(buildCtx.Err(), context.DeadlineExceeded) {
			timeoutErr := fmt.Errorf("%w after %v", ErrBuildTimeout, buildTimeout)
			svc.failDeployment(ctx, app, deployment, timeoutErr)

			return "", timeoutErr
		}

		return "", err
	}

	return imageID, nil
}

// deployContainerWithTimeout runs the deploy phase with a timeout.
// It removes the old container and starts the new one.
func (svc *Service) deployContainerWithTimeout(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
	imageID string,
) error {
	deployCtx, cancel := context.WithTimeout(ctx, deployTimeout)
	defer cancel()

	err := svc.updateDeploymentDeploying(deployCtx, deployment)
	if err != nil {
		return err
	}

	// Remove old container first to free up the name
	svc.removeOldContainer(deployCtx, app, deployment)

	// Create and start the new container
	_, err = svc.createAndStartContainer(deployCtx, app, deployment, imageID)
	if err != nil {
		if errors.Is(deployCtx.Err(), context.DeadlineExceeded) {
			timeoutErr := fmt.Errorf("%w after %v", ErrDeployTimeout, deployTimeout)
			svc.failDeployment(ctx, app, deployment, timeoutErr)

			return timeoutErr
		}

		return err
	}

	return nil
}

// cleanupStuckDeployments marks any deployments stuck in building/deploying as failed.
func (svc *Service) cleanupStuckDeployments(ctx context.Context) error {
	query := `
		UPDATE deployments
		SET status = ?, finished_at = ?, logs = COALESCE(logs, '') || ?
		WHERE status IN (?, ?)
	`
	msg := "\n[System] Deployment marked as failed: process was interrupted\n"

	_, err := svc.db.DB().ExecContext(
		ctx,
		query,
		models.DeploymentStatusFailed,
		time.Now(),
		msg,
		models.DeploymentStatusBuilding,
		models.DeploymentStatusDeploying,
	)
	if err != nil {
		svc.log.Error("failed to cleanup stuck deployments", "error", err)

		return fmt.Errorf("failed to cleanup stuck deployments: %w", err)
	}

	svc.log.Info("cleaned up stuck deployments")

	return nil
}

func (svc *Service) getAppLock(appID string) *sync.Mutex {
	lock, _ := svc.appLocks.LoadOrStore(appID, &sync.Mutex{})

	mu, ok := lock.(*sync.Mutex)
	if !ok {
		// This should never happen, but handle it gracefully
		newMu := &sync.Mutex{}
		svc.appLocks.Store(appID, newMu)

		return newMu
	}

	return mu
}

func (svc *Service) tryLockApp(appID string) bool {
	return svc.getAppLock(appID).TryLock()
}

func (svc *Service) unlockApp(appID string) {
	svc.getAppLock(appID).Unlock()
}

func (svc *Service) fetchWebhookEvent(
	ctx context.Context,
	webhookEventID *int64,
) *models.WebhookEvent {
	if webhookEventID == nil {
		return nil
	}

	event, err := models.FindWebhookEvent(ctx, svc.db, *webhookEventID)
	if err != nil {
		svc.log.Warn("failed to fetch webhook event", "error", err)

		return nil
	}

	return event
}

func (svc *Service) logWebhookPayload(
	ctx context.Context,
	deployment *models.Deployment,
	webhookEvent *models.WebhookEvent,
) {
	if webhookEvent == nil || !webhookEvent.Payload.Valid {
		return
	}

	_ = deployment.AppendLog(ctx, "Webhook received:\n"+webhookEvent.Payload.String+"\n")
}
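
// Deployment status lifecycle as driven by this service (sketch; the status
// constants live in the models package):
//
//	building ──build ok──► deploying ──container healthy──► success
//	    │                      │
//	    └──────────────────────┴────► failed
//	       (build/deploy error, timeout, unhealthy container, or a
//	        stuck deployment cleaned up at startup)
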
func (svc *Service) createDeploymentRecord(
	ctx context.Context,
	app *models.App,
	webhookEventID *int64,
	webhookEvent *models.WebhookEvent,
) (*models.Deployment, error) {
	deployment := models.NewDeployment(svc.db)
	deployment.AppID = app.ID

	if webhookEventID != nil {
		deployment.WebhookEventID = sql.NullInt64{Int64: *webhookEventID, Valid: true}
	}

	// Set commit SHA from webhook event if available
	if webhookEvent != nil && webhookEvent.CommitSHA.Valid {
		deployment.CommitSHA = webhookEvent.CommitSHA
	}

	deployment.Status = models.DeploymentStatusBuilding

	saveErr := deployment.Save(ctx)
	if saveErr != nil {
		return nil, fmt.Errorf("failed to create deployment: %w", saveErr)
	}

	return deployment, nil
}

func (svc *Service) updateAppStatusBuilding(
	ctx context.Context,
	app *models.App,
) error {
	app.Status = models.AppStatusBuilding

	saveErr := app.Save(ctx)
	if saveErr != nil {
		return fmt.Errorf("failed to update app status: %w", saveErr)
	}

	return nil
}

func (svc *Service) buildImage(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) (string, error) {
	workDir, cleanup, err := svc.cloneRepository(ctx, app, deployment)
	if err != nil {
		return "", err
	}
	defer cleanup()

	imageTag := "upaas/" + app.Name + ":latest"

	// Create log writer that flushes build output to deployment logs every second
	logWriter := newDeploymentLogWriter(ctx, deployment)
	defer logWriter.Close()

	// BuildImage creates a tar archive from the local filesystem,
	// so it needs the container path where files exist, not the host path.
	imageID, err := svc.docker.BuildImage(ctx, docker.BuildImageOptions{
		ContextDir:     workDir,
		DockerfilePath: app.DockerfilePath,
		Tags:           []string{imageTag},
		LogWriter:      logWriter,
	})
	if err != nil {
		svc.notify.NotifyBuildFailed(ctx, app, deployment, err)
		svc.failDeployment(
			ctx,
			app,
			deployment,
			fmt.Errorf("failed to build image: %w", err),
		)

		return "", fmt.Errorf("failed to build image: %w", err)
	}

	deployment.ImageID = sql.NullString{String: imageID, Valid: true}
	_ = deployment.AppendLog(ctx, "Image built: "+imageID)

	return imageID, nil
}
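
// Naming convention note: buildImage tags every build "upaas/<app.Name>:latest"
// and buildContainerOptions references the same tag, so the container swap in
// deployContainerWithTimeout always runs the image just built. For example, an
// app named "blog" (illustrative) builds "upaas/blog:latest" and runs in a
// container named "upaas-blog".
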
func (svc *Service) cloneRepository(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) (string, func(), error) {
	// Use a subdirectory of DataDir for builds since it's mounted from the host
	// and accessible to Docker for bind mounts (unlike /tmp inside the container).
	// Structure: builds/<app-name>/<deployment-id>-<random>/
	//   deploy_key  <- SSH key for cloning
	//   work/       <- cloned repository
	appBuildsDir := filepath.Join(svc.config.DataDir, "builds", app.Name)

	err := os.MkdirAll(appBuildsDir, buildsDirPermissions)
	if err != nil {
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to create builds dir: %w", err))

		return "", nil, fmt.Errorf("failed to create builds dir: %w", err)
	}

	buildDir, err := os.MkdirTemp(appBuildsDir, fmt.Sprintf("%d-*", deployment.ID))
	if err != nil {
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to create temp dir: %w", err))

		return "", nil, fmt.Errorf("failed to create temp dir: %w", err)
	}

	cleanup := func() {
		_ = os.RemoveAll(buildDir)
	}

	// Calculate the host path for Docker bind mounts.
	// When upaas runs in a container, DataDir is the container path but Docker
	// needs the host path. HostDataDir provides the host-side equivalent.
	// CloneRepo needs both: container path for writing files, host path for Docker mounts.
	hostBuildDir := svc.containerToHostPath(buildDir)

	// Get commit SHA from deployment if available
	var commitSHA string
	if deployment.CommitSHA.Valid {
		commitSHA = deployment.CommitSHA.String
	}

	cloneResult, cloneErr := svc.docker.CloneRepo(
		ctx,
		app.RepoURL,
		app.Branch,
		commitSHA,
		app.SSHPrivateKey,
		buildDir,
		hostBuildDir,
	)
	if cloneErr != nil {
		cleanup()
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to clone repo: %w", cloneErr))

		return "", nil, fmt.Errorf("failed to clone repo: %w", cloneErr)
	}

	// Log clone success with git output
	if commitSHA != "" {
		_ = deployment.AppendLog(ctx, "Repository cloned at commit "+commitSHA)
	} else {
		_ = deployment.AppendLog(ctx, "Repository cloned (branch: "+app.Branch+")")
	}

	if cloneResult != nil && cloneResult.Output != "" {
		_ = deployment.AppendLog(ctx, cloneResult.Output)
	}

	// Return the 'work' subdirectory where the repo was cloned
	workDir := filepath.Join(buildDir, "work")

	return workDir, cleanup, nil
}

// containerToHostPath converts a container-local path to the equivalent host path.
// This is needed when upaas runs inside a container but needs to pass paths to Docker.
func (svc *Service) containerToHostPath(containerPath string) string {
	if svc.config.HostDataDir == svc.config.DataDir {
		return containerPath
	}

	// Get relative path from DataDir
	relPath, err := filepath.Rel(svc.config.DataDir, containerPath)
	if err != nil {
		// Fall back to original path if we can't compute relative path
		return containerPath
	}

	return filepath.Join(svc.config.HostDataDir, relPath)
}

func (svc *Service) updateDeploymentDeploying(
	ctx context.Context,
	deployment *models.Deployment,
) error {
	deployment.Status = models.DeploymentStatusDeploying

	saveErr := deployment.Save(ctx)
	if saveErr != nil {
		return fmt.Errorf("failed to update deployment status: %w", saveErr)
	}

	return nil
}
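
// Worked example for containerToHostPath (paths illustrative): with DataDir
// "/data" inside the upaas container and HostDataDir "/srv/upaas/data" on the
// host, a build directory "/data/builds/blog/42-abc123" translates to
// "/srv/upaas/data/builds/blog/42-abc123", which is the path the Docker
// daemon can actually bind-mount. When the two settings are equal, upaas is
// assumed to run directly on the host and paths pass through unchanged.
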
// removeOldContainer stops and removes the old container before deploying a new one.
func (svc *Service) removeOldContainer(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) {
	containerInfo, err := svc.docker.FindContainerByAppID(ctx, app.ID)
	if err != nil || containerInfo == nil {
		return
	}

	svc.log.Info("removing old container", "id", containerInfo.ID)

	if containerInfo.Running {
		stopErr := svc.docker.StopContainer(ctx, containerInfo.ID)
		if stopErr != nil {
			svc.log.Warn("failed to stop old container", "error", stopErr)
		}
	}

	removeErr := svc.docker.RemoveContainer(ctx, containerInfo.ID, true)
	if removeErr != nil {
		svc.log.Warn("failed to remove old container", "error", removeErr)
	}

	_ = deployment.AppendLog(ctx, "Old container removed: "+containerInfo.ID[:12])
}

func (svc *Service) createAndStartContainer(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
	imageID string,
) (string, error) {
	containerOpts, err := svc.buildContainerOptions(ctx, app, imageID)
	if err != nil {
		svc.failDeployment(ctx, app, deployment, err)

		return "", err
	}

	containerID, err := svc.docker.CreateContainer(ctx, containerOpts)
	if err != nil {
		svc.notify.NotifyDeployFailed(ctx, app, deployment, err)
		svc.failDeployment(
			ctx,
			app,
			deployment,
			fmt.Errorf("failed to create container: %w", err),
		)

		return "", fmt.Errorf("failed to create container: %w", err)
	}

	deployment.ContainerID = sql.NullString{String: containerID, Valid: true}
	_ = deployment.AppendLog(ctx, "Container created: "+containerID)

	startErr := svc.docker.StartContainer(ctx, containerID)
	if startErr != nil {
		svc.notify.NotifyDeployFailed(ctx, app, deployment, startErr)
		svc.failDeployment(
			ctx,
			app,
			deployment,
			fmt.Errorf("failed to start container: %w", startErr),
		)

		return "", fmt.Errorf("failed to start container: %w", startErr)
	}

	_ = deployment.AppendLog(ctx, "Container started")

	return containerID, nil
}

func (svc *Service) buildContainerOptions(
	ctx context.Context,
	app *models.App,
	_ string,
) (docker.CreateContainerOptions, error) {
	envVars, err := app.GetEnvVars(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get env vars: %w", err)
	}

	labels, err := app.GetLabels(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get labels: %w", err)
	}

	volumes, err := app.GetVolumes(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get volumes: %w", err)
	}

	ports, err := app.GetPorts(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get ports: %w", err)
	}

	envMap := make(map[string]string, len(envVars))
	for _, envVar := range envVars {
		envMap[envVar.Key] = envVar.Value
	}

	network := ""
	if app.DockerNetwork.Valid {
		network = app.DockerNetwork.String
	}

	return docker.CreateContainerOptions{
		Name:    "upaas-" + app.Name,
		Image:   "upaas/" + app.Name + ":latest",
		Env:     envMap,
		Labels:  buildLabelMap(app, labels),
		Volumes: buildVolumeMounts(volumes),
		Ports:   buildPortMappings(ports),
		Network: network,
	}, nil
}

func buildLabelMap(app *models.App, labels []*models.Label) map[string]string {
	labelMap := make(map[string]string, len(labels)+upaasLabelCount)
	for _, label := range labels {
		labelMap[label.Key] = label.Value
	}

	// Add the upaas.id label to identify this container
	labelMap[docker.LabelUpaasID] = app.ID

	return labelMap
}
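
// Label example (values illustrative): for an app with ID "a1b2c3" and one
// user-defined label "traefik.enable" = "true", buildLabelMap produces:
//
//	{"traefik.enable": "true", docker.LabelUpaasID: "a1b2c3"}
//
// The docker.LabelUpaasID entry is what identifies the container as belonging
// to this app, so FindContainerByAppID can locate it later for removal and
// health checks.
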
func buildVolumeMounts(volumes []*models.Volume) []docker.VolumeMount {
	mounts := make([]docker.VolumeMount, 0, len(volumes))
	for _, vol := range volumes {
		mounts = append(mounts, docker.VolumeMount{
			HostPath:      vol.HostPath,
			ContainerPath: vol.ContainerPath,
			ReadOnly:      vol.ReadOnly,
		})
	}

	return mounts
}

func buildPortMappings(ports []*models.Port) []docker.PortMapping {
	mappings := make([]docker.PortMapping, 0, len(ports))
	for _, port := range ports {
		mappings = append(mappings, docker.PortMapping{
			HostPort:      port.HostPort,
			ContainerPort: port.ContainerPort,
			Protocol:      string(port.Protocol),
		})
	}

	return mappings
}

func (svc *Service) updateAppRunning(
	ctx context.Context,
	app *models.App,
	imageID string,
) error {
	app.ImageID = sql.NullString{String: imageID, Valid: true}
	app.Status = models.AppStatusRunning

	saveErr := app.Save(ctx)
	if saveErr != nil {
		return fmt.Errorf("failed to update app: %w", saveErr)
	}

	return nil
}

func (svc *Service) checkHealthAfterDelay(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) {
	svc.log.Info(
		"waiting 60 seconds to check container health",
		"app", app.Name,
	)
	time.Sleep(healthCheckDelaySeconds * time.Second)

	// Reload app to get current state
	reloadedApp, err := models.FindApp(ctx, svc.db, app.ID)
	if err != nil || reloadedApp == nil {
		svc.log.Error("failed to reload app for health check", "error", err)

		return
	}

	containerInfo, containerErr := svc.docker.FindContainerByAppID(ctx, app.ID)
	if containerErr != nil || containerInfo == nil {
		return
	}

	healthy, err := svc.docker.IsContainerHealthy(ctx, containerInfo.ID)
	if err != nil {
		svc.log.Error("failed to check container health", "error", err)
		svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, err)
		_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)

		return
	}

	if healthy {
		svc.log.Info("container healthy after 60 seconds", "app", reloadedApp.Name)
		svc.notify.NotifyDeploySuccess(ctx, reloadedApp, deployment)
		_ = deployment.MarkFinished(ctx, models.DeploymentStatusSuccess)
	} else {
		svc.log.Warn(
			"container unhealthy after 60 seconds",
			"app", reloadedApp.Name,
		)
		svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, ErrContainerUnhealthy)
		_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)

		reloadedApp.Status = models.AppStatusError
		_ = reloadedApp.Save(ctx)
	}
}

func (svc *Service) failDeployment(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
	deployErr error,
) {
	svc.log.Error("deployment failed", "app", app.Name, "error", deployErr)

	_ = deployment.AppendLog(ctx, "ERROR: "+deployErr.Error())
	_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)

	app.Status = models.AppStatusError
	_ = app.Save(ctx)
}
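
// Post-deploy health gate summary (describing checkHealthAfterDelay above):
// after the 60-second delay, a healthy container marks the deployment
// succeeded and notifies; an unhealthy container marks it failed, notifies,
// and flips the app to the error status; a health-inspection error marks the
// deployment failed without touching app status; a missing container (e.g.
// removed concurrently) is a silent no-op.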