upaas/internal/service/deploy/deploy.go

// Package deploy provides deployment services.
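//
// A minimal usage sketch from a caller's perspective (wiring simplified;
// the fx lifecycle and a populated ServiceParams are assumed to come from
// the application container):
//
//	svc, err := deploy.New(lc, params)
//	if err != nil {
//		// handle construction error
//	}
//	_ = svc.Deploy(ctx, app, nil, true) // webhook-style: cancel any in-flight deploy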
package deploy

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
	"time"

	"go.uber.org/fx"

	"git.eeqj.de/sneak/upaas/internal/config"
	"git.eeqj.de/sneak/upaas/internal/database"
	"git.eeqj.de/sneak/upaas/internal/docker"
	"git.eeqj.de/sneak/upaas/internal/logger"
	"git.eeqj.de/sneak/upaas/internal/models"
	"git.eeqj.de/sneak/upaas/internal/service/notify"
)

// Deployment constants.
const (
	// healthCheckDelaySeconds is how long to wait after starting a container
	// before checking its health.
	healthCheckDelaySeconds = 60
	// upaasLabelCount is the number of upaas-specific labels added to containers.
	upaasLabelCount = 1
	// buildsDirPermissions is the permission mode for the builds directory.
	buildsDirPermissions = 0o750
	// buildTimeout is the maximum duration for the build phase.
	buildTimeout = 30 * time.Minute
	// deployTimeout is the maximum duration for the deploy phase (container swap).
	deployTimeout = 5 * time.Minute
)

// Sentinel errors for deployment failures.
var (
	// ErrContainerUnhealthy indicates the container failed its health check.
	ErrContainerUnhealthy = errors.New("container unhealthy after 60 seconds")
	// ErrDeploymentInProgress indicates another deployment is already running.
	ErrDeploymentInProgress = errors.New("deployment already in progress for this app")
	// ErrDeployCancelled indicates the deployment was cancelled by a newer deploy.
	ErrDeployCancelled = errors.New("deployment cancelled by newer deploy")
	// ErrBuildTimeout indicates the build phase exceeded the timeout.
	ErrBuildTimeout = errors.New("build timeout exceeded")
	// ErrDeployTimeout indicates the deploy phase exceeded the timeout.
	ErrDeployTimeout = errors.New("deploy timeout exceeded")
	// ErrNoPreviousImage indicates there is no previous image to roll back to.
	ErrNoPreviousImage = errors.New("no previous image available for rollback")
)

// logFlushInterval is how often to flush buffered logs to the database.
const logFlushInterval = time.Second

// logsDirPermissions is the permission mode for the logs directory.
const logsDirPermissions = 0o750

// logFilePermissions is the permission mode for log files.
const logFilePermissions = 0o640

// logTimestampFormat is the format for log file timestamps.
const logTimestampFormat = "20060102T150405Z"

// logFileShortSHALength is the number of characters to use for commit SHA in log filenames.
const logFileShortSHALength = 12

// dockerLogMessage represents a Docker build log message.
type dockerLogMessage struct {
	Stream string `json:"stream"`
	Error  string `json:"error"`
}

// deploymentLogWriter implements io.Writer and periodically flushes to deployment logs.
// It parses Docker JSON log format and extracts the stream content.
type deploymentLogWriter struct {
	deployment *models.Deployment
	buffer     bytes.Buffer
	lineBuffer bytes.Buffer // buffer for incomplete lines
	mu         sync.Mutex
	done       chan struct{}
	flushed    sync.WaitGroup  // waits for flush goroutine to finish
	flushCtx   context.Context //nolint:containedctx // needed for async flush goroutine
}

func newDeploymentLogWriter(ctx context.Context, deployment *models.Deployment) *deploymentLogWriter {
	w := &deploymentLogWriter{
		deployment: deployment,
		done:       make(chan struct{}),
		flushCtx:   ctx,
	}
	w.flushed.Add(1)
	go w.runFlushLoop()
	return w
}

// Write implements io.Writer.
// It parses Docker JSON log format and extracts the stream content.
func (w *deploymentLogWriter) Write(p []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	// Add incoming data to line buffer
	w.lineBuffer.Write(p)
	// Process complete lines
	data := w.lineBuffer.Bytes()
	lastNewline := bytes.LastIndexByte(data, '\n')
	if lastNewline == -1 {
		// No complete lines yet
		return len(p), nil
	}
	// Process all complete lines
	completeData := data[:lastNewline+1]
	remaining := data[lastNewline+1:]
	for line := range bytes.SplitSeq(completeData, []byte{'\n'}) {
		w.processLine(line)
	}
	// Keep any remaining incomplete line data
	w.lineBuffer.Reset()
	if len(remaining) > 0 {
		w.lineBuffer.Write(remaining)
	}
	return len(p), nil
}

// Close stops the flush loop and waits for the final flush to complete.
func (w *deploymentLogWriter) Close() {
	close(w.done)
	w.flushed.Wait()
}

func (w *deploymentLogWriter) runFlushLoop() {
	defer w.flushed.Done()
	ticker := time.NewTicker(logFlushInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			w.doFlush()
		case <-w.done:
			w.doFlush()
			return
		}
	}
}

func (w *deploymentLogWriter) doFlush() {
	w.mu.Lock()
	data := w.buffer.String()
	w.buffer.Reset()
	w.mu.Unlock()
	if data != "" {
		_ = w.deployment.AppendLog(w.flushCtx, data)
	}
}

// processLine parses a JSON log line and extracts the stream content.
func (w *deploymentLogWriter) processLine(line []byte) {
	if len(line) == 0 {
		return
	}
	var msg dockerLogMessage
	err := json.Unmarshal(line, &msg)
	if err != nil {
		// Not valid JSON, write as-is
		w.buffer.Write(line)
		w.buffer.WriteByte('\n')
		return
	}
	if msg.Error != "" {
		w.buffer.WriteString("ERROR: ")
		w.buffer.WriteString(msg.Error)
		w.buffer.WriteByte('\n')
	}
	if msg.Stream != "" {
		w.buffer.WriteString(msg.Stream)
	}
}
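
// For illustration, how processLine treats typical Docker build output
// (the sample lines are hypothetical, not captured from a real build):
//
//	{"stream":"Step 1/4 : FROM golang:1.22\n"}  -> buffers "Step 1/4 : FROM golang:1.22\n"
//	{"error":"executor failed"}                 -> buffers "ERROR: executor failed\n"
//	not JSON at all                             -> buffered verbatim with a trailing newline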

// ServiceParams contains dependencies for Service.
type ServiceParams struct {
	fx.In
	Logger   *logger.Logger
	Config   *config.Config
	Database *database.Database
	Docker   *docker.Client
	Notify   *notify.Service
}

// activeDeploy tracks a running deployment so it can be cancelled.
type activeDeploy struct {
	cancel context.CancelFunc
	done   chan struct{}
}

// Service provides deployment functionality.
type Service struct {
	log           *slog.Logger
	db            *database.Database
	docker        *docker.Client
	notify        *notify.Service
	config        *config.Config
	params        *ServiceParams
	activeDeploys sync.Map // map[string]*activeDeploy - per-app active deployment tracking
	appLocks      sync.Map // map[string]*sync.Mutex - per-app deployment locks
}

// New creates a new deploy Service.
func New(lc fx.Lifecycle, params ServiceParams) (*Service, error) {
	svc := &Service{
		log:    params.Logger.Get(),
		db:     params.Database,
		docker: params.Docker,
		notify: params.Notify,
		config: params.Config,
		params: &params,
	}
	if lc != nil {
		lc.Append(fx.Hook{
			OnStart: func(ctx context.Context) error {
				return svc.cleanupStuckDeployments(ctx)
			},
		})
	}
	return svc, nil
}

// GetBuildDir returns the build directory path for an app.
func (svc *Service) GetBuildDir(appID string) string {
	return filepath.Join(svc.config.DataDir, "builds", appID)
}

// GetLogFilePath returns the path to the log file for a deployment.
// Returns empty string if the path cannot be determined.
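//
// A hypothetical result (hostname, app name, and SHA invented for
// illustration):
//
//	<DataDir>/logs/web01/myapp/myapp_0123456789ab_20260216T082746Z.log.txt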
func (svc *Service) GetLogFilePath(app *models.App, deployment *models.Deployment) string {
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown"
	}
	// Get commit SHA
	sha := ""
	if deployment.CommitSHA.Valid && deployment.CommitSHA.String != "" {
		sha = deployment.CommitSHA.String
		if len(sha) > logFileShortSHALength {
			sha = sha[:logFileShortSHALength]
		}
	}
	// Use started_at timestamp
	timestamp := deployment.StartedAt.UTC().Format(logTimestampFormat)
	// Build filename: appname_sha_timestamp.log.txt (or appname_timestamp.log.txt if no SHA)
	var filename string
	if sha != "" {
		filename = fmt.Sprintf("%s_%s_%s.log.txt", app.Name, sha, timestamp)
	} else {
		filename = fmt.Sprintf("%s_%s.log.txt", app.Name, timestamp)
	}
	return filepath.Join(svc.config.DataDir, "logs", hostname, app.Name, filename)
}

// HasActiveDeploy returns true if there is an active deployment for the given app.
func (svc *Service) HasActiveDeploy(appID string) bool {
	_, ok := svc.activeDeploys.Load(appID)
	return ok
}

// CancelDeploy cancels any in-progress deployment for the given app
// and waits for it to finish before returning. Returns true if a deployment
// was cancelled, false if there was nothing to cancel.
func (svc *Service) CancelDeploy(appID string) bool {
	if !svc.HasActiveDeploy(appID) {
		return false
	}
	svc.cancelActiveDeploy(appID)
	return true
}

// Deploy deploys an app. If cancelExisting is true (e.g. webhook-triggered),
// any in-progress deploy for the same app will be cancelled before starting.
// If cancelExisting is false and a deploy is in progress, ErrDeploymentInProgress is returned.
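//
// A minimal caller sketch (error handling abbreviated; app is assumed to be
// a loaded *models.App):
//
//	err := svc.Deploy(ctx, app, nil, false)
//	if errors.Is(err, deploy.ErrDeploymentInProgress) {
//		// another deploy holds the lock; retry later or pass cancelExisting=true
//	}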
func (svc *Service) Deploy(
	ctx context.Context,
	app *models.App,
	webhookEventID *int64,
	cancelExisting bool,
) error {
	if cancelExisting {
		svc.cancelActiveDeploy(app.ID)
	}
	// Try to acquire per-app deployment lock
	if !svc.tryLockApp(app.ID) {
		svc.log.Warn("deployment already in progress", "app", app.Name)
		return ErrDeploymentInProgress
	}
	defer svc.unlockApp(app.ID)
	// Set up cancellable context and register as active deploy
	deployCtx, cancel := context.WithCancel(ctx)
	done := make(chan struct{})
	ad := &activeDeploy{cancel: cancel, done: done}
	svc.activeDeploys.Store(app.ID, ad)
	defer func() {
		cancel()
		close(done)
		svc.activeDeploys.Delete(app.ID)
	}()
	// Fetch webhook event and create deployment record
	webhookEvent := svc.fetchWebhookEvent(deployCtx, webhookEventID)
	// Use a background context for DB operations that must complete regardless of cancellation
	bgCtx := context.WithoutCancel(deployCtx)
	deployment, err := svc.createDeploymentRecord(bgCtx, app, webhookEventID, webhookEvent)
	if err != nil {
		return err
	}
	svc.logWebhookPayload(bgCtx, deployment, webhookEvent)
	err = svc.updateAppStatusBuilding(bgCtx, app)
	if err != nil {
		return err
	}
	svc.notify.NotifyBuildStart(bgCtx, app, deployment)
	return svc.runBuildAndDeploy(deployCtx, bgCtx, app, deployment)
}

// Rollback rolls back an app to its previous image.
// It stops the current container, starts a new one with the previous image,
// and creates a deployment record for the rollback.
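//
// A minimal caller sketch:
//
//	if err := svc.Rollback(ctx, app); errors.Is(err, deploy.ErrNoPreviousImage) {
//		// the app has never been redeployed, so there is nothing to roll back to
//	}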
func (svc *Service) Rollback(ctx context.Context, app *models.App) error {
	if !app.PreviousImageID.Valid || app.PreviousImageID.String == "" {
		return ErrNoPreviousImage
	}
	// Acquire per-app deployment lock
	if !svc.tryLockApp(app.ID) {
		return ErrDeploymentInProgress
	}
	defer svc.unlockApp(app.ID)
	bgCtx := context.WithoutCancel(ctx)
	deployment, err := svc.createRollbackDeployment(bgCtx, app)
	if err != nil {
		return err
	}
	return svc.executeRollback(ctx, bgCtx, app, deployment)
}

// createRollbackDeployment creates a deployment record for a rollback operation.
func (svc *Service) createRollbackDeployment(
	ctx context.Context,
	app *models.App,
) (*models.Deployment, error) {
	deployment := models.NewDeployment(svc.db)
	deployment.AppID = app.ID
	deployment.Status = models.DeploymentStatusDeploying
	deployment.ImageID = sql.NullString{String: app.PreviousImageID.String, Valid: true}
	saveErr := deployment.Save(ctx)
	if saveErr != nil {
		return nil, fmt.Errorf("failed to create rollback deployment: %w", saveErr)
	}
	_ = deployment.AppendLog(ctx, "Rolling back to previous image: "+app.PreviousImageID.String)
	return deployment, nil
}

// executeRollback performs the container swap for a rollback.
func (svc *Service) executeRollback(
	ctx context.Context,
	bgCtx context.Context,
	app *models.App,
	deployment *models.Deployment,
) error {
	previousImageID := app.PreviousImageID.String
	svc.removeOldContainer(ctx, app, deployment)
	rollbackOpts, err := svc.buildContainerOptions(ctx, app, deployment.ID)
	if err != nil {
		svc.failDeployment(bgCtx, app, deployment, err)
		return fmt.Errorf("failed to build container options: %w", err)
	}
	rollbackOpts.Image = previousImageID
	containerID, err := svc.docker.CreateContainer(ctx, rollbackOpts)
	if err != nil {
		svc.failDeployment(bgCtx, app, deployment, fmt.Errorf("failed to create rollback container: %w", err))
		return fmt.Errorf("failed to create rollback container: %w", err)
	}
	deployment.ContainerID = sql.NullString{String: containerID, Valid: true}
	_ = deployment.AppendLog(bgCtx, "Rollback container created: "+containerID)
	startErr := svc.docker.StartContainer(ctx, containerID)
	if startErr != nil {
		svc.failDeployment(bgCtx, app, deployment, fmt.Errorf("failed to start rollback container: %w", startErr))
		return fmt.Errorf("failed to start rollback container: %w", startErr)
	}
	_ = deployment.AppendLog(bgCtx, "Rollback container started")
	currentImageID := app.ImageID
	app.ImageID = sql.NullString{String: previousImageID, Valid: true}
	app.PreviousImageID = currentImageID
	app.Status = models.AppStatusRunning
	saveErr := app.Save(bgCtx)
	if saveErr != nil {
		return fmt.Errorf("failed to update app after rollback: %w", saveErr)
	}
	_ = deployment.MarkFinished(bgCtx, models.DeploymentStatusSuccess)
	_ = deployment.AppendLog(bgCtx, "Rollback complete")
	svc.log.Info("rollback completed", "app", app.Name, "image", previousImageID)
	return nil
}

// runBuildAndDeploy executes the build and deploy phases, handling cancellation.
func (svc *Service) runBuildAndDeploy(
	deployCtx context.Context,
	bgCtx context.Context,
	app *models.App,
	deployment *models.Deployment,
) error {
	// Build phase with timeout
	imageID, err := svc.buildImageWithTimeout(deployCtx, app, deployment)
	if err != nil {
		cancelErr := svc.checkCancelled(deployCtx, bgCtx, app, deployment)
		if cancelErr != nil {
			return cancelErr
		}
		return err
	}
	svc.notify.NotifyBuildSuccess(bgCtx, app, deployment)
	// Deploy phase with timeout
	err = svc.deployContainerWithTimeout(deployCtx, app, deployment, imageID)
	if err != nil {
		cancelErr := svc.checkCancelled(deployCtx, bgCtx, app, deployment)
		if cancelErr != nil {
			return cancelErr
		}
		return err
	}
	// Save current image as previous before updating to new one
	if app.ImageID.Valid && app.ImageID.String != "" {
		app.PreviousImageID = app.ImageID
	}
	err = svc.updateAppRunning(bgCtx, app, imageID)
	if err != nil {
		return err
	}
	// bgCtx was derived with context.WithoutCancel, so the health check
	// completes even if the parent context is cancelled (e.g., the HTTP
	// request ends).
	go svc.checkHealthAfterDelay(bgCtx, app, deployment)
	return nil
}

// buildImageWithTimeout runs the build phase with a timeout.
func (svc *Service) buildImageWithTimeout(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) (string, error) {
	buildCtx, cancel := context.WithTimeout(ctx, buildTimeout)
	defer cancel()
	imageID, err := svc.buildImage(buildCtx, app, deployment)
	if err != nil {
		if errors.Is(buildCtx.Err(), context.DeadlineExceeded) {
			timeoutErr := fmt.Errorf("%w after %v", ErrBuildTimeout, buildTimeout)
			svc.failDeployment(ctx, app, deployment, timeoutErr)
			return "", timeoutErr
		}
		return "", err
	}
	return imageID, nil
}

// deployContainerWithTimeout runs the deploy phase with a timeout.
// It removes the old container and starts the new one.
func (svc *Service) deployContainerWithTimeout(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
	imageID string,
) error {
	deployCtx, cancel := context.WithTimeout(ctx, deployTimeout)
	defer cancel()
	err := svc.updateDeploymentDeploying(deployCtx, deployment)
	if err != nil {
		return err
	}
	// Remove old container first to free up the name
	svc.removeOldContainer(deployCtx, app, deployment)
	// Create and start the new container
	_, err = svc.createAndStartContainer(deployCtx, app, deployment, imageID)
	if err != nil {
		if errors.Is(deployCtx.Err(), context.DeadlineExceeded) {
			timeoutErr := fmt.Errorf("%w after %v", ErrDeployTimeout, deployTimeout)
			svc.failDeployment(ctx, app, deployment, timeoutErr)
			return timeoutErr
		}
		return err
	}
	return nil
}

// cleanupStuckDeployments marks any deployments stuck in building/deploying as failed.
func (svc *Service) cleanupStuckDeployments(ctx context.Context) error {
	// First, update the deployments
	deployQuery := `
		UPDATE deployments
		SET status = ?, finished_at = ?, logs = COALESCE(logs, '') || ?
		WHERE status IN (?, ?)
	`
	msg := "\n[System] Deployment marked as failed: process was interrupted\n"
	_, err := svc.db.DB().ExecContext(
		ctx,
		deployQuery,
		models.DeploymentStatusFailed,
		time.Now(),
		msg,
		models.DeploymentStatusBuilding,
		models.DeploymentStatusDeploying,
	)
	if err != nil {
		svc.log.Error("failed to cleanup stuck deployments", "error", err)
		return fmt.Errorf("failed to cleanup stuck deployments: %w", err)
	}
	// Also update app status for apps that were stuck in building
	appQuery := `
		UPDATE apps
		SET status = ?, updated_at = ?
		WHERE status = ?
	`
	_, err = svc.db.DB().ExecContext(
		ctx,
		appQuery,
		models.AppStatusError,
		time.Now(),
		models.AppStatusBuilding,
	)
	if err != nil {
		svc.log.Error("failed to cleanup stuck app statuses", "error", err)
		return fmt.Errorf("failed to cleanup stuck app statuses: %w", err)
	}
	svc.log.Info("cleaned up stuck deployments and app statuses")
	return nil
}

// getAppLock returns the per-app deployment mutex, creating it on first use.
func (svc *Service) getAppLock(appID string) *sync.Mutex {
	lock, _ := svc.appLocks.LoadOrStore(appID, &sync.Mutex{})
	mu, ok := lock.(*sync.Mutex)
	if !ok {
		// This should never happen, but handle it gracefully
		newMu := &sync.Mutex{}
		svc.appLocks.Store(appID, newMu)
		return newMu
	}
	return mu
}

// tryLockApp attempts to acquire the per-app deployment lock without blocking.
func (svc *Service) tryLockApp(appID string) bool {
	return svc.getAppLock(appID).TryLock()
}

// unlockApp releases the per-app deployment lock.
func (svc *Service) unlockApp(appID string) {
	svc.getAppLock(appID).Unlock()
}

// cancelActiveDeploy cancels any in-progress deployment for the given app
// and waits for it to finish before returning.
func (svc *Service) cancelActiveDeploy(appID string) {
	val, ok := svc.activeDeploys.Load(appID)
	if !ok {
		return
	}
	ad, ok := val.(*activeDeploy)
	if !ok {
		return
	}
	svc.log.Info("cancelling in-progress deployment", "app_id", appID)
	ad.cancel()
	<-ad.done
}

// checkCancelled checks if the deploy context was cancelled (by a newer deploy)
// and if so, marks the deployment as cancelled. Returns ErrDeployCancelled or nil.
func (svc *Service) checkCancelled(
	deployCtx context.Context,
	bgCtx context.Context,
	app *models.App,
	deployment *models.Deployment,
) error {
	if !errors.Is(deployCtx.Err(), context.Canceled) {
		return nil
	}
	svc.log.Info("deployment cancelled by newer deploy", "app", app.Name)
	_ = deployment.MarkFinished(bgCtx, models.DeploymentStatusCancelled)
	return ErrDeployCancelled
}

// fetchWebhookEvent loads the webhook event that triggered the deploy, if any.
func (svc *Service) fetchWebhookEvent(
	ctx context.Context,
	webhookEventID *int64,
) *models.WebhookEvent {
	if webhookEventID == nil {
		return nil
	}
	event, err := models.FindWebhookEvent(ctx, svc.db, *webhookEventID)
	if err != nil {
		svc.log.Warn("failed to fetch webhook event", "error", err)
		return nil
	}
	return event
}

// logWebhookPayload appends the webhook payload to the deployment log.
func (svc *Service) logWebhookPayload(
	ctx context.Context,
	deployment *models.Deployment,
	webhookEvent *models.WebhookEvent,
) {
	if webhookEvent == nil || !webhookEvent.Payload.Valid {
		return
	}
	_ = deployment.AppendLog(ctx, "Webhook received:\n"+webhookEvent.Payload.String+"\n")
}

// createDeploymentRecord creates and saves the deployment row for this deploy.
func (svc *Service) createDeploymentRecord(
	ctx context.Context,
	app *models.App,
	webhookEventID *int64,
	webhookEvent *models.WebhookEvent,
) (*models.Deployment, error) {
	deployment := models.NewDeployment(svc.db)
	deployment.AppID = app.ID
	if webhookEventID != nil {
		deployment.WebhookEventID = sql.NullInt64{
			Int64: *webhookEventID,
			Valid: true,
		}
	}
	// Set commit SHA and URL from webhook event
	if webhookEvent != nil {
		if webhookEvent.CommitSHA.Valid {
			deployment.CommitSHA = webhookEvent.CommitSHA
		}
		if webhookEvent.CommitURL.Valid {
			deployment.CommitURL = webhookEvent.CommitURL
		}
	}
	deployment.Status = models.DeploymentStatusBuilding
	saveErr := deployment.Save(ctx)
	if saveErr != nil {
		return nil, fmt.Errorf("failed to create deployment: %w", saveErr)
	}
	return deployment, nil
}

// updateAppStatusBuilding marks the app as building.
func (svc *Service) updateAppStatusBuilding(
	ctx context.Context,
	app *models.App,
) error {
	app.Status = models.AppStatusBuilding
	saveErr := app.Save(ctx)
	if saveErr != nil {
		return fmt.Errorf("failed to update app status: %w", saveErr)
	}
	return nil
}

// buildImage clones the repository and builds the Docker image, streaming
// build output to the deployment log.
func (svc *Service) buildImage(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) (string, error) {
	workDir, cleanup, err := svc.cloneRepository(ctx, app, deployment)
	if err != nil {
		return "", err
	}
	defer cleanup()
	imageTag := fmt.Sprintf("upaas-%s:%d", app.Name, deployment.ID)
	// Create log writer that flushes build output to deployment logs every second
	logWriter := newDeploymentLogWriter(ctx, deployment)
	defer logWriter.Close()
	// BuildImage creates a tar archive from the local filesystem,
	// so it needs the container path where files exist, not the host path.
	imageID, err := svc.docker.BuildImage(ctx, docker.BuildImageOptions{
		ContextDir:     workDir,
		DockerfilePath: app.DockerfilePath,
		Tags:           []string{imageTag},
		LogWriter:      logWriter,
	})
	if err != nil {
		svc.notify.NotifyBuildFailed(ctx, app, deployment, err)
		svc.failDeployment(
			ctx,
			app,
			deployment,
			fmt.Errorf("failed to build image: %w", err),
		)
		return "", fmt.Errorf("failed to build image: %w", err)
	}
	deployment.ImageID = sql.NullString{String: imageID, Valid: true}
	_ = deployment.AppendLog(ctx, "Image built: "+imageID)
	return imageID, nil
}

// cloneRepository clones the app's repository into a per-deployment build
// directory and returns the work directory plus a cleanup function.
func (svc *Service) cloneRepository(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) (string, func(), error) {
	// Use a subdirectory of DataDir for builds since it's mounted from the host
	// and accessible to Docker for bind mounts (unlike /tmp inside the container).
	// Structure: builds/<appname>/<deployment-id>-<random>/
	//   deploy_key <- SSH key for cloning
	//   work/      <- cloned repository
	appBuildsDir := filepath.Join(svc.config.DataDir, "builds", app.Name)
	err := os.MkdirAll(appBuildsDir, buildsDirPermissions)
	if err != nil {
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to create builds dir: %w", err))
		return "", nil, fmt.Errorf("failed to create builds dir: %w", err)
	}
	buildDir, err := os.MkdirTemp(appBuildsDir, fmt.Sprintf("%d-*", deployment.ID))
	if err != nil {
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to create temp dir: %w", err))
		return "", nil, fmt.Errorf("failed to create temp dir: %w", err)
	}
	cleanup := func() { _ = os.RemoveAll(buildDir) }
	// Calculate the host path for Docker bind mounts.
	// When upaas runs in a container, DataDir is the container path but Docker
	// needs the host path. HostDataDir provides the host-side equivalent.
	// CloneRepo needs both: container path for writing files, host path for Docker mounts.
	hostBuildDir := svc.containerToHostPath(buildDir)
	// Get commit SHA from deployment if available
	var commitSHA string
	if deployment.CommitSHA.Valid {
		commitSHA = deployment.CommitSHA.String
	}
	cloneResult, cloneErr := svc.docker.CloneRepo(
		ctx,
		app.RepoURL,
		app.Branch,
		commitSHA,
		app.SSHPrivateKey,
		buildDir,
		hostBuildDir,
	)
	if cloneErr != nil {
		cleanup()
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to clone repo: %w", cloneErr))
		return "", nil, fmt.Errorf("failed to clone repo: %w", cloneErr)
	}
	svc.processCloneResult(ctx, app, deployment, cloneResult, commitSHA)
	// Return the 'work' subdirectory where the repo was cloned
	workDir := filepath.Join(buildDir, "work")
	return workDir, cleanup, nil
}

// processCloneResult handles the result of a git clone operation.
// It sets the commit SHA on the deployment if not already set, and logs the result.
func (svc *Service) processCloneResult(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
	cloneResult *docker.CloneResult,
	originalCommitSHA string,
) {
	// Set commit SHA from clone result if not already set (e.g., manual deploys)
	if cloneResult != nil && cloneResult.CommitSHA != "" && !deployment.CommitSHA.Valid {
		deployment.CommitSHA = sql.NullString{String: cloneResult.CommitSHA, Valid: true}
		_ = deployment.Save(ctx)
	}
	// Log clone success with git output
	actualCommitSHA := originalCommitSHA
	if cloneResult != nil && cloneResult.CommitSHA != "" {
		actualCommitSHA = cloneResult.CommitSHA
	}
	if actualCommitSHA != "" {
		_ = deployment.AppendLog(ctx, "Repository cloned at commit "+actualCommitSHA)
	} else {
		_ = deployment.AppendLog(ctx, "Repository cloned (branch: "+app.Branch+")")
	}
	if cloneResult != nil && cloneResult.Output != "" {
		_ = deployment.AppendLog(ctx, cloneResult.Output)
	}
}

// containerToHostPath converts a container-local path to the equivalent host path.
// This is needed when upaas runs inside a container but needs to pass paths to Docker.
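//
// For example (paths hypothetical), with DataDir=/data and
// HostDataDir=/srv/upaas/data, /data/builds/app1 maps to
// /srv/upaas/data/builds/app1.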
func (svc *Service) containerToHostPath(containerPath string) string {
	if svc.config.HostDataDir == svc.config.DataDir {
		return containerPath
	}
	// Get relative path from DataDir
	relPath, err := filepath.Rel(svc.config.DataDir, containerPath)
	if err != nil {
		// Fall back to original path if we can't compute relative path
		return containerPath
	}
	return filepath.Join(svc.config.HostDataDir, relPath)
}

// updateDeploymentDeploying marks the deployment as deploying.
func (svc *Service) updateDeploymentDeploying(
	ctx context.Context,
	deployment *models.Deployment,
) error {
	deployment.Status = models.DeploymentStatusDeploying
	saveErr := deployment.Save(ctx)
	if saveErr != nil {
		return fmt.Errorf("failed to update deployment status: %w", saveErr)
	}
	return nil
}

// removeOldContainer stops and removes the old container before deploying a new one.
func (svc *Service) removeOldContainer(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) {
	containerInfo, err := svc.docker.FindContainerByAppID(ctx, app.ID)
	if err != nil || containerInfo == nil {
		return
	}
	svc.log.Info("removing old container", "id", containerInfo.ID)
	if containerInfo.Running {
		stopErr := svc.docker.StopContainer(ctx, containerInfo.ID)
		if stopErr != nil {
			svc.log.Warn("failed to stop old container", "error", stopErr)
		}
	}
	removeErr := svc.docker.RemoveContainer(ctx, containerInfo.ID, true)
	if removeErr != nil {
		svc.log.Warn("failed to remove old container", "error", removeErr)
	}
	_ = deployment.AppendLog(ctx, "Old container removed: "+containerInfo.ID[:12])
}

// createAndStartContainer creates and starts the new app container. The
// final image ID parameter is unused because buildContainerOptions derives
// the image tag from the app name and deployment ID.
func (svc *Service) createAndStartContainer(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
	_ string,
) (string, error) {
	containerOpts, err := svc.buildContainerOptions(ctx, app, deployment.ID)
	if err != nil {
		svc.failDeployment(ctx, app, deployment, err)
		return "", err
	}
	containerID, err := svc.docker.CreateContainer(ctx, containerOpts)
	if err != nil {
		svc.notify.NotifyDeployFailed(ctx, app, deployment, err)
		svc.failDeployment(
			ctx,
			app,
			deployment,
			fmt.Errorf("failed to create container: %w", err),
		)
		return "", fmt.Errorf("failed to create container: %w", err)
	}
	deployment.ContainerID = sql.NullString{String: containerID, Valid: true}
	_ = deployment.AppendLog(ctx, "Container created: "+containerID)
	startErr := svc.docker.StartContainer(ctx, containerID)
	if startErr != nil {
		svc.notify.NotifyDeployFailed(ctx, app, deployment, startErr)
		svc.failDeployment(
			ctx,
			app,
			deployment,
			fmt.Errorf("failed to start container: %w", startErr),
		)
		return "", fmt.Errorf("failed to start container: %w", startErr)
	}
	_ = deployment.AppendLog(ctx, "Container started")
	return containerID, nil
}

// buildContainerOptions assembles the Docker create options for the app's
// container from its stored env vars, labels, volumes, and ports.
func (svc *Service) buildContainerOptions(
	ctx context.Context,
	app *models.App,
	deploymentID int64,
) (docker.CreateContainerOptions, error) {
	envVars, err := app.GetEnvVars(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get env vars: %w", err)
	}
	labels, err := app.GetLabels(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get labels: %w", err)
	}
	volumes, err := app.GetVolumes(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get volumes: %w", err)
	}
	ports, err := app.GetPorts(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get ports: %w", err)
	}
	envMap := make(map[string]string, len(envVars))
	for _, envVar := range envVars {
		envMap[envVar.Key] = envVar.Value
	}
	network := ""
	if app.DockerNetwork.Valid {
		network = app.DockerNetwork.String
	}
	return docker.CreateContainerOptions{
		Name:    "upaas-" + app.Name,
		Image:   fmt.Sprintf("upaas-%s:%d", app.Name, deploymentID),
		Env:     envMap,
		Labels:  buildLabelMap(app, labels),
		Volumes: buildVolumeMounts(volumes),
		Ports:   buildPortMappings(ports),
		Network: network,
	}, nil
}

// buildLabelMap merges the app's user-defined labels with the upaas
// identification label.
func buildLabelMap(app *models.App, labels []*models.Label) map[string]string {
	labelMap := make(map[string]string, len(labels)+upaasLabelCount)
	for _, label := range labels {
		labelMap[label.Key] = label.Value
	}
	// Add the upaas.id label to identify this container
	labelMap[docker.LabelUpaasID] = app.ID
	return labelMap
}

// buildVolumeMounts converts volume models to Docker volume mounts.
func buildVolumeMounts(volumes []*models.Volume) []docker.VolumeMount {
	mounts := make([]docker.VolumeMount, 0, len(volumes))
	for _, vol := range volumes {
		mounts = append(mounts, docker.VolumeMount{
			HostPath:      vol.HostPath,
			ContainerPath: vol.ContainerPath,
			ReadOnly:      vol.ReadOnly,
		})
	}
	return mounts
}

// buildPortMappings converts port models to Docker port mappings.
func buildPortMappings(ports []*models.Port) []docker.PortMapping {
	mappings := make([]docker.PortMapping, 0, len(ports))
	for _, port := range ports {
		mappings = append(mappings, docker.PortMapping{
			HostPort:      port.HostPort,
			ContainerPort: port.ContainerPort,
			Protocol:      string(port.Protocol),
		})
	}
	return mappings
}

// updateAppRunning records the new image ID and marks the app as running.
func (svc *Service) updateAppRunning(
	ctx context.Context,
	app *models.App,
	imageID string,
) error {
	app.ImageID = sql.NullString{String: imageID, Valid: true}
	app.Status = models.AppStatusRunning
	saveErr := app.Save(ctx)
	if saveErr != nil {
		return fmt.Errorf("failed to update app: %w", saveErr)
	}
	return nil
}

// checkHealthAfterDelay waits healthCheckDelaySeconds, then checks the
// container's health and finalizes the deployment status accordingly.
func (svc *Service) checkHealthAfterDelay(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) {
	svc.log.Info(
		"waiting 60 seconds to check container health",
		"app", app.Name,
	)
	time.Sleep(healthCheckDelaySeconds * time.Second)
	// Reload app to get current state
	reloadedApp, err := models.FindApp(ctx, svc.db, app.ID)
	if err != nil || reloadedApp == nil {
		svc.log.Error("failed to reload app for health check", "error", err)
		return
	}
	containerInfo, containerErr := svc.docker.FindContainerByAppID(ctx, app.ID)
	if containerErr != nil || containerInfo == nil {
		return
	}
	healthy, err := svc.docker.IsContainerHealthy(ctx, containerInfo.ID)
	if err != nil {
		svc.log.Error("failed to check container health", "error", err)
		svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, err)
		_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
		svc.writeLogsToFile(reloadedApp, deployment)
		reloadedApp.Status = models.AppStatusError
		_ = reloadedApp.Save(ctx)
		return
	}
	if healthy {
		svc.log.Info("container healthy after 60 seconds", "app", reloadedApp.Name)
		svc.notify.NotifyDeploySuccess(ctx, reloadedApp, deployment)
		_ = deployment.MarkFinished(ctx, models.DeploymentStatusSuccess)
		svc.writeLogsToFile(reloadedApp, deployment)
	} else {
		svc.log.Warn(
			"container unhealthy after 60 seconds",
			"app", reloadedApp.Name,
		)
		svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, ErrContainerUnhealthy)
		_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
		svc.writeLogsToFile(reloadedApp, deployment)
		reloadedApp.Status = models.AppStatusError
		_ = reloadedApp.Save(ctx)
	}
}

// failDeployment logs the error, marks the deployment as failed, writes the
// logs to disk, and sets the app status to error.
func (svc *Service) failDeployment(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
	deployErr error,
) {
	svc.log.Error("deployment failed", "app", app.Name, "error", deployErr)
	_ = deployment.AppendLog(ctx, "ERROR: "+deployErr.Error())
	_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
	svc.writeLogsToFile(app, deployment)
	app.Status = models.AppStatusError
	_ = app.Save(ctx)
}

// writeLogsToFile writes the deployment logs to a file on disk.
// Structure: DataDir/logs/<hostname>/<appname>/<appname>_<sha>_<timestamp>.log.txt
func (svc *Service) writeLogsToFile(app *models.App, deployment *models.Deployment) {
	if !deployment.Logs.Valid || deployment.Logs.String == "" {
		return
	}
	logPath := svc.GetLogFilePath(app, deployment)
	if logPath == "" {
		return
	}
	// Ensure directory exists
	logDir := filepath.Dir(logPath)
	err := os.MkdirAll(logDir, logsDirPermissions)
	if err != nil {
		svc.log.Error("failed to create logs directory", "error", err, "path", logDir)
		return
	}
	err = os.WriteFile(logPath, []byte(deployment.Logs.String), logFilePermissions)
	if err != nil {
		svc.log.Error("failed to write log file", "error", err, "path", logPath)
		return
	}
	svc.log.Info("wrote deployment logs to file", "path", logPath)
}