// Package deploy provides deployment services.
package deploy

import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"time"

"go.uber.org/fx"

"sneak.berlin/go/upaas/internal/config"
"sneak.berlin/go/upaas/internal/database"
"sneak.berlin/go/upaas/internal/docker"
"sneak.berlin/go/upaas/internal/logger"
"sneak.berlin/go/upaas/internal/models"
"sneak.berlin/go/upaas/internal/service/notify"
)
// Deployment tuning constants.
const (
// healthCheckDelaySeconds is how long to wait after a container swap before
// checking container health.
healthCheckDelaySeconds = 60
// upaasLabelCount is the number of upaas-specific labels added to containers.
upaasLabelCount = 1
// buildsDirPermissions is the permission mode for the builds directory.
buildsDirPermissions = 0o750
// buildTimeout is the maximum duration for the build phase.
buildTimeout = 30 * time.Minute
// deployTimeout is the maximum duration for the deploy phase (container swap).
deployTimeout = 5 * time.Minute
)
// Sentinel errors for deployment failures.
var (
// ErrContainerUnhealthy indicates the container failed health check.
ErrContainerUnhealthy = errors.New("container unhealthy after 60 seconds")
// ErrDeploymentInProgress indicates another deployment is already running.
ErrDeploymentInProgress = errors.New("deployment already in progress for this app")
// ErrDeployCancelled indicates the deployment was cancelled by a newer deploy.
ErrDeployCancelled = errors.New("deployment cancelled by newer deploy")
// ErrBuildTimeout indicates the build phase exceeded the timeout.
ErrBuildTimeout = errors.New("build timeout exceeded")
// ErrDeployTimeout indicates the deploy phase exceeded the timeout.
ErrDeployTimeout = errors.New("deploy timeout exceeded")
// ErrNoPreviousImage indicates there is no previous image to rollback to.
ErrNoPreviousImage = errors.New("no previous image available for rollback")
)
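// Callers are expected to match these sentinels with errors.Is; for example
// (handler shape illustrative):
//
//	if errors.Is(err, ErrDeploymentInProgress) {
//		// e.g. surface HTTP 409 Conflict to the client
//	}
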
// logFlushInterval is how often to flush buffered logs to the database.
const logFlushInterval = time.Second
// logsDirPermissions is the permission mode for the logs directory.
const logsDirPermissions = 0o750
// logFilePermissions is the permission mode for log files.
const logFilePermissions = 0o640
// logTimestampFormat is the format for log file timestamps.
const logTimestampFormat = "20060102T150405Z"
// logFileShortSHALength is the number of characters to use for commit SHA in log filenames.
const logFileShortSHALength = 12
// dockerLogMessage represents a Docker build log message.
type dockerLogMessage struct {
Stream string `json:"stream"`
Error string `json:"error"`
}
// deploymentLogWriter implements io.Writer and periodically flushes to deployment logs.
// It parses Docker JSON log format and extracts the stream content.
type deploymentLogWriter struct {
deployment *models.Deployment
buffer bytes.Buffer
lineBuffer bytes.Buffer // buffer for incomplete lines
mu sync.Mutex
done chan struct{}
flushed sync.WaitGroup // waits for flush goroutine to finish
flushCtx context.Context //nolint:containedctx // needed for async flush goroutine
}
func newDeploymentLogWriter(ctx context.Context, deployment *models.Deployment) *deploymentLogWriter {
w := &deploymentLogWriter{
deployment: deployment,
done: make(chan struct{}),
flushCtx: ctx,
}
w.flushed.Add(1)
go w.runFlushLoop()
return w
}
// Write implements io.Writer.
// It parses Docker JSON log format and extracts the stream content.
func (w *deploymentLogWriter) Write(p []byte) (int, error) {
w.mu.Lock()
defer w.mu.Unlock()
// Add incoming data to line buffer
w.lineBuffer.Write(p)
// Process complete lines
data := w.lineBuffer.Bytes()
lastNewline := bytes.LastIndexByte(data, '\n')
if lastNewline == -1 {
// No complete lines yet
return len(p), nil
}
// Process all complete lines
completeData := data[:lastNewline+1]
remaining := data[lastNewline+1:]
for line := range bytes.SplitSeq(completeData, []byte{'\n'}) {
w.processLine(line)
}
// Keep any remaining incomplete line data
w.lineBuffer.Reset()
if len(remaining) > 0 {
w.lineBuffer.Write(remaining)
}
return len(p), nil
}
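// Illustrative sketch of the line buffering (fragments hypothetical): Docker
// emits newline-delimited JSON, but a single Write may end mid-object. The
// first call below is held in lineBuffer; the second completes the line, and
// only then does processLine parse it:
//
//	w.Write([]byte(`{"stream":"Step 1/2 : `)) // incomplete line, buffered
//	w.Write([]byte("FROM alpine\"}\n"))       // completes it, parsed
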
// Close flushes any buffered partial line, stops the flush loop, and waits
// for the final flush to complete.
func (w *deploymentLogWriter) Close() {
w.mu.Lock()
if w.lineBuffer.Len() > 0 {
w.processLine(w.lineBuffer.Bytes()) // don't drop a trailing partial line
w.lineBuffer.Reset()
}
w.mu.Unlock()
close(w.done)
w.flushed.Wait()
}
func (w *deploymentLogWriter) runFlushLoop() {
defer w.flushed.Done()
ticker := time.NewTicker(logFlushInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
w.doFlush()
case <-w.done:
w.doFlush()
return
}
}
}
func (w *deploymentLogWriter) doFlush() {
w.mu.Lock()
data := w.buffer.String()
w.buffer.Reset()
w.mu.Unlock()
if data != "" {
_ = w.deployment.AppendLog(w.flushCtx, data)
}
}
// processLine parses a JSON log line and extracts the stream content.
func (w *deploymentLogWriter) processLine(line []byte) {
if len(line) == 0 {
return
}
var msg dockerLogMessage
err := json.Unmarshal(line, &msg)
if err != nil {
// Not valid JSON, write as-is
w.buffer.Write(line)
w.buffer.WriteByte('\n')
return
}
if msg.Error != "" {
w.buffer.WriteString("ERROR: ")
w.buffer.WriteString(msg.Error)
w.buffer.WriteByte('\n')
}
if msg.Stream != "" {
w.buffer.WriteString(msg.Stream)
}
}
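// Sample input lines and their effect on the buffer (values hypothetical):
//
//	{"stream":"Step 1/2 : FROM alpine\n"}  ->  Step 1/2 : FROM alpine\n
//	{"error":"no such Dockerfile"}         ->  ERROR: no such Dockerfile\n
//	plain non-JSON text                    ->  passed through verbatim plus \n
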
// ServiceParams contains dependencies for Service.
type ServiceParams struct {
fx.In
Logger *logger.Logger
Config *config.Config
Database *database.Database
Docker *docker.Client
Notify *notify.Service
}
// activeDeploy tracks a running deployment so it can be cancelled.
type activeDeploy struct {
cancel context.CancelFunc
done chan struct{}
}
// Service provides deployment functionality.
type Service struct {
log *slog.Logger
db *database.Database
docker *docker.Client
notify *notify.Service
config *config.Config
params *ServiceParams
activeDeploys sync.Map // map[string]*activeDeploy - per-app active deployment tracking
appLocks sync.Map // map[string]*sync.Mutex - per-app deployment locks
}
// New creates a new deploy Service.
func New(lc fx.Lifecycle, params ServiceParams) (*Service, error) {
svc := &Service{
log: params.Logger.Get(),
db: params.Database,
docker: params.Docker,
notify: params.Notify,
config: params.Config,
params: &params,
}
if lc != nil {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return svc.cleanupStuckDeployments(ctx)
},
})
}
return svc, nil
}
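// Minimal wiring sketch, assuming the sibling constructors named here exist
// and are registered in the same fx graph (names illustrative):
//
//	app := fx.New(
//		fx.Provide(logger.New, config.New, database.New, docker.New, notify.New),
//		fx.Provide(deploy.New),
//		fx.Invoke(func(*deploy.Service) {}), // force construction at startup
//	)
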
// GetBuildDir returns the build directory path for an app.
func (svc *Service) GetBuildDir(appName string) string {
return filepath.Join(svc.config.DataDir, "builds", appName)
}
// GetLogFilePath returns the path to the log file for a deployment.
// The hostname segment falls back to "unknown" if os.Hostname fails.
func (svc *Service) GetLogFilePath(app *models.App, deployment *models.Deployment) string {
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}
// Get commit SHA
sha := ""
if deployment.CommitSHA.Valid && deployment.CommitSHA.String != "" {
sha = deployment.CommitSHA.String
if len(sha) > logFileShortSHALength {
sha = sha[:logFileShortSHALength]
}
}
// Use started_at timestamp
timestamp := deployment.StartedAt.UTC().Format(logTimestampFormat)
// Build filename: appname_sha_timestamp.log.txt (or appname_timestamp.log.txt if no SHA)
var filename string
if sha != "" {
filename = fmt.Sprintf("%s_%s_%s.log.txt", app.Name, sha, timestamp)
} else {
filename = fmt.Sprintf("%s_%s.log.txt", app.Name, timestamp)
}
return filepath.Join(svc.config.DataDir, "logs", hostname, app.Name, filename)
}
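// Worked example (values hypothetical): app "blog" on host "web1", commit SHA
// beginning abc123def4567890, started 2026-03-17 09:39:38 UTC, yields:
//
//	<DataDir>/logs/web1/blog/blog_abc123def456_20260317T093938Z.log.txt
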
// HasActiveDeploy returns true if there is an active deployment for the given app.
func (svc *Service) HasActiveDeploy(appID string) bool {
_, ok := svc.activeDeploys.Load(appID)
return ok
}
// CancelDeploy cancels any in-progress deployment for the given app
// and waits for it to finish before returning. Returns true if a deployment
// was cancelled, false if there was nothing to cancel.
func (svc *Service) CancelDeploy(appID string) bool {
if !svc.HasActiveDeploy(appID) {
return false
}
svc.cancelActiveDeploy(appID)
return true
}
// Deploy deploys an app. If cancelExisting is true (e.g. webhook-triggered),
// any in-progress deploy for the same app will be cancelled before starting.
// If cancelExisting is false and a deploy is in progress, ErrDeploymentInProgress is returned.
func (svc *Service) Deploy(
ctx context.Context,
app *models.App,
webhookEventID *int64,
cancelExisting bool,
) error {
if cancelExisting {
svc.cancelActiveDeploy(app.ID)
}
// Try to acquire per-app deployment lock
if !svc.tryLockApp(app.ID) {
svc.log.Warn("deployment already in progress", "app", app.Name)
return ErrDeploymentInProgress
}
// Set up cancellable context and register as active deploy
deployCtx, cancel := context.WithCancel(ctx)
done := make(chan struct{})
ad := &activeDeploy{cancel: cancel, done: done}
svc.activeDeploys.Store(app.ID, ad)
defer func() {
cancel()
svc.activeDeploys.Delete(app.ID)
// Release the per-app lock before signalling done so a cancelling
// replacement deploy can take the lock as soon as it wakes.
svc.unlockApp(app.ID)
close(done)
}()
// Fetch webhook event and create deployment record
webhookEvent := svc.fetchWebhookEvent(deployCtx, webhookEventID)
// Use a background context for DB operations that must complete regardless of cancellation
bgCtx := context.WithoutCancel(deployCtx)
deployment, err := svc.createDeploymentRecord(bgCtx, app, webhookEventID, webhookEvent)
if err != nil {
return err
}
svc.logWebhookPayload(bgCtx, deployment, webhookEvent)
err = svc.updateAppStatusBuilding(bgCtx, app)
if err != nil {
return err
}
svc.notify.NotifyBuildStart(bgCtx, app, deployment)
return svc.runBuildAndDeploy(deployCtx, bgCtx, app, deployment)
}
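// Typical call shapes (sketch; surrounding handler code is illustrative):
//
//	// webhook push: the newest deploy wins, cancel anything in flight
//	err := svc.Deploy(ctx, app, &eventID, true)
//
//	// manual deploy: refuse to preempt a running one
//	if err := svc.Deploy(ctx, app, nil, false); errors.Is(err, ErrDeploymentInProgress) {
//		// report that a deploy is already running
//	}
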
// Rollback rolls back an app to its previous image.
// It stops the current container, starts a new one with the previous image,
// and creates a deployment record for the rollback.
func (svc *Service) Rollback(ctx context.Context, app *models.App) error {
if !app.PreviousImageID.Valid || app.PreviousImageID.String == "" {
return ErrNoPreviousImage
}
// Acquire per-app deployment lock
if !svc.tryLockApp(app.ID) {
return ErrDeploymentInProgress
}
defer svc.unlockApp(app.ID)
bgCtx := context.WithoutCancel(ctx)
deployment, err := svc.createRollbackDeployment(bgCtx, app)
if err != nil {
return err
}
return svc.executeRollback(ctx, bgCtx, app, deployment)
}
// createRollbackDeployment creates a deployment record for a rollback operation.
func (svc *Service) createRollbackDeployment(
ctx context.Context,
app *models.App,
) (*models.Deployment, error) {
deployment := models.NewDeployment(svc.db)
deployment.AppID = app.ID
deployment.Status = models.DeploymentStatusDeploying
deployment.ImageID = sql.NullString{String: app.PreviousImageID.String, Valid: true}
saveErr := deployment.Save(ctx)
if saveErr != nil {
return nil, fmt.Errorf("failed to create rollback deployment: %w", saveErr)
}
_ = deployment.AppendLog(ctx, "Rolling back to previous image: "+app.PreviousImageID.String)
return deployment, nil
}
// executeRollback performs the container swap for a rollback.
func (svc *Service) executeRollback(
ctx context.Context,
bgCtx context.Context,
app *models.App,
deployment *models.Deployment,
) error {
previousImageID := app.PreviousImageID.String
svc.removeOldContainer(ctx, app, deployment)
rollbackOpts, err := svc.buildContainerOptions(ctx, app, docker.ImageID(previousImageID))
if err != nil {
svc.failDeployment(bgCtx, app, deployment, err)
return fmt.Errorf("failed to build container options: %w", err)
}
containerID, err := svc.docker.CreateContainer(ctx, rollbackOpts)
if err != nil {
svc.failDeployment(bgCtx, app, deployment, fmt.Errorf("failed to create rollback container: %w", err))
return fmt.Errorf("failed to create rollback container: %w", err)
}
deployment.ContainerID = sql.NullString{String: containerID.String(), Valid: true}
_ = deployment.AppendLog(bgCtx, "Rollback container created: "+containerID.String())
startErr := svc.docker.StartContainer(ctx, containerID)
if startErr != nil {
svc.failDeployment(bgCtx, app, deployment, fmt.Errorf("failed to start rollback container: %w", startErr))
return fmt.Errorf("failed to start rollback container: %w", startErr)
}
_ = deployment.AppendLog(bgCtx, "Rollback container started")
currentImageID := app.ImageID
app.ImageID = sql.NullString{String: previousImageID, Valid: true}
app.PreviousImageID = currentImageID
app.Status = models.AppStatusRunning
saveErr := app.Save(bgCtx)
if saveErr != nil {
return fmt.Errorf("failed to update app after rollback: %w", saveErr)
}
_ = deployment.MarkFinished(bgCtx, models.DeploymentStatusSuccess)
_ = deployment.AppendLog(bgCtx, "Rollback complete")
svc.log.Info("rollback completed", "app", app.Name, "image", previousImageID)
return nil
}
// runBuildAndDeploy executes the build and deploy phases, handling cancellation.
func (svc *Service) runBuildAndDeploy(
deployCtx context.Context,
bgCtx context.Context,
app *models.App,
deployment *models.Deployment,
) error {
// Build phase with timeout
imageID, err := svc.buildImageWithTimeout(deployCtx, app, deployment)
if err != nil {
cancelErr := svc.checkCancelled(deployCtx, bgCtx, app, deployment, "")
if cancelErr != nil {
return cancelErr
}
return err
}
svc.notify.NotifyBuildSuccess(bgCtx, app, deployment)
// Deploy phase with timeout
err = svc.deployContainerWithTimeout(deployCtx, app, deployment, imageID)
if err != nil {
cancelErr := svc.checkCancelled(deployCtx, bgCtx, app, deployment, imageID)
if cancelErr != nil {
return cancelErr
}
return err
}
// Save current image as previous before updating to new one
if app.ImageID.Valid && app.ImageID.String != "" {
app.PreviousImageID = app.ImageID
}
err = svc.updateAppRunning(bgCtx, app, imageID)
if err != nil {
return err
}
// bgCtx was derived via context.WithoutCancel, so the health check runs to
// completion even if the parent context is cancelled (e.g., the HTTP request
// ends).
go svc.checkHealthAfterDelay(bgCtx, app, deployment)
return nil
}
// buildImageWithTimeout runs the build phase with a timeout.
func (svc *Service) buildImageWithTimeout(
ctx context.Context,
app *models.App,
deployment *models.Deployment,
) (docker.ImageID, error) {
buildCtx, cancel := context.WithTimeout(ctx, buildTimeout)
defer cancel()
imageID, err := svc.buildImage(buildCtx, app, deployment)
if err != nil {
if errors.Is(buildCtx.Err(), context.DeadlineExceeded) {
timeoutErr := fmt.Errorf("%w after %v", ErrBuildTimeout, buildTimeout)
svc.failDeployment(ctx, app, deployment, timeoutErr)
return "", timeoutErr
}
return "", err
}
return imageID, nil
}
// deployContainerWithTimeout runs the deploy phase with a timeout.
// It removes the old container and starts the new one.
func (svc *Service) deployContainerWithTimeout(
ctx context.Context,
app *models.App,
deployment *models.Deployment,
imageID docker.ImageID,
) error {
deployCtx, cancel := context.WithTimeout(ctx, deployTimeout)
defer cancel()
err := svc.updateDeploymentDeploying(deployCtx, deployment)
if err != nil {
return err
}
// Remove old container first to free up the name
svc.removeOldContainer(deployCtx, app, deployment)
// Create and start the new container
_, err = svc.createAndStartContainer(deployCtx, app, deployment, imageID)
if err != nil {
if errors.Is(deployCtx.Err(), context.DeadlineExceeded) {
timeoutErr := fmt.Errorf("%w after %v", ErrDeployTimeout, deployTimeout)
svc.failDeployment(ctx, app, deployment, timeoutErr)
return timeoutErr
}
return err
}
return nil
}
// cleanupStuckDeployments marks any deployments stuck in building/deploying as failed.
func (svc *Service) cleanupStuckDeployments(ctx context.Context) error {
// First, update the deployments
deployQuery := `
UPDATE deployments
SET status = ?, finished_at = ?, logs = COALESCE(logs, '') || ?
WHERE status IN (?, ?)
`
msg := "\n[System] Deployment marked as failed: process was interrupted\n"
_, err := svc.db.DB().ExecContext(
ctx,
deployQuery,
models.DeploymentStatusFailed,
time.Now(),
msg,
models.DeploymentStatusBuilding,
models.DeploymentStatusDeploying,
)
if err != nil {
svc.log.Error("failed to cleanup stuck deployments", "error", err)
return fmt.Errorf("failed to cleanup stuck deployments: %w", err)
}
// Also update app status for apps that were stuck in building
appQuery := `
UPDATE apps
SET status = ?, updated_at = ?
WHERE status = ?
`
_, err = svc.db.DB().ExecContext(
ctx,
appQuery,
models.AppStatusError,
time.Now(),
models.AppStatusBuilding,
)
if err != nil {
svc.log.Error("failed to cleanup stuck app statuses", "error", err)
return fmt.Errorf("failed to cleanup stuck app statuses: %w", err)
}
svc.log.Info("cleaned up stuck deployments and app statuses")
return nil
}
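// getAppLock returns the per-app deployment mutex, creating it on first use.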
func (svc *Service) getAppLock(appID string) *sync.Mutex {
lock, _ := svc.appLocks.LoadOrStore(appID, &sync.Mutex{})
mu, ok := lock.(*sync.Mutex)
if !ok {
// This should never happen, but handle it gracefully
newMu := &sync.Mutex{}
svc.appLocks.Store(appID, newMu)
return newMu
}
return mu
}
func (svc *Service) tryLockApp(appID string) bool {
return svc.getAppLock(appID).TryLock()
}
func (svc *Service) unlockApp(appID string) {
svc.getAppLock(appID).Unlock()
}
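// The per-app lock is deliberately non-blocking: TryLock lets a competing
// deploy fail fast with ErrDeploymentInProgress instead of queueing behind a
// potentially long build. Entries in appLocks are never deleted; one small
// mutex per app ID avoids unlock-after-delete races.
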
// cancelActiveDeploy cancels any in-progress deployment for the given app
// and waits for it to finish before returning.
func (svc *Service) cancelActiveDeploy(appID string) {
val, ok := svc.activeDeploys.Load(appID)
if !ok {
return
}
ad, ok := val.(*activeDeploy)
if !ok {
return
}
svc.log.Info("cancelling in-progress deployment", "app_id", appID)
ad.cancel()
<-ad.done
}
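// Cancellation handshake: Deploy registers {cancel, done} before doing any
// work, and its deferred cleanup cancels the context, releases the per-app
// lock, and only then closes done. Blocking on <-ad.done above therefore
// guarantees the old deploy has fully unwound, so a replacement deploy can
// immediately acquire the lock.
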
// checkCancelled checks if the deploy context was cancelled (by a newer deploy)
// and if so, marks the deployment as cancelled and cleans up orphan resources.
// Returns ErrDeployCancelled or nil.
func (svc *Service) checkCancelled(
deployCtx context.Context,
bgCtx context.Context,
app *models.App,
deployment *models.Deployment,
imageID docker.ImageID,
) error {
if !errors.Is(deployCtx.Err(), context.Canceled) {
return nil
}
svc.log.Info("deployment cancelled", "app", app.Name)
svc.cleanupCancelledDeploy(bgCtx, app, deployment, imageID)
_ = deployment.MarkFinished(bgCtx, models.DeploymentStatusCancelled)
return ErrDeployCancelled
}
// cleanupCancelledDeploy removes orphan resources left by a cancelled deployment.
func (svc *Service) cleanupCancelledDeploy(
ctx context.Context,
app *models.App,
deployment *models.Deployment,
imageID docker.ImageID,
) {
// Clean up the intermediate Docker image if one was built
if imageID != "" {
removeErr := svc.docker.RemoveImage(ctx, imageID)
if removeErr != nil {
svc.log.Error("failed to remove image from cancelled deploy",
"error", removeErr, "app", app.Name, "image", imageID)
_ = deployment.AppendLog(ctx, "WARNING: failed to clean up image "+imageID.String()+": "+removeErr.Error())
} else {
svc.log.Info("cleaned up image from cancelled deploy",
"app", app.Name, "image", imageID)
_ = deployment.AppendLog(ctx, "Cleaned up intermediate image: "+imageID.String())
}
}
// Clean up the build directory for this deployment
buildDir := svc.GetBuildDir(app.Name)
entries, err := os.ReadDir(buildDir)
if err != nil {
return
}
prefix := fmt.Sprintf("%d-", deployment.ID)
for _, entry := range entries {
if entry.IsDir() && strings.HasPrefix(entry.Name(), prefix) {
dirPath := filepath.Join(buildDir, entry.Name())
removeErr := os.RemoveAll(dirPath)
if removeErr != nil {
svc.log.Error("failed to remove build dir from cancelled deploy",
"error", removeErr, "path", dirPath)
} else {
svc.log.Info("cleaned up build dir from cancelled deploy",
"app", app.Name, "path", dirPath)
_ = deployment.AppendLog(ctx, "Cleaned up build directory")
}
}
}
}
func (svc *Service) fetchWebhookEvent(
ctx context.Context,
webhookEventID *int64,
) *models.WebhookEvent {
if webhookEventID == nil {
return nil
}
event, err := models.FindWebhookEvent(ctx, svc.db, *webhookEventID)
if err != nil {
svc.log.Warn("failed to fetch webhook event", "error", err)
return nil
}
return event
}
func (svc *Service) logWebhookPayload(
ctx context.Context,
deployment *models.Deployment,
webhookEvent *models.WebhookEvent,
) {
if webhookEvent == nil || !webhookEvent.Payload.Valid {
return
}
_ = deployment.AppendLog(ctx, "Webhook received:\n"+webhookEvent.Payload.String+"\n")
}
func (svc *Service) createDeploymentRecord(
ctx context.Context,
app *models.App,
webhookEventID *int64,
webhookEvent *models.WebhookEvent,
) (*models.Deployment, error) {
deployment := models.NewDeployment(svc.db)
deployment.AppID = app.ID
if webhookEventID != nil {
deployment.WebhookEventID = sql.NullInt64{
Int64: *webhookEventID,
Valid: true,
}
}
// Set commit SHA and URL from webhook event
if webhookEvent != nil {
if webhookEvent.CommitSHA.Valid {
deployment.CommitSHA = webhookEvent.CommitSHA
}
if webhookEvent.CommitURL.Valid {
deployment.CommitURL = webhookEvent.CommitURL
}
}
deployment.Status = models.DeploymentStatusBuilding
saveErr := deployment.Save(ctx)
if saveErr != nil {
return nil, fmt.Errorf("failed to create deployment: %w", saveErr)
}
return deployment, nil
}
func (svc *Service) updateAppStatusBuilding(
ctx context.Context,
app *models.App,
) error {
app.Status = models.AppStatusBuilding
saveErr := app.Save(ctx)
if saveErr != nil {
return fmt.Errorf("failed to update app status: %w", saveErr)
}
return nil
}
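// buildImage clones the app repository, builds a Docker image from it while
// streaming build output into the deployment log, and records the resulting
// image ID on the deployment.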
func (svc *Service) buildImage(
ctx context.Context,
app *models.App,
deployment *models.Deployment,
) (docker.ImageID, error) {
workDir, cleanup, err := svc.cloneRepository(ctx, app, deployment)
if err != nil {
return "", err
}
defer cleanup()
imageTag := fmt.Sprintf("upaas-%s:%d", app.Name, deployment.ID)
// Create log writer that flushes build output to deployment logs every second
logWriter := newDeploymentLogWriter(ctx, deployment)
defer logWriter.Close()
// Fetch registry credentials for private base images
registryAuths, err := svc.buildRegistryAuths(ctx, app)
if err != nil {
svc.log.Warn("failed to fetch registry credentials", "error", err, "app", app.Name)
// Continue without auth — public images will still work
}
// BuildImage creates a tar archive from the local filesystem,
// so it needs the container path where files exist, not the host path.
imageID, err := svc.docker.BuildImage(ctx, docker.BuildImageOptions{
ContextDir: workDir,
DockerfilePath: app.DockerfilePath,
Tags: []string{imageTag},
LogWriter: logWriter,
RegistryAuths: registryAuths,
})
if err != nil {
svc.notify.NotifyBuildFailed(ctx, app, deployment, err)
svc.failDeployment(
ctx,
app,
deployment,
fmt.Errorf("failed to build image: %w", err),
)
return "", fmt.Errorf("failed to build image: %w", err)
}
deployment.ImageID = sql.NullString{String: imageID.String(), Valid: true}
_ = deployment.AppendLog(ctx, "Image built: "+imageID.String())
return imageID, nil
}
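// cloneRepository clones the app repository into a fresh per-deployment
// build directory, returning the work directory, a cleanup function, and any
// error. On failure the deployment is marked failed before returning.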
func (svc *Service) cloneRepository(
ctx context.Context,
app *models.App,
deployment *models.Deployment,
) (string, func(), error) {
// Use a subdirectory of DataDir for builds since it's mounted from the host
// and accessible to Docker for bind mounts (unlike /tmp inside the container).
// Structure: builds/<appname>/<deployment-id>-<random>/
// deploy_key <- SSH key for cloning
// work/ <- cloned repository
appBuildsDir := filepath.Join(svc.config.DataDir, "builds", app.Name)
err := os.MkdirAll(appBuildsDir, buildsDirPermissions)
if err != nil {
svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to create builds dir: %w", err))
return "", nil, fmt.Errorf("failed to create builds dir: %w", err)
}
buildDir, err := os.MkdirTemp(appBuildsDir, fmt.Sprintf("%d-*", deployment.ID))
if err != nil {
svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to create temp dir: %w", err))
return "", nil, fmt.Errorf("failed to create temp dir: %w", err)
}
cleanup := func() { _ = os.RemoveAll(buildDir) }
// Calculate the host path for Docker bind mounts.
// When upaas runs in a container, DataDir is the container path but Docker
// needs the host path. HostDataDir provides the host-side equivalent.
// CloneRepo needs both: container path for writing files, host path for Docker mounts.
hostBuildDir := svc.containerToHostPath(buildDir)
// Get commit SHA from deployment if available
var commitSHA string
if deployment.CommitSHA.Valid {
commitSHA = deployment.CommitSHA.String
}
cloneResult, cloneErr := svc.docker.CloneRepo(
ctx,
app.RepoURL,
app.Branch,
commitSHA,
app.SSHPrivateKey,
buildDir,
hostBuildDir,
)
if cloneErr != nil {
cleanup()
svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to clone repo: %w", cloneErr))
return "", nil, fmt.Errorf("failed to clone repo: %w", cloneErr)
}
svc.processCloneResult(ctx, app, deployment, cloneResult, commitSHA)
// Return the 'work' subdirectory where the repo was cloned
workDir := filepath.Join(buildDir, "work")
return workDir, cleanup, nil
}
// processCloneResult handles the result of a git clone operation.
// It sets the commit SHA on the deployment if not already set, and logs the result.
func (svc *Service) processCloneResult(
ctx context.Context,
app *models.App,
deployment *models.Deployment,
cloneResult *docker.CloneResult,
originalCommitSHA string,
) {
// Set commit SHA from clone result if not already set (e.g., manual deploys)
if cloneResult != nil && cloneResult.CommitSHA != "" && !deployment.CommitSHA.Valid {
deployment.CommitSHA = sql.NullString{String: cloneResult.CommitSHA, Valid: true}
_ = deployment.Save(ctx)
}
// Log clone success with git output
actualCommitSHA := originalCommitSHA
if cloneResult != nil && cloneResult.CommitSHA != "" {
actualCommitSHA = cloneResult.CommitSHA
}
if actualCommitSHA != "" {
_ = deployment.AppendLog(ctx, "Repository cloned at commit "+actualCommitSHA)
} else {
_ = deployment.AppendLog(ctx, "Repository cloned (branch: "+app.Branch+")")
}
if cloneResult != nil && cloneResult.Output != "" {
_ = deployment.AppendLog(ctx, cloneResult.Output)
}
}
// containerToHostPath converts a container-local path to the equivalent host path.
// This is needed when upaas runs inside a container but needs to pass paths to Docker.
func (svc *Service) containerToHostPath(containerPath string) string {
if svc.config.HostDataDir == svc.config.DataDir {
return containerPath
}
// Get relative path from DataDir
relPath, err := filepath.Rel(svc.config.DataDir, containerPath)
if err != nil {
// Fall back to original path if we can't compute relative path
return containerPath
}
return filepath.Join(svc.config.HostDataDir, relPath)
}
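// Worked example (paths hypothetical): with DataDir "/data" inside the
// container and HostDataDir "/srv/upaas/data" on the host:
//
//	containerToHostPath("/data/builds/blog/42-x1y2")
//	// -> "/srv/upaas/data/builds/blog/42-x1y2"
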
func (svc *Service) updateDeploymentDeploying(
ctx context.Context,
deployment *models.Deployment,
) error {
deployment.Status = models.DeploymentStatusDeploying
saveErr := deployment.Save(ctx)
if saveErr != nil {
return fmt.Errorf("failed to update deployment status: %w", saveErr)
}
return nil
}
// removeOldContainer stops and removes the old container before deploying a new one.
func (svc *Service) removeOldContainer(
ctx context.Context,
app *models.App,
deployment *models.Deployment,
) {
containerInfo, err := svc.docker.FindContainerByAppID(ctx, app.ID)
if err != nil || containerInfo == nil {
return
}
svc.log.Info("removing old container", "id", containerInfo.ID)
if containerInfo.Running {
stopErr := svc.docker.StopContainer(ctx, containerInfo.ID)
if stopErr != nil {
svc.log.Warn("failed to stop old container", "error", stopErr)
}
}
removeErr := svc.docker.RemoveContainer(ctx, containerInfo.ID, true)
if removeErr != nil {
svc.log.Warn("failed to remove old container", "error", removeErr)
}
_ = deployment.AppendLog(ctx, "Old container removed: "+string(containerInfo.ID[:12]))
}
func (svc *Service) createAndStartContainer(
ctx context.Context,
app *models.App,
deployment *models.Deployment,
imageID docker.ImageID,
) (docker.ContainerID, error) {
containerOpts, err := svc.buildContainerOptions(ctx, app, imageID)
if err != nil {
svc.failDeployment(ctx, app, deployment, err)
return "", err
}
containerID, err := svc.docker.CreateContainer(ctx, containerOpts)
if err != nil {
svc.notify.NotifyDeployFailed(ctx, app, deployment, err)
svc.failDeployment(
ctx,
app,
deployment,
fmt.Errorf("failed to create container: %w", err),
)
return "", fmt.Errorf("failed to create container: %w", err)
}
deployment.ContainerID = sql.NullString{String: containerID.String(), Valid: true}
_ = deployment.AppendLog(ctx, "Container created: "+containerID.String())
startErr := svc.docker.StartContainer(ctx, containerID)
if startErr != nil {
svc.notify.NotifyDeployFailed(ctx, app, deployment, startErr)
svc.failDeployment(
ctx,
app,
deployment,
fmt.Errorf("failed to start container: %w", startErr),
)
return "", fmt.Errorf("failed to start container: %w", startErr)
}
_ = deployment.AppendLog(ctx, "Container started")
return containerID, nil
}
func (svc *Service) buildContainerOptions(
ctx context.Context,
app *models.App,
imageID docker.ImageID,
) (docker.CreateContainerOptions, error) {
envVars, err := app.GetEnvVars(ctx)
if err != nil {
return docker.CreateContainerOptions{}, fmt.Errorf("failed to get env vars: %w", err)
}
labels, err := app.GetLabels(ctx)
if err != nil {
return docker.CreateContainerOptions{}, fmt.Errorf("failed to get labels: %w", err)
}
volumes, err := app.GetVolumes(ctx)
if err != nil {
return docker.CreateContainerOptions{}, fmt.Errorf("failed to get volumes: %w", err)
}
ports, err := app.GetPorts(ctx)
if err != nil {
return docker.CreateContainerOptions{}, fmt.Errorf("failed to get ports: %w", err)
}
envMap := make(map[string]string, len(envVars))
for _, envVar := range envVars {
envMap[envVar.Key] = envVar.Value
}
network := ""
if app.DockerNetwork.Valid {
network = app.DockerNetwork.String
}
return docker.CreateContainerOptions{
Name: "upaas-" + app.Name,
Image: imageID.String(),
Env: envMap,
Labels: buildLabelMap(app, labels),
Volumes: buildVolumeMounts(volumes),
Ports: buildPortMappings(ports),
Network: network,
}, nil
}
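// For a hypothetical app "blog" with a single env var and user-defined labels
// omitted, the result is roughly:
//
//	docker.CreateContainerOptions{
//		Name:    "upaas-blog",
//		Image:   imageID.String(),
//		Env:     map[string]string{"PORT": "8080"},
//		Labels:  map[string]string{docker.LabelUpaasID: app.ID},
//		Network: "web",
//	}
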
func buildLabelMap(app *models.App, labels []*models.Label) map[string]string {
labelMap := make(map[string]string, len(labels)+upaasLabelCount)
for _, label := range labels {
labelMap[label.Key] = label.Value
}
// Add the upaas.id label to identify this container
labelMap[docker.LabelUpaasID] = app.ID
return labelMap
}
func buildVolumeMounts(volumes []*models.Volume) []docker.VolumeMount {
mounts := make([]docker.VolumeMount, 0, len(volumes))
for _, vol := range volumes {
mounts = append(mounts, docker.VolumeMount{
HostPath: vol.HostPath,
ContainerPath: vol.ContainerPath,
ReadOnly: vol.ReadOnly,
})
}
return mounts
}
func buildPortMappings(ports []*models.Port) []docker.PortMapping {
mappings := make([]docker.PortMapping, 0, len(ports))
for _, port := range ports {
mappings = append(mappings, docker.PortMapping{
HostPort: port.HostPort,
ContainerPort: port.ContainerPort,
Protocol: string(port.Protocol),
})
}
return mappings
}
func (svc *Service) updateAppRunning(
ctx context.Context,
app *models.App,
imageID docker.ImageID,
) error {
app.ImageID = sql.NullString{String: imageID.String(), Valid: true}
app.Status = models.AppStatusRunning
saveErr := app.Save(ctx)
if saveErr != nil {
return fmt.Errorf("failed to update app: %w", saveErr)
}
return nil
}
func (svc *Service) checkHealthAfterDelay(
ctx context.Context,
app *models.App,
deployment *models.Deployment,
) {
svc.log.Info(
"waiting 60 seconds to check container health",
"app", app.Name,
)
time.Sleep(healthCheckDelaySeconds * time.Second)
// Reload app to get current state
reloadedApp, err := models.FindApp(ctx, svc.db, app.ID)
if err != nil || reloadedApp == nil {
svc.log.Error("failed to reload app for health check", "error", err)
return
}
containerInfo, containerErr := svc.docker.FindContainerByAppID(ctx, app.ID)
if containerErr != nil || containerInfo == nil {
return
}
healthy, err := svc.docker.IsContainerHealthy(ctx, containerInfo.ID)
if err != nil {
svc.log.Error("failed to check container health", "error", err)
svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, err)
_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
svc.writeLogsToFile(reloadedApp, deployment)
reloadedApp.Status = models.AppStatusError
_ = reloadedApp.Save(ctx)
return
}
if healthy {
svc.log.Info("container healthy after 60 seconds", "app", reloadedApp.Name)
svc.notify.NotifyDeploySuccess(ctx, reloadedApp, deployment)
_ = deployment.MarkFinished(ctx, models.DeploymentStatusSuccess)
svc.writeLogsToFile(reloadedApp, deployment)
} else {
svc.log.Warn(
"container unhealthy after 60 seconds",
"app", reloadedApp.Name,
)
svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, ErrContainerUnhealthy)
_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
svc.writeLogsToFile(reloadedApp, deployment)
reloadedApp.Status = models.AppStatusError
_ = reloadedApp.Save(ctx)
}
}
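// failDeployment records a deployment failure: it logs the error, appends it
// to the deployment log, marks the deployment failed, writes the logs to
// disk, and moves the app into the error state.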
func (svc *Service) failDeployment(
ctx context.Context,
app *models.App,
deployment *models.Deployment,
deployErr error,
) {
svc.log.Error("deployment failed", "app", app.Name, "error", deployErr)
_ = deployment.AppendLog(ctx, "ERROR: "+deployErr.Error())
_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
svc.writeLogsToFile(app, deployment)
app.Status = models.AppStatusError
_ = app.Save(ctx)
}
// buildRegistryAuths fetches registry credentials for an app and converts them
// to Docker RegistryAuth objects for use during image builds.
func (svc *Service) buildRegistryAuths(
ctx context.Context,
app *models.App,
) ([]docker.RegistryAuth, error) {
creds, err := app.GetRegistryCredentials(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get registry credentials: %w", err)
}
if len(creds) == 0 {
return nil, nil
}
auths := make([]docker.RegistryAuth, 0, len(creds))
for _, cred := range creds {
auths = append(auths, docker.RegistryAuth{
Registry: cred.Registry,
Username: cred.Username,
Password: cred.Password,
})
}
return auths, nil
}
// writeLogsToFile writes the deployment logs to a file on disk.
// Structure: DataDir/logs/<hostname>/<appname>/<appname>_<sha>_<timestamp>.log.txt
func (svc *Service) writeLogsToFile(app *models.App, deployment *models.Deployment) {
if !deployment.Logs.Valid || deployment.Logs.String == "" {
return
}
logPath := svc.GetLogFilePath(app, deployment)
if logPath == "" {
return
}
// Ensure directory exists
logDir := filepath.Dir(logPath)
err := os.MkdirAll(logDir, logsDirPermissions)
if err != nil {
svc.log.Error("failed to create logs directory", "error", err, "path", logDir)
return
}
err = os.WriteFile(logPath, []byte(deployment.Logs.String), logFilePermissions)
if err != nil {
svc.log.Error("failed to write log file", "error", err, "path", logPath)
return
}
svc.log.Info("wrote deployment logs to file", "path", logPath)
}