The parameter was accepted but ignored (blank identifier). The image reference is constructed from app name and deployment ID in buildContainerOptions instead.
1261 lines
34 KiB
Go
1261 lines
34 KiB
Go
// Package deploy provides deployment services.
|
|
package deploy
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/fx"
|
|
|
|
"git.eeqj.de/sneak/upaas/internal/config"
|
|
"git.eeqj.de/sneak/upaas/internal/database"
|
|
"git.eeqj.de/sneak/upaas/internal/docker"
|
|
"git.eeqj.de/sneak/upaas/internal/logger"
|
|
"git.eeqj.de/sneak/upaas/internal/models"
|
|
"git.eeqj.de/sneak/upaas/internal/service/notify"
|
|
)
|
|
|
|
// Time constants.
const (
	// healthCheckDelaySeconds is how long (in seconds) to wait after a deploy
	// before the post-deploy health check runs. Keep in sync with the wording
	// of ErrContainerUnhealthy below.
	healthCheckDelaySeconds = 60
	// upaasLabelCount is the number of upaas-specific labels added to containers.
	upaasLabelCount = 1
	// buildsDirPermissions is the permission mode for the builds directory.
	buildsDirPermissions = 0o750
	// buildTimeout is the maximum duration for the build phase.
	buildTimeout = 30 * time.Minute
	// deployTimeout is the maximum duration for the deploy phase (container swap).
	deployTimeout = 5 * time.Minute
)

// Sentinel errors for deployment failures.
var (
	// ErrContainerUnhealthy indicates the container failed health check.
	// NOTE(review): "60 seconds" duplicates healthCheckDelaySeconds above;
	// update both together if the delay changes.
	ErrContainerUnhealthy = errors.New("container unhealthy after 60 seconds")
	// ErrDeploymentInProgress indicates another deployment is already running.
	ErrDeploymentInProgress = errors.New("deployment already in progress for this app")
	// ErrDeployCancelled indicates the deployment was cancelled by a newer deploy.
	ErrDeployCancelled = errors.New("deployment cancelled by newer deploy")
	// ErrBuildTimeout indicates the build phase exceeded the timeout.
	ErrBuildTimeout = errors.New("build timeout exceeded")
	// ErrDeployTimeout indicates the deploy phase exceeded the timeout.
	ErrDeployTimeout = errors.New("deploy timeout exceeded")
	// ErrNoPreviousImage indicates there is no previous image to rollback to.
	ErrNoPreviousImage = errors.New("no previous image available for rollback")
)

// logFlushInterval is how often to flush buffered logs to the database.
const logFlushInterval = time.Second

// logsDirPermissions is the permission mode for the logs directory.
const logsDirPermissions = 0o750

// logFilePermissions is the permission mode for log files.
const logFilePermissions = 0o640

// logTimestampFormat is the format for log file timestamps.
const logTimestampFormat = "20060102T150405Z"

// logFileShortSHALength is the number of characters to use for commit SHA in log filenames.
const logFileShortSHALength = 12
|
|
|
|
// dockerLogMessage represents a Docker build log message.
type dockerLogMessage struct {
	// Stream carries a chunk of normal build output text.
	Stream string `json:"stream"`
	// Error carries a build error message, when present.
	Error string `json:"error"`
}

// deploymentLogWriter implements io.Writer and periodically flushes to deployment logs.
// It parses Docker JSON log format and extracts the stream content.
type deploymentLogWriter struct {
	deployment *models.Deployment // destination for flushed log text
	buffer     bytes.Buffer       // parsed output awaiting the next flush (guarded by mu)
	lineBuffer bytes.Buffer       // buffer for incomplete lines (guarded by mu)
	mu         sync.Mutex
	done       chan struct{}   // closed by Close to stop the flush loop
	flushed    sync.WaitGroup  // waits for flush goroutine to finish
	flushCtx   context.Context //nolint:containedctx // needed for async flush goroutine
}
|
|
|
|
func newDeploymentLogWriter(ctx context.Context, deployment *models.Deployment) *deploymentLogWriter {
|
|
w := &deploymentLogWriter{
|
|
deployment: deployment,
|
|
done: make(chan struct{}),
|
|
flushCtx: ctx,
|
|
}
|
|
w.flushed.Add(1)
|
|
|
|
go w.runFlushLoop()
|
|
|
|
return w
|
|
}
|
|
|
|
// Write implements io.Writer.
|
|
// It parses Docker JSON log format and extracts the stream content.
|
|
func (w *deploymentLogWriter) Write(p []byte) (int, error) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
// Add incoming data to line buffer
|
|
w.lineBuffer.Write(p)
|
|
|
|
// Process complete lines
|
|
data := w.lineBuffer.Bytes()
|
|
lastNewline := bytes.LastIndexByte(data, '\n')
|
|
|
|
if lastNewline == -1 {
|
|
// No complete lines yet
|
|
return len(p), nil
|
|
}
|
|
|
|
// Process all complete lines
|
|
completeData := data[:lastNewline+1]
|
|
remaining := data[lastNewline+1:]
|
|
|
|
for line := range bytes.SplitSeq(completeData, []byte{'\n'}) {
|
|
w.processLine(line)
|
|
}
|
|
|
|
// Keep any remaining incomplete line data
|
|
w.lineBuffer.Reset()
|
|
|
|
if len(remaining) > 0 {
|
|
w.lineBuffer.Write(remaining)
|
|
}
|
|
|
|
return len(p), nil
|
|
}
|
|
|
|
// Close stops the flush loop, waits for the final flush to complete.
func (w *deploymentLogWriter) Close() {
	// Closing done makes runFlushLoop perform one last flush and return.
	close(w.done)
	// Block until that final flush has actually finished.
	w.flushed.Wait()
}
|
|
|
|
func (w *deploymentLogWriter) runFlushLoop() {
|
|
defer w.flushed.Done()
|
|
|
|
ticker := time.NewTicker(logFlushInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
w.doFlush()
|
|
case <-w.done:
|
|
w.doFlush()
|
|
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *deploymentLogWriter) doFlush() {
|
|
w.mu.Lock()
|
|
data := w.buffer.String()
|
|
w.buffer.Reset()
|
|
w.mu.Unlock()
|
|
|
|
if data != "" {
|
|
_ = w.deployment.AppendLog(w.flushCtx, data)
|
|
}
|
|
}
|
|
|
|
// processLine parses a JSON log line and extracts the stream content.
|
|
func (w *deploymentLogWriter) processLine(line []byte) {
|
|
if len(line) == 0 {
|
|
return
|
|
}
|
|
|
|
var msg dockerLogMessage
|
|
|
|
err := json.Unmarshal(line, &msg)
|
|
if err != nil {
|
|
// Not valid JSON, write as-is
|
|
w.buffer.Write(line)
|
|
w.buffer.WriteByte('\n')
|
|
|
|
return
|
|
}
|
|
|
|
if msg.Error != "" {
|
|
w.buffer.WriteString("ERROR: ")
|
|
w.buffer.WriteString(msg.Error)
|
|
w.buffer.WriteByte('\n')
|
|
}
|
|
|
|
if msg.Stream != "" {
|
|
w.buffer.WriteString(msg.Stream)
|
|
}
|
|
}
|
|
|
|
// ServiceParams contains dependencies for Service.
type ServiceParams struct {
	fx.In

	Logger   *logger.Logger
	Config   *config.Config
	Database *database.Database
	Docker   *docker.Client
	Notify   *notify.Service
}

// activeDeploy tracks a running deployment so it can be cancelled.
type activeDeploy struct {
	cancel context.CancelFunc // cancels the deployment's context
	done   chan struct{}      // closed when the deployment has fully finished cleanup
}

// Service provides deployment functionality.
type Service struct {
	log           *slog.Logger
	db            *database.Database
	docker        *docker.Client
	notify        *notify.Service
	config        *config.Config
	params        *ServiceParams
	activeDeploys sync.Map // map[string]*activeDeploy - per-app active deployment tracking
	appLocks      sync.Map // map[string]*sync.Mutex - per-app deployment locks
}
|
|
|
|
// New creates a new deploy Service.
|
|
func New(lc fx.Lifecycle, params ServiceParams) (*Service, error) {
|
|
svc := &Service{
|
|
log: params.Logger.Get(),
|
|
db: params.Database,
|
|
docker: params.Docker,
|
|
notify: params.Notify,
|
|
config: params.Config,
|
|
params: ¶ms,
|
|
}
|
|
|
|
if lc != nil {
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
return svc.cleanupStuckDeployments(ctx)
|
|
},
|
|
})
|
|
}
|
|
|
|
return svc, nil
|
|
}
|
|
|
|
// GetBuildDir returns the build directory path for an app.
|
|
func (svc *Service) GetBuildDir(appName string) string {
|
|
return filepath.Join(svc.config.DataDir, "builds", appName)
|
|
}
|
|
|
|
// GetLogFilePath returns the path to the log file for a deployment.
|
|
// Returns empty string if the path cannot be determined.
|
|
func (svc *Service) GetLogFilePath(app *models.App, deployment *models.Deployment) string {
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
hostname = "unknown"
|
|
}
|
|
|
|
// Get commit SHA
|
|
sha := ""
|
|
if deployment.CommitSHA.Valid && deployment.CommitSHA.String != "" {
|
|
sha = deployment.CommitSHA.String
|
|
if len(sha) > logFileShortSHALength {
|
|
sha = sha[:logFileShortSHALength]
|
|
}
|
|
}
|
|
|
|
// Use started_at timestamp
|
|
timestamp := deployment.StartedAt.UTC().Format(logTimestampFormat)
|
|
|
|
// Build filename: appname_sha_timestamp.log.txt (or appname_timestamp.log.txt if no SHA)
|
|
var filename string
|
|
if sha != "" {
|
|
filename = fmt.Sprintf("%s_%s_%s.log.txt", app.Name, sha, timestamp)
|
|
} else {
|
|
filename = fmt.Sprintf("%s_%s.log.txt", app.Name, timestamp)
|
|
}
|
|
|
|
return filepath.Join(svc.config.DataDir, "logs", hostname, app.Name, filename)
|
|
}
|
|
|
|
// HasActiveDeploy returns true if there is an active deployment for the given app.
|
|
func (svc *Service) HasActiveDeploy(appID string) bool {
|
|
_, ok := svc.activeDeploys.Load(appID)
|
|
|
|
return ok
|
|
}
|
|
|
|
// CancelDeploy cancels any in-progress deployment for the given app
|
|
// and waits for it to finish before returning. Returns true if a deployment
|
|
// was cancelled, false if there was nothing to cancel.
|
|
func (svc *Service) CancelDeploy(appID string) bool {
|
|
if !svc.HasActiveDeploy(appID) {
|
|
return false
|
|
}
|
|
|
|
svc.cancelActiveDeploy(appID)
|
|
|
|
return true
|
|
}
|
|
|
|
// Deploy deploys an app. If cancelExisting is true (e.g. webhook-triggered),
// any in-progress deploy for the same app will be cancelled before starting.
// If cancelExisting is false and a deploy is in progress, ErrDeploymentInProgress is returned.
func (svc *Service) Deploy(
	ctx context.Context,
	app *models.App,
	webhookEventID *int64,
	cancelExisting bool,
) error {
	// Cancel first (and wait for the old deploy to fully exit) so the lock
	// acquisition below can succeed for webhook-triggered deploys.
	if cancelExisting {
		svc.cancelActiveDeploy(app.ID)
	}

	// Try to acquire per-app deployment lock
	if !svc.tryLockApp(app.ID) {
		svc.log.Warn("deployment already in progress", "app", app.Name)

		return ErrDeploymentInProgress
	}
	defer svc.unlockApp(app.ID)

	// Set up cancellable context and register as active deploy
	deployCtx, cancel := context.WithCancel(ctx)
	done := make(chan struct{})
	ad := &activeDeploy{cancel: cancel, done: done}
	svc.activeDeploys.Store(app.ID, ad)

	// Closing done releases anyone blocked in cancelActiveDeploy; this defer
	// runs before unlockApp (LIFO), so waiters see the lock free afterwards.
	defer func() {
		cancel()
		close(done)
		svc.activeDeploys.Delete(app.ID)
	}()

	// Fetch webhook event and create deployment record
	webhookEvent := svc.fetchWebhookEvent(deployCtx, webhookEventID)

	// Use a background context for DB operations that must complete regardless of cancellation
	bgCtx := context.WithoutCancel(deployCtx)

	deployment, err := svc.createDeploymentRecord(bgCtx, app, webhookEventID, webhookEvent)
	if err != nil {
		return err
	}

	svc.logWebhookPayload(bgCtx, deployment, webhookEvent)

	err = svc.updateAppStatusBuilding(bgCtx, app)
	if err != nil {
		return err
	}

	svc.notify.NotifyBuildStart(bgCtx, app, deployment)

	return svc.runBuildAndDeploy(deployCtx, bgCtx, app, deployment)
}
|
|
|
|
// Rollback rolls back an app to its previous image.
|
|
// It stops the current container, starts a new one with the previous image,
|
|
// and creates a deployment record for the rollback.
|
|
func (svc *Service) Rollback(ctx context.Context, app *models.App) error {
|
|
if !app.PreviousImageID.Valid || app.PreviousImageID.String == "" {
|
|
return ErrNoPreviousImage
|
|
}
|
|
|
|
// Acquire per-app deployment lock
|
|
if !svc.tryLockApp(app.ID) {
|
|
return ErrDeploymentInProgress
|
|
}
|
|
defer svc.unlockApp(app.ID)
|
|
|
|
bgCtx := context.WithoutCancel(ctx)
|
|
|
|
deployment, err := svc.createRollbackDeployment(bgCtx, app)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return svc.executeRollback(ctx, bgCtx, app, deployment)
|
|
}
|
|
|
|
// createRollbackDeployment creates a deployment record for a rollback operation.
|
|
func (svc *Service) createRollbackDeployment(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
) (*models.Deployment, error) {
|
|
deployment := models.NewDeployment(svc.db)
|
|
deployment.AppID = app.ID
|
|
deployment.Status = models.DeploymentStatusDeploying
|
|
deployment.ImageID = sql.NullString{String: app.PreviousImageID.String, Valid: true}
|
|
|
|
saveErr := deployment.Save(ctx)
|
|
if saveErr != nil {
|
|
return nil, fmt.Errorf("failed to create rollback deployment: %w", saveErr)
|
|
}
|
|
|
|
_ = deployment.AppendLog(ctx, "Rolling back to previous image: "+app.PreviousImageID.String)
|
|
|
|
return deployment, nil
|
|
}
|
|
|
|
// executeRollback performs the container swap for a rollback.
// ctx drives the Docker operations; bgCtx is used for DB writes that must
// complete even if the caller's context is cancelled. On success the app's
// ImageID/PreviousImageID are swapped and the deployment is marked successful.
func (svc *Service) executeRollback(
	ctx context.Context,
	bgCtx context.Context,
	app *models.App,
	deployment *models.Deployment,
) error {
	previousImageID := app.PreviousImageID.String

	// Free the container name before creating the replacement.
	svc.removeOldContainer(ctx, app, deployment)

	rollbackOpts, err := svc.buildContainerOptions(ctx, app, deployment.ID)
	if err != nil {
		svc.failDeployment(bgCtx, app, deployment, err)

		return fmt.Errorf("failed to build container options: %w", err)
	}

	// Override the freshly-derived image reference with the previous image.
	rollbackOpts.Image = previousImageID

	containerID, err := svc.docker.CreateContainer(ctx, rollbackOpts)
	if err != nil {
		svc.failDeployment(bgCtx, app, deployment, fmt.Errorf("failed to create rollback container: %w", err))

		return fmt.Errorf("failed to create rollback container: %w", err)
	}

	// NOTE(review): ContainerID is only set in memory here; persistence
	// appears to rely on AppendLog/MarkFinished saving the record — confirm
	// against the models package.
	deployment.ContainerID = sql.NullString{String: containerID, Valid: true}
	_ = deployment.AppendLog(bgCtx, "Rollback container created: "+containerID)

	startErr := svc.docker.StartContainer(ctx, containerID)
	if startErr != nil {
		svc.failDeployment(bgCtx, app, deployment, fmt.Errorf("failed to start rollback container: %w", startErr))

		return fmt.Errorf("failed to start rollback container: %w", startErr)
	}

	_ = deployment.AppendLog(bgCtx, "Rollback container started")

	// Swap current and previous image references so a second rollback
	// returns to where we started.
	currentImageID := app.ImageID
	app.ImageID = sql.NullString{String: previousImageID, Valid: true}
	app.PreviousImageID = currentImageID
	app.Status = models.AppStatusRunning

	saveErr := app.Save(bgCtx)
	if saveErr != nil {
		return fmt.Errorf("failed to update app after rollback: %w", saveErr)
	}

	_ = deployment.MarkFinished(bgCtx, models.DeploymentStatusSuccess)
	_ = deployment.AppendLog(bgCtx, "Rollback complete")

	svc.log.Info("rollback completed", "app", app.Name, "image", previousImageID)

	return nil
}
|
|
|
|
// runBuildAndDeploy executes the build and deploy phases, handling cancellation.
// deployCtx is cancelled when a newer deploy preempts this one; bgCtx is a
// non-cancellable context for DB writes and notifications that must finish
// regardless. A successful deploy schedules an async health check.
func (svc *Service) runBuildAndDeploy(
	deployCtx context.Context,
	bgCtx context.Context,
	app *models.App,
	deployment *models.Deployment,
) error {
	// Build phase with timeout
	imageID, err := svc.buildImageWithTimeout(deployCtx, app, deployment)
	if err != nil {
		// No image exists yet, hence the empty imageID passed for cleanup.
		cancelErr := svc.checkCancelled(deployCtx, bgCtx, app, deployment, "")
		if cancelErr != nil {
			return cancelErr
		}

		return err
	}

	svc.notify.NotifyBuildSuccess(bgCtx, app, deployment)

	// Deploy phase with timeout
	err = svc.deployContainerWithTimeout(deployCtx, app, deployment)
	if err != nil {
		// Pass imageID so a cancelled deploy can remove the orphan image.
		cancelErr := svc.checkCancelled(deployCtx, bgCtx, app, deployment, imageID)
		if cancelErr != nil {
			return cancelErr
		}

		return err
	}

	// Save current image as previous before updating to new one
	if app.ImageID.Valid && app.ImageID.String != "" {
		app.PreviousImageID = app.ImageID
	}

	err = svc.updateAppRunning(bgCtx, app, imageID)
	if err != nil {
		return err
	}

	// Use context.WithoutCancel to ensure health check completes even if
	// the parent context is cancelled (e.g., HTTP request ends).
	go svc.checkHealthAfterDelay(bgCtx, app, deployment)

	return nil
}
|
|
|
|
// buildImageWithTimeout runs the build phase with a timeout.
|
|
func (svc *Service) buildImageWithTimeout(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
deployment *models.Deployment,
|
|
) (string, error) {
|
|
buildCtx, cancel := context.WithTimeout(ctx, buildTimeout)
|
|
defer cancel()
|
|
|
|
imageID, err := svc.buildImage(buildCtx, app, deployment)
|
|
if err != nil {
|
|
if errors.Is(buildCtx.Err(), context.DeadlineExceeded) {
|
|
timeoutErr := fmt.Errorf("%w after %v", ErrBuildTimeout, buildTimeout)
|
|
svc.failDeployment(ctx, app, deployment, timeoutErr)
|
|
|
|
return "", timeoutErr
|
|
}
|
|
|
|
return "", err
|
|
}
|
|
|
|
return imageID, nil
|
|
}
|
|
|
|
// deployContainerWithTimeout runs the deploy phase with a timeout.
|
|
// It removes the old container and starts the new one.
|
|
func (svc *Service) deployContainerWithTimeout(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
deployment *models.Deployment,
|
|
) error {
|
|
deployCtx, cancel := context.WithTimeout(ctx, deployTimeout)
|
|
defer cancel()
|
|
|
|
err := svc.updateDeploymentDeploying(deployCtx, deployment)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Remove old container first to free up the name
|
|
svc.removeOldContainer(deployCtx, app, deployment)
|
|
|
|
// Create and start the new container
|
|
_, err = svc.createAndStartContainer(deployCtx, app, deployment)
|
|
if err != nil {
|
|
if errors.Is(deployCtx.Err(), context.DeadlineExceeded) {
|
|
timeoutErr := fmt.Errorf("%w after %v", ErrDeployTimeout, deployTimeout)
|
|
svc.failDeployment(ctx, app, deployment, timeoutErr)
|
|
|
|
return timeoutErr
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// cleanupStuckDeployments marks any deployments stuck in building/deploying as failed.
// It runs at startup: rows still in an in-progress state belong to a process
// that died mid-deploy, so they are finalized and any apps left in "building"
// are flipped to the error status.
func (svc *Service) cleanupStuckDeployments(ctx context.Context) error {
	// First, update the deployments
	deployQuery := `
		UPDATE deployments
		SET status = ?, finished_at = ?, logs = COALESCE(logs, '') || ?
		WHERE status IN (?, ?)
	`
	msg := "\n[System] Deployment marked as failed: process was interrupted\n"

	_, err := svc.db.DB().ExecContext(
		ctx,
		deployQuery,
		models.DeploymentStatusFailed,
		time.Now(),
		msg,
		models.DeploymentStatusBuilding,
		models.DeploymentStatusDeploying,
	)
	if err != nil {
		svc.log.Error("failed to cleanup stuck deployments", "error", err)

		return fmt.Errorf("failed to cleanup stuck deployments: %w", err)
	}

	// Also update app status for apps that were stuck in building
	appQuery := `
		UPDATE apps
		SET status = ?, updated_at = ?
		WHERE status = ?
	`

	_, err = svc.db.DB().ExecContext(
		ctx,
		appQuery,
		models.AppStatusError,
		time.Now(),
		models.AppStatusBuilding,
	)
	if err != nil {
		svc.log.Error("failed to cleanup stuck app statuses", "error", err)

		return fmt.Errorf("failed to cleanup stuck app statuses: %w", err)
	}

	svc.log.Info("cleaned up stuck deployments and app statuses")

	return nil
}
|
|
|
|
func (svc *Service) getAppLock(appID string) *sync.Mutex {
|
|
lock, _ := svc.appLocks.LoadOrStore(appID, &sync.Mutex{})
|
|
|
|
mu, ok := lock.(*sync.Mutex)
|
|
if !ok {
|
|
// This should never happen, but handle it gracefully
|
|
newMu := &sync.Mutex{}
|
|
svc.appLocks.Store(appID, newMu)
|
|
|
|
return newMu
|
|
}
|
|
|
|
return mu
|
|
}
|
|
|
|
func (svc *Service) tryLockApp(appID string) bool {
|
|
return svc.getAppLock(appID).TryLock()
|
|
}
|
|
|
|
func (svc *Service) unlockApp(appID string) {
|
|
svc.getAppLock(appID).Unlock()
|
|
}
|
|
|
|
// cancelActiveDeploy cancels any in-progress deployment for the given app
|
|
// and waits for it to finish before returning.
|
|
func (svc *Service) cancelActiveDeploy(appID string) {
|
|
val, ok := svc.activeDeploys.Load(appID)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
ad, ok := val.(*activeDeploy)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
svc.log.Info("cancelling in-progress deployment", "app_id", appID)
|
|
ad.cancel()
|
|
<-ad.done
|
|
}
|
|
|
|
// checkCancelled checks if the deploy context was cancelled (by a newer deploy)
|
|
// and if so, marks the deployment as cancelled and cleans up orphan resources.
|
|
// Returns ErrDeployCancelled or nil.
|
|
func (svc *Service) checkCancelled(
|
|
deployCtx context.Context,
|
|
bgCtx context.Context,
|
|
app *models.App,
|
|
deployment *models.Deployment,
|
|
imageID string,
|
|
) error {
|
|
if !errors.Is(deployCtx.Err(), context.Canceled) {
|
|
return nil
|
|
}
|
|
|
|
svc.log.Info("deployment cancelled", "app", app.Name)
|
|
|
|
svc.cleanupCancelledDeploy(bgCtx, app, deployment, imageID)
|
|
|
|
_ = deployment.MarkFinished(bgCtx, models.DeploymentStatusCancelled)
|
|
|
|
return ErrDeployCancelled
|
|
}
|
|
|
|
// cleanupCancelledDeploy removes orphan resources left by a cancelled deployment.
|
|
func (svc *Service) cleanupCancelledDeploy(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
deployment *models.Deployment,
|
|
imageID string,
|
|
) {
|
|
// Clean up the intermediate Docker image if one was built
|
|
if imageID != "" {
|
|
removeErr := svc.docker.RemoveImage(ctx, imageID)
|
|
if removeErr != nil {
|
|
svc.log.Error("failed to remove image from cancelled deploy",
|
|
"error", removeErr, "app", app.Name, "image", imageID)
|
|
_ = deployment.AppendLog(ctx, "WARNING: failed to clean up image "+imageID+": "+removeErr.Error())
|
|
} else {
|
|
svc.log.Info("cleaned up image from cancelled deploy",
|
|
"app", app.Name, "image", imageID)
|
|
_ = deployment.AppendLog(ctx, "Cleaned up intermediate image: "+imageID)
|
|
}
|
|
}
|
|
|
|
// Clean up the build directory for this deployment
|
|
buildDir := svc.GetBuildDir(app.Name)
|
|
|
|
entries, err := os.ReadDir(buildDir)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
prefix := fmt.Sprintf("%d-", deployment.ID)
|
|
|
|
for _, entry := range entries {
|
|
if entry.IsDir() && strings.HasPrefix(entry.Name(), prefix) {
|
|
dirPath := filepath.Join(buildDir, entry.Name())
|
|
|
|
removeErr := os.RemoveAll(dirPath)
|
|
if removeErr != nil {
|
|
svc.log.Error("failed to remove build dir from cancelled deploy",
|
|
"error", removeErr, "path", dirPath)
|
|
} else {
|
|
svc.log.Info("cleaned up build dir from cancelled deploy",
|
|
"app", app.Name, "path", dirPath)
|
|
|
|
_ = deployment.AppendLog(ctx, "Cleaned up build directory")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (svc *Service) fetchWebhookEvent(
|
|
ctx context.Context,
|
|
webhookEventID *int64,
|
|
) *models.WebhookEvent {
|
|
if webhookEventID == nil {
|
|
return nil
|
|
}
|
|
|
|
event, err := models.FindWebhookEvent(ctx, svc.db, *webhookEventID)
|
|
if err != nil {
|
|
svc.log.Warn("failed to fetch webhook event", "error", err)
|
|
|
|
return nil
|
|
}
|
|
|
|
return event
|
|
}
|
|
|
|
func (svc *Service) logWebhookPayload(
|
|
ctx context.Context,
|
|
deployment *models.Deployment,
|
|
webhookEvent *models.WebhookEvent,
|
|
) {
|
|
if webhookEvent == nil || !webhookEvent.Payload.Valid {
|
|
return
|
|
}
|
|
|
|
_ = deployment.AppendLog(ctx, "Webhook received:\n"+webhookEvent.Payload.String+"\n")
|
|
}
|
|
|
|
func (svc *Service) createDeploymentRecord(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
webhookEventID *int64,
|
|
webhookEvent *models.WebhookEvent,
|
|
) (*models.Deployment, error) {
|
|
deployment := models.NewDeployment(svc.db)
|
|
deployment.AppID = app.ID
|
|
|
|
if webhookEventID != nil {
|
|
deployment.WebhookEventID = sql.NullInt64{
|
|
Int64: *webhookEventID,
|
|
Valid: true,
|
|
}
|
|
}
|
|
|
|
// Set commit SHA and URL from webhook event
|
|
if webhookEvent != nil {
|
|
if webhookEvent.CommitSHA.Valid {
|
|
deployment.CommitSHA = webhookEvent.CommitSHA
|
|
}
|
|
|
|
if webhookEvent.CommitURL.Valid {
|
|
deployment.CommitURL = webhookEvent.CommitURL
|
|
}
|
|
}
|
|
|
|
deployment.Status = models.DeploymentStatusBuilding
|
|
|
|
saveErr := deployment.Save(ctx)
|
|
if saveErr != nil {
|
|
return nil, fmt.Errorf("failed to create deployment: %w", saveErr)
|
|
}
|
|
|
|
return deployment, nil
|
|
}
|
|
|
|
func (svc *Service) updateAppStatusBuilding(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
) error {
|
|
app.Status = models.AppStatusBuilding
|
|
|
|
saveErr := app.Save(ctx)
|
|
if saveErr != nil {
|
|
return fmt.Errorf("failed to update app status: %w", saveErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// buildImage clones the app's repository and builds a Docker image from it,
// tagged upaas-<app>:<deployment-id>. Build output is streamed into the
// deployment log via a periodically-flushing writer. On failure the
// deployment is marked failed and a build-failure notification is sent.
// Returns the built image ID.
func (svc *Service) buildImage(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) (string, error) {
	workDir, cleanup, err := svc.cloneRepository(ctx, app, deployment)
	if err != nil {
		return "", err
	}

	defer cleanup()

	// Keep this tag format in sync with buildContainerOptions, which derives
	// the same reference from the app name and deployment ID.
	imageTag := fmt.Sprintf("upaas-%s:%d", app.Name, deployment.ID)

	// Create log writer that flushes build output to deployment logs every second
	logWriter := newDeploymentLogWriter(ctx, deployment)
	defer logWriter.Close()

	// BuildImage creates a tar archive from the local filesystem,
	// so it needs the container path where files exist, not the host path.
	imageID, err := svc.docker.BuildImage(ctx, docker.BuildImageOptions{
		ContextDir:     workDir,
		DockerfilePath: app.DockerfilePath,
		Tags:           []string{imageTag},
		LogWriter:      logWriter,
	})
	if err != nil {
		svc.notify.NotifyBuildFailed(ctx, app, deployment, err)
		svc.failDeployment(
			ctx,
			app,
			deployment,
			fmt.Errorf("failed to build image: %w", err),
		)

		return "", fmt.Errorf("failed to build image: %w", err)
	}

	// NOTE(review): ImageID is set in memory here; persistence appears to
	// depend on AppendLog (or a later Save) writing it — confirm against the
	// models package.
	deployment.ImageID = sql.NullString{String: imageID, Valid: true}
	_ = deployment.AppendLog(ctx, "Image built: "+imageID)

	return imageID, nil
}
|
|
|
|
// cloneRepository checks the app's repository out into a per-deployment temp
// directory under <DataDir>/builds/<appname>/ and returns the cloned working
// tree path, a cleanup func that removes the whole build dir, and an error.
// On any failure the deployment is marked failed before returning.
func (svc *Service) cloneRepository(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) (string, func(), error) {
	// Use a subdirectory of DataDir for builds since it's mounted from the host
	// and accessible to Docker for bind mounts (unlike /tmp inside the container).
	// Structure: builds/<appname>/<deployment-id>-<random>/
	//   deploy_key  <- SSH key for cloning
	//   work/       <- cloned repository
	appBuildsDir := filepath.Join(svc.config.DataDir, "builds", app.Name)

	err := os.MkdirAll(appBuildsDir, buildsDirPermissions)
	if err != nil {
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to create builds dir: %w", err))

		return "", nil, fmt.Errorf("failed to create builds dir: %w", err)
	}

	// The "<deployment-id>-" prefix lets cleanupCancelledDeploy locate and
	// remove this directory later.
	buildDir, err := os.MkdirTemp(appBuildsDir, fmt.Sprintf("%d-*", deployment.ID))
	if err != nil {
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to create temp dir: %w", err))

		return "", nil, fmt.Errorf("failed to create temp dir: %w", err)
	}

	cleanup := func() { _ = os.RemoveAll(buildDir) }

	// Calculate the host path for Docker bind mounts.
	// When upaas runs in a container, DataDir is the container path but Docker
	// needs the host path. HostDataDir provides the host-side equivalent.
	// CloneRepo needs both: container path for writing files, host path for Docker mounts.
	hostBuildDir := svc.containerToHostPath(buildDir)

	// Get commit SHA from deployment if available
	var commitSHA string

	if deployment.CommitSHA.Valid {
		commitSHA = deployment.CommitSHA.String
	}

	cloneResult, cloneErr := svc.docker.CloneRepo(
		ctx,
		app.RepoURL,
		app.Branch,
		commitSHA,
		app.SSHPrivateKey,
		buildDir,
		hostBuildDir,
	)
	if cloneErr != nil {
		cleanup()
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to clone repo: %w", cloneErr))

		return "", nil, fmt.Errorf("failed to clone repo: %w", cloneErr)
	}

	// Record the resolved commit and git output on the deployment.
	svc.processCloneResult(ctx, app, deployment, cloneResult, commitSHA)

	// Return the 'work' subdirectory where the repo was cloned
	workDir := filepath.Join(buildDir, "work")

	return workDir, cleanup, nil
}
|
|
|
|
// processCloneResult handles the result of a git clone operation.
|
|
// It sets the commit SHA on the deployment if not already set, and logs the result.
|
|
func (svc *Service) processCloneResult(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
deployment *models.Deployment,
|
|
cloneResult *docker.CloneResult,
|
|
originalCommitSHA string,
|
|
) {
|
|
// Set commit SHA from clone result if not already set (e.g., manual deploys)
|
|
if cloneResult != nil && cloneResult.CommitSHA != "" && !deployment.CommitSHA.Valid {
|
|
deployment.CommitSHA = sql.NullString{String: cloneResult.CommitSHA, Valid: true}
|
|
_ = deployment.Save(ctx)
|
|
}
|
|
|
|
// Log clone success with git output
|
|
actualCommitSHA := originalCommitSHA
|
|
if cloneResult != nil && cloneResult.CommitSHA != "" {
|
|
actualCommitSHA = cloneResult.CommitSHA
|
|
}
|
|
|
|
if actualCommitSHA != "" {
|
|
_ = deployment.AppendLog(ctx, "Repository cloned at commit "+actualCommitSHA)
|
|
} else {
|
|
_ = deployment.AppendLog(ctx, "Repository cloned (branch: "+app.Branch+")")
|
|
}
|
|
|
|
if cloneResult != nil && cloneResult.Output != "" {
|
|
_ = deployment.AppendLog(ctx, cloneResult.Output)
|
|
}
|
|
}
|
|
|
|
// containerToHostPath converts a container-local path to the equivalent host path.
|
|
// This is needed when upaas runs inside a container but needs to pass paths to Docker.
|
|
func (svc *Service) containerToHostPath(containerPath string) string {
|
|
if svc.config.HostDataDir == svc.config.DataDir {
|
|
return containerPath
|
|
}
|
|
|
|
// Get relative path from DataDir
|
|
relPath, err := filepath.Rel(svc.config.DataDir, containerPath)
|
|
if err != nil {
|
|
// Fall back to original path if we can't compute relative path
|
|
return containerPath
|
|
}
|
|
|
|
return filepath.Join(svc.config.HostDataDir, relPath)
|
|
}
|
|
|
|
func (svc *Service) updateDeploymentDeploying(
|
|
ctx context.Context,
|
|
deployment *models.Deployment,
|
|
) error {
|
|
deployment.Status = models.DeploymentStatusDeploying
|
|
|
|
saveErr := deployment.Save(ctx)
|
|
if saveErr != nil {
|
|
return fmt.Errorf("failed to update deployment status: %w", saveErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// removeOldContainer stops and removes the old container before deploying a new one.
|
|
func (svc *Service) removeOldContainer(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
deployment *models.Deployment,
|
|
) {
|
|
containerInfo, err := svc.docker.FindContainerByAppID(ctx, app.ID)
|
|
if err != nil || containerInfo == nil {
|
|
return
|
|
}
|
|
|
|
svc.log.Info("removing old container", "id", containerInfo.ID)
|
|
|
|
if containerInfo.Running {
|
|
stopErr := svc.docker.StopContainer(ctx, containerInfo.ID)
|
|
if stopErr != nil {
|
|
svc.log.Warn("failed to stop old container", "error", stopErr)
|
|
}
|
|
}
|
|
|
|
removeErr := svc.docker.RemoveContainer(ctx, containerInfo.ID, true)
|
|
if removeErr != nil {
|
|
svc.log.Warn("failed to remove old container", "error", removeErr)
|
|
}
|
|
|
|
_ = deployment.AppendLog(ctx, "Old container removed: "+containerInfo.ID[:12])
|
|
}
|
|
|
|
func (svc *Service) createAndStartContainer(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
deployment *models.Deployment,
|
|
) (string, error) {
|
|
containerOpts, err := svc.buildContainerOptions(ctx, app, deployment.ID)
|
|
if err != nil {
|
|
svc.failDeployment(ctx, app, deployment, err)
|
|
|
|
return "", err
|
|
}
|
|
|
|
containerID, err := svc.docker.CreateContainer(ctx, containerOpts)
|
|
if err != nil {
|
|
svc.notify.NotifyDeployFailed(ctx, app, deployment, err)
|
|
svc.failDeployment(
|
|
ctx,
|
|
app,
|
|
deployment,
|
|
fmt.Errorf("failed to create container: %w", err),
|
|
)
|
|
|
|
return "", fmt.Errorf("failed to create container: %w", err)
|
|
}
|
|
|
|
deployment.ContainerID = sql.NullString{String: containerID, Valid: true}
|
|
_ = deployment.AppendLog(ctx, "Container created: "+containerID)
|
|
|
|
startErr := svc.docker.StartContainer(ctx, containerID)
|
|
if startErr != nil {
|
|
svc.notify.NotifyDeployFailed(ctx, app, deployment, startErr)
|
|
svc.failDeployment(
|
|
ctx,
|
|
app,
|
|
deployment,
|
|
fmt.Errorf("failed to start container: %w", startErr),
|
|
)
|
|
|
|
return "", fmt.Errorf("failed to start container: %w", startErr)
|
|
}
|
|
|
|
_ = deployment.AppendLog(ctx, "Container started")
|
|
|
|
return containerID, nil
|
|
}
|
|
|
|
func (svc *Service) buildContainerOptions(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
deploymentID int64,
|
|
) (docker.CreateContainerOptions, error) {
|
|
envVars, err := app.GetEnvVars(ctx)
|
|
if err != nil {
|
|
return docker.CreateContainerOptions{}, fmt.Errorf("failed to get env vars: %w", err)
|
|
}
|
|
|
|
labels, err := app.GetLabels(ctx)
|
|
if err != nil {
|
|
return docker.CreateContainerOptions{}, fmt.Errorf("failed to get labels: %w", err)
|
|
}
|
|
|
|
volumes, err := app.GetVolumes(ctx)
|
|
if err != nil {
|
|
return docker.CreateContainerOptions{}, fmt.Errorf("failed to get volumes: %w", err)
|
|
}
|
|
|
|
ports, err := app.GetPorts(ctx)
|
|
if err != nil {
|
|
return docker.CreateContainerOptions{}, fmt.Errorf("failed to get ports: %w", err)
|
|
}
|
|
|
|
envMap := make(map[string]string, len(envVars))
|
|
for _, envVar := range envVars {
|
|
envMap[envVar.Key] = envVar.Value
|
|
}
|
|
|
|
network := ""
|
|
if app.DockerNetwork.Valid {
|
|
network = app.DockerNetwork.String
|
|
}
|
|
|
|
return docker.CreateContainerOptions{
|
|
Name: "upaas-" + app.Name,
|
|
Image: fmt.Sprintf("upaas-%s:%d", app.Name, deploymentID),
|
|
Env: envMap,
|
|
Labels: buildLabelMap(app, labels),
|
|
Volumes: buildVolumeMounts(volumes),
|
|
Ports: buildPortMappings(ports),
|
|
Network: network,
|
|
}, nil
|
|
}
|
|
|
|
func buildLabelMap(app *models.App, labels []*models.Label) map[string]string {
|
|
labelMap := make(map[string]string, len(labels)+upaasLabelCount)
|
|
for _, label := range labels {
|
|
labelMap[label.Key] = label.Value
|
|
}
|
|
|
|
// Add the upaas.id label to identify this container
|
|
labelMap[docker.LabelUpaasID] = app.ID
|
|
|
|
return labelMap
|
|
}
|
|
|
|
func buildVolumeMounts(volumes []*models.Volume) []docker.VolumeMount {
|
|
mounts := make([]docker.VolumeMount, 0, len(volumes))
|
|
for _, vol := range volumes {
|
|
mounts = append(mounts, docker.VolumeMount{
|
|
HostPath: vol.HostPath,
|
|
ContainerPath: vol.ContainerPath,
|
|
ReadOnly: vol.ReadOnly,
|
|
})
|
|
}
|
|
|
|
return mounts
|
|
}
|
|
|
|
func buildPortMappings(ports []*models.Port) []docker.PortMapping {
|
|
mappings := make([]docker.PortMapping, 0, len(ports))
|
|
for _, port := range ports {
|
|
mappings = append(mappings, docker.PortMapping{
|
|
HostPort: port.HostPort,
|
|
ContainerPort: port.ContainerPort,
|
|
Protocol: string(port.Protocol),
|
|
})
|
|
}
|
|
|
|
return mappings
|
|
}
|
|
|
|
func (svc *Service) updateAppRunning(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
imageID string,
|
|
) error {
|
|
app.ImageID = sql.NullString{String: imageID, Valid: true}
|
|
app.Status = models.AppStatusRunning
|
|
|
|
saveErr := app.Save(ctx)
|
|
if saveErr != nil {
|
|
return fmt.Errorf("failed to update app: %w", saveErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (svc *Service) checkHealthAfterDelay(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
deployment *models.Deployment,
|
|
) {
|
|
svc.log.Info(
|
|
"waiting 60 seconds to check container health",
|
|
"app", app.Name,
|
|
)
|
|
time.Sleep(healthCheckDelaySeconds * time.Second)
|
|
|
|
// Reload app to get current state
|
|
reloadedApp, err := models.FindApp(ctx, svc.db, app.ID)
|
|
if err != nil || reloadedApp == nil {
|
|
svc.log.Error("failed to reload app for health check", "error", err)
|
|
|
|
return
|
|
}
|
|
|
|
containerInfo, containerErr := svc.docker.FindContainerByAppID(ctx, app.ID)
|
|
if containerErr != nil || containerInfo == nil {
|
|
return
|
|
}
|
|
|
|
healthy, err := svc.docker.IsContainerHealthy(ctx, containerInfo.ID)
|
|
if err != nil {
|
|
svc.log.Error("failed to check container health", "error", err)
|
|
svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, err)
|
|
_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
|
|
svc.writeLogsToFile(reloadedApp, deployment)
|
|
reloadedApp.Status = models.AppStatusError
|
|
_ = reloadedApp.Save(ctx)
|
|
|
|
return
|
|
}
|
|
|
|
if healthy {
|
|
svc.log.Info("container healthy after 60 seconds", "app", reloadedApp.Name)
|
|
svc.notify.NotifyDeploySuccess(ctx, reloadedApp, deployment)
|
|
_ = deployment.MarkFinished(ctx, models.DeploymentStatusSuccess)
|
|
svc.writeLogsToFile(reloadedApp, deployment)
|
|
} else {
|
|
svc.log.Warn(
|
|
"container unhealthy after 60 seconds",
|
|
"app", reloadedApp.Name,
|
|
)
|
|
svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, ErrContainerUnhealthy)
|
|
_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
|
|
svc.writeLogsToFile(reloadedApp, deployment)
|
|
reloadedApp.Status = models.AppStatusError
|
|
_ = reloadedApp.Save(ctx)
|
|
}
|
|
}
|
|
|
|
func (svc *Service) failDeployment(
|
|
ctx context.Context,
|
|
app *models.App,
|
|
deployment *models.Deployment,
|
|
deployErr error,
|
|
) {
|
|
svc.log.Error("deployment failed", "app", app.Name, "error", deployErr)
|
|
_ = deployment.AppendLog(ctx, "ERROR: "+deployErr.Error())
|
|
_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
|
|
svc.writeLogsToFile(app, deployment)
|
|
app.Status = models.AppStatusError
|
|
_ = app.Save(ctx)
|
|
}
|
|
|
|
// writeLogsToFile writes the deployment logs to a file on disk.
|
|
// Structure: DataDir/logs/<hostname>/<appname>/<appname>_<sha>_<timestamp>.log.txt
|
|
func (svc *Service) writeLogsToFile(app *models.App, deployment *models.Deployment) {
|
|
if !deployment.Logs.Valid || deployment.Logs.String == "" {
|
|
return
|
|
}
|
|
|
|
logPath := svc.GetLogFilePath(app, deployment)
|
|
if logPath == "" {
|
|
return
|
|
}
|
|
|
|
// Ensure directory exists
|
|
logDir := filepath.Dir(logPath)
|
|
|
|
err := os.MkdirAll(logDir, logsDirPermissions)
|
|
if err != nil {
|
|
svc.log.Error("failed to create logs directory", "error", err, "path", logDir)
|
|
|
|
return
|
|
}
|
|
|
|
err = os.WriteFile(logPath, []byte(deployment.Logs.String), logFilePermissions)
|
|
if err != nil {
|
|
svc.log.Error("failed to write log file", "error", err, "path", logPath)
|
|
|
|
return
|
|
}
|
|
|
|
svc.log.Info("wrote deployment logs to file", "path", logPath)
|
|
}
|