When a webhook-triggered deploy starts for an app that already has a deploy in progress, the new deploy now cancels the existing one via context cancellation, waits for the lock to be released, and then proceeds.

Changes:

- Add per-app context cancellation (appCancels sync.Map) to deploy.Service
- Deploy() creates a cancellable context and registers it for the app
- Add CancelAppDeploy() method to cancel an in-progress deploy
- Add ErrDeployCancelled sentinel error for cancelled deploys
- Handle context cancellation in the build and deploy phases, marking deployments as failed with a clear cancellation message
- Webhook triggerDeployment() now cancels in-progress deploys and retries until the lock is released (up to 30 attempts with 2s delay)

Fixes #38
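For reference, a minimal sketch of what the webhook-side cancel-and-retry could look like. triggerDeployment() lives outside this file; the Handler type, field names, and constants below are assumptions for illustration, not the actual implementation. Only CancelAppDeploy, Deploy, and ErrDeploymentInProgress come from the deploy package shown here.

package webhook

import (
	"context"
	"errors"
	"time"

	"git.eeqj.de/sneak/upaas/internal/models"
	"git.eeqj.de/sneak/upaas/internal/service/deploy"
)

// Retry parameters described in the commit message.
const (
	maxDeployAttempts = 30
	deployRetryDelay  = 2 * time.Second
)

// Handler is a hypothetical webhook handler holding the deploy service.
type Handler struct {
	deploy *deploy.Service
}

// triggerDeployment cancels any in-progress deploy for the app, then retries
// Deploy until the per-app lock is released or the attempts are exhausted.
func (h *Handler) triggerDeployment(ctx context.Context, app *models.App, webhookEventID *int64) error {
	// Ask the currently running deploy (if any) to stop.
	h.deploy.CancelAppDeploy(app.ID)

	var err error
	for attempt := 0; attempt < maxDeployAttempts; attempt++ {
		err = h.deploy.Deploy(ctx, app, webhookEventID)
		if !errors.Is(err, deploy.ErrDeploymentInProgress) {
			// Started (or failed for an unrelated reason); stop retrying.
			return err
		}

		// The old deploy still holds the lock; wait and try again.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(deployRetryDelay):
		}
	}

	return err
}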
// Package deploy provides deployment services.
package deploy

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
	"time"

	"go.uber.org/fx"

	"git.eeqj.de/sneak/upaas/internal/config"
	"git.eeqj.de/sneak/upaas/internal/database"
	"git.eeqj.de/sneak/upaas/internal/docker"
	"git.eeqj.de/sneak/upaas/internal/logger"
	"git.eeqj.de/sneak/upaas/internal/models"
	"git.eeqj.de/sneak/upaas/internal/service/notify"
)

// Time constants.
const (
	healthCheckDelaySeconds = 60
	// upaasLabelCount is the number of upaas-specific labels added to containers.
	upaasLabelCount = 1
	// buildsDirPermissions is the permission mode for the builds directory.
	buildsDirPermissions = 0o750
	// buildTimeout is the maximum duration for the build phase.
	buildTimeout = 30 * time.Minute
	// deployTimeout is the maximum duration for the deploy phase (container swap).
	deployTimeout = 5 * time.Minute
)

// Sentinel errors for deployment failures.
var (
	// ErrContainerUnhealthy indicates the container failed health check.
	ErrContainerUnhealthy = errors.New("container unhealthy after 60 seconds")
	// ErrDeploymentInProgress indicates another deployment is already running.
	ErrDeploymentInProgress = errors.New("deployment already in progress for this app")
	// ErrBuildTimeout indicates the build phase exceeded the timeout.
	ErrBuildTimeout = errors.New("build timeout exceeded")
	// ErrDeployTimeout indicates the deploy phase exceeded the timeout.
	ErrDeployTimeout = errors.New("deploy timeout exceeded")
	// ErrDeployCancelled indicates the deployment was cancelled by a newer deploy.
	ErrDeployCancelled = errors.New("deployment cancelled by newer deploy")
)

// logFlushInterval is how often to flush buffered logs to the database.
const logFlushInterval = time.Second

// logsDirPermissions is the permission mode for the logs directory.
const logsDirPermissions = 0o750

// logFilePermissions is the permission mode for log files.
const logFilePermissions = 0o640

// logTimestampFormat is the format for log file timestamps.
const logTimestampFormat = "20060102T150405Z"

// logFileShortSHALength is the number of characters to use for commit SHA in log filenames.
const logFileShortSHALength = 12

// dockerLogMessage represents a Docker build log message.
type dockerLogMessage struct {
	Stream string `json:"stream"`
	Error  string `json:"error"`
}

// deploymentLogWriter implements io.Writer and periodically flushes to deployment logs.
// It parses Docker JSON log format and extracts the stream content.
type deploymentLogWriter struct {
	deployment *models.Deployment
	buffer     bytes.Buffer
	lineBuffer bytes.Buffer // buffer for incomplete lines
	mu         sync.Mutex
	done       chan struct{}
	flushed    sync.WaitGroup  // waits for flush goroutine to finish
	flushCtx   context.Context //nolint:containedctx // needed for async flush goroutine
}

func newDeploymentLogWriter(ctx context.Context, deployment *models.Deployment) *deploymentLogWriter {
	w := &deploymentLogWriter{
		deployment: deployment,
		done:       make(chan struct{}),
		flushCtx:   ctx,
	}
	w.flushed.Add(1)

	go w.runFlushLoop()

	return w
}

// Write implements io.Writer.
// It parses Docker JSON log format and extracts the stream content.
func (w *deploymentLogWriter) Write(p []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Add incoming data to line buffer
	w.lineBuffer.Write(p)

	// Process complete lines
	data := w.lineBuffer.Bytes()
	lastNewline := bytes.LastIndexByte(data, '\n')

	if lastNewline == -1 {
		// No complete lines yet
		return len(p), nil
	}

	// Process all complete lines
	completeData := data[:lastNewline+1]
	remaining := data[lastNewline+1:]

	for line := range bytes.SplitSeq(completeData, []byte{'\n'}) {
		w.processLine(line)
	}

	// Keep any remaining incomplete line data
	w.lineBuffer.Reset()

	if len(remaining) > 0 {
		w.lineBuffer.Write(remaining)
	}

	return len(p), nil
}

// Close stops the flush loop and waits for the final flush to complete.
func (w *deploymentLogWriter) Close() {
	close(w.done)
	w.flushed.Wait()
}

func (w *deploymentLogWriter) runFlushLoop() {
	defer w.flushed.Done()

	ticker := time.NewTicker(logFlushInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			w.doFlush()
		case <-w.done:
			w.doFlush()

			return
		}
	}
}

func (w *deploymentLogWriter) doFlush() {
	w.mu.Lock()
	data := w.buffer.String()
	w.buffer.Reset()
	w.mu.Unlock()

	if data != "" {
		_ = w.deployment.AppendLog(w.flushCtx, data)
	}
}

// processLine parses a JSON log line and extracts the stream content.
func (w *deploymentLogWriter) processLine(line []byte) {
	if len(line) == 0 {
		return
	}

	var msg dockerLogMessage

	err := json.Unmarshal(line, &msg)
	if err != nil {
		// Not valid JSON, write as-is
		w.buffer.Write(line)
		w.buffer.WriteByte('\n')

		return
	}

	if msg.Error != "" {
		w.buffer.WriteString("ERROR: ")
		w.buffer.WriteString(msg.Error)
		w.buffer.WriteByte('\n')
	}

	if msg.Stream != "" {
		w.buffer.WriteString(msg.Stream)
	}
}

// ServiceParams contains dependencies for Service.
type ServiceParams struct {
	fx.In

	Logger   *logger.Logger
	Config   *config.Config
	Database *database.Database
	Docker   *docker.Client
	Notify   *notify.Service
}

// Service provides deployment functionality.
type Service struct {
	log        *slog.Logger
	db         *database.Database
	docker     *docker.Client
	notify     *notify.Service
	config     *config.Config
	params     *ServiceParams
	appLocks   sync.Map // map[string]*sync.Mutex - per-app deployment locks
	appCancels sync.Map // map[string]context.CancelFunc - per-app deploy cancellation
}

// New creates a new deploy Service.
func New(lc fx.Lifecycle, params ServiceParams) (*Service, error) {
	svc := &Service{
		log:    params.Logger.Get(),
		db:     params.Database,
		docker: params.Docker,
		notify: params.Notify,
		config: params.Config,
		params: &params,
	}

	if lc != nil {
		lc.Append(fx.Hook{
			OnStart: func(ctx context.Context) error {
				return svc.cleanupStuckDeployments(ctx)
			},
		})
	}

	return svc, nil
}

// GetBuildDir returns the build directory path for an app.
func (svc *Service) GetBuildDir(appID string) string {
	return filepath.Join(svc.config.DataDir, "builds", appID)
}

// GetLogFilePath returns the path to the log file for a deployment.
// Returns empty string if the path cannot be determined.
func (svc *Service) GetLogFilePath(app *models.App, deployment *models.Deployment) string {
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown"
	}

	// Get commit SHA
	sha := ""
	if deployment.CommitSHA.Valid && deployment.CommitSHA.String != "" {
		sha = deployment.CommitSHA.String
		if len(sha) > logFileShortSHALength {
			sha = sha[:logFileShortSHALength]
		}
	}

	// Use started_at timestamp
	timestamp := deployment.StartedAt.UTC().Format(logTimestampFormat)

	// Build filename: appname_sha_timestamp.log.txt (or appname_timestamp.log.txt if no SHA)
	var filename string
	if sha != "" {
		filename = fmt.Sprintf("%s_%s_%s.log.txt", app.Name, sha, timestamp)
	} else {
		filename = fmt.Sprintf("%s_%s.log.txt", app.Name, timestamp)
	}

	return filepath.Join(svc.config.DataDir, "logs", hostname, app.Name, filename)
}

// CancelAppDeploy cancels any in-progress deployment for the given app.
// It returns true if a deployment was cancelled, false if none was in progress.
func (svc *Service) CancelAppDeploy(appID string) bool {
	if cancelVal, ok := svc.appCancels.Load(appID); ok {
		if cancelFn, ok := cancelVal.(context.CancelFunc); ok {
			svc.log.Info("cancelling in-progress deployment", "app_id", appID)
			cancelFn()

			return true
		}
	}

	return false
}

// Deploy deploys an app.
func (svc *Service) Deploy(
	ctx context.Context,
	app *models.App,
	webhookEventID *int64,
) error {
	// Try to acquire per-app deployment lock
	if !svc.tryLockApp(app.ID) {
		svc.log.Warn("deployment already in progress", "app", app.Name)

		return ErrDeploymentInProgress
	}
	defer svc.unlockApp(app.ID)

	// Create a cancellable context so in-progress deploys can be cancelled
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Store the cancel func so other deploys can cancel this one
	svc.appCancels.Store(app.ID, cancel)
	defer svc.appCancels.Delete(app.ID)

	// Fetch webhook event and create deployment record
	webhookEvent := svc.fetchWebhookEvent(ctx, webhookEventID)

	deployment, err := svc.createDeploymentRecord(ctx, app, webhookEventID, webhookEvent)
	if err != nil {
		return err
	}

	svc.logWebhookPayload(ctx, deployment, webhookEvent)

	err = svc.updateAppStatusBuilding(ctx, app)
	if err != nil {
		return err
	}

	svc.notify.NotifyBuildStart(ctx, app, deployment)

	// Build phase with timeout
	imageID, err := svc.buildImageWithTimeout(ctx, app, deployment)
	if err != nil {
		return err
	}

	svc.notify.NotifyBuildSuccess(ctx, app, deployment)

	// Deploy phase with timeout
	err = svc.deployContainerWithTimeout(ctx, app, deployment, imageID)
	if err != nil {
		return err
	}

	err = svc.updateAppRunning(ctx, app, imageID)
	if err != nil {
		return err
	}

	// Use context.WithoutCancel to ensure health check completes even if
	// the parent context is cancelled (e.g., HTTP request ends).
	go svc.checkHealthAfterDelay(context.WithoutCancel(ctx), app, deployment)

	return nil
}

// buildImageWithTimeout runs the build phase with a timeout.
func (svc *Service) buildImageWithTimeout(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) (string, error) {
	buildCtx, cancel := context.WithTimeout(ctx, buildTimeout)
	defer cancel()

	imageID, err := svc.buildImage(buildCtx, app, deployment)
	if err != nil {
		if errors.Is(buildCtx.Err(), context.DeadlineExceeded) {
			timeoutErr := fmt.Errorf("%w after %v", ErrBuildTimeout, buildTimeout)
			svc.failDeployment(ctx, app, deployment, timeoutErr)

			return "", timeoutErr
		}

		if errors.Is(ctx.Err(), context.Canceled) {
			cancelErr := fmt.Errorf("%w", ErrDeployCancelled)
			svc.failDeployment(context.WithoutCancel(ctx), app, deployment, cancelErr)

			return "", cancelErr
		}

		return "", err
	}

	return imageID, nil
}

// deployContainerWithTimeout runs the deploy phase with a timeout.
// It removes the old container and starts the new one.
func (svc *Service) deployContainerWithTimeout(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
	imageID string,
) error {
	deployCtx, cancel := context.WithTimeout(ctx, deployTimeout)
	defer cancel()

	err := svc.updateDeploymentDeploying(deployCtx, deployment)
	if err != nil {
		return err
	}

	// Remove old container first to free up the name
	svc.removeOldContainer(deployCtx, app, deployment)

	// Create and start the new container
	_, err = svc.createAndStartContainer(deployCtx, app, deployment, imageID)
	if err != nil {
		if errors.Is(deployCtx.Err(), context.DeadlineExceeded) {
			timeoutErr := fmt.Errorf("%w after %v", ErrDeployTimeout, deployTimeout)
			svc.failDeployment(ctx, app, deployment, timeoutErr)

			return timeoutErr
		}

		if errors.Is(ctx.Err(), context.Canceled) {
			cancelErr := fmt.Errorf("%w", ErrDeployCancelled)
			svc.failDeployment(context.WithoutCancel(ctx), app, deployment, cancelErr)

			return cancelErr
		}

		return err
	}

	return nil
}

// cleanupStuckDeployments marks any deployments stuck in building/deploying as failed.
func (svc *Service) cleanupStuckDeployments(ctx context.Context) error {
	// First, update the deployments
	deployQuery := `
		UPDATE deployments
		SET status = ?, finished_at = ?, logs = COALESCE(logs, '') || ?
		WHERE status IN (?, ?)
	`
	msg := "\n[System] Deployment marked as failed: process was interrupted\n"

	_, err := svc.db.DB().ExecContext(
		ctx,
		deployQuery,
		models.DeploymentStatusFailed,
		time.Now(),
		msg,
		models.DeploymentStatusBuilding,
		models.DeploymentStatusDeploying,
	)
	if err != nil {
		svc.log.Error("failed to cleanup stuck deployments", "error", err)

		return fmt.Errorf("failed to cleanup stuck deployments: %w", err)
	}

	// Also update app status for apps that were stuck in building
	appQuery := `
		UPDATE apps
		SET status = ?, updated_at = ?
		WHERE status = ?
	`

	_, err = svc.db.DB().ExecContext(
		ctx,
		appQuery,
		models.AppStatusError,
		time.Now(),
		models.AppStatusBuilding,
	)
	if err != nil {
		svc.log.Error("failed to cleanup stuck app statuses", "error", err)

		return fmt.Errorf("failed to cleanup stuck app statuses: %w", err)
	}

	svc.log.Info("cleaned up stuck deployments and app statuses")

	return nil
}

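// getAppLock returns the deployment mutex for the given app, creating it on first use.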
func (svc *Service) getAppLock(appID string) *sync.Mutex {
	lock, _ := svc.appLocks.LoadOrStore(appID, &sync.Mutex{})

	mu, ok := lock.(*sync.Mutex)
	if !ok {
		// This should never happen, but handle it gracefully
		newMu := &sync.Mutex{}
		svc.appLocks.Store(appID, newMu)

		return newMu
	}

	return mu
}

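// tryLockApp attempts to acquire the app's deployment lock without blocking.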
func (svc *Service) tryLockApp(appID string) bool {
	return svc.getAppLock(appID).TryLock()
}

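// unlockApp releases the app's deployment lock.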
func (svc *Service) unlockApp(appID string) {
	svc.getAppLock(appID).Unlock()
}

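// fetchWebhookEvent loads the webhook event that triggered the deploy, or returns nil
// if no event ID was given or the lookup fails.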
func (svc *Service) fetchWebhookEvent(
	ctx context.Context,
	webhookEventID *int64,
) *models.WebhookEvent {
	if webhookEventID == nil {
		return nil
	}

	event, err := models.FindWebhookEvent(ctx, svc.db, *webhookEventID)
	if err != nil {
		svc.log.Warn("failed to fetch webhook event", "error", err)

		return nil
	}

	return event
}

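// logWebhookPayload appends the raw webhook payload to the deployment log.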
func (svc *Service) logWebhookPayload(
	ctx context.Context,
	deployment *models.Deployment,
	webhookEvent *models.WebhookEvent,
) {
	if webhookEvent == nil || !webhookEvent.Payload.Valid {
		return
	}

	_ = deployment.AppendLog(ctx, "Webhook received:\n"+webhookEvent.Payload.String+"\n")
}

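// createDeploymentRecord creates and saves a deployment in the building state,
// copying commit details from the webhook event when present.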
func (svc *Service) createDeploymentRecord(
	ctx context.Context,
	app *models.App,
	webhookEventID *int64,
	webhookEvent *models.WebhookEvent,
) (*models.Deployment, error) {
	deployment := models.NewDeployment(svc.db)
	deployment.AppID = app.ID

	if webhookEventID != nil {
		deployment.WebhookEventID = sql.NullInt64{
			Int64: *webhookEventID,
			Valid: true,
		}
	}

	// Set commit SHA and URL from webhook event
	if webhookEvent != nil {
		if webhookEvent.CommitSHA.Valid {
			deployment.CommitSHA = webhookEvent.CommitSHA
		}

		if webhookEvent.CommitURL.Valid {
			deployment.CommitURL = webhookEvent.CommitURL
		}
	}

	deployment.Status = models.DeploymentStatusBuilding

	saveErr := deployment.Save(ctx)
	if saveErr != nil {
		return nil, fmt.Errorf("failed to create deployment: %w", saveErr)
	}

	return deployment, nil
}

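// updateAppStatusBuilding marks the app as building and persists it.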
func (svc *Service) updateAppStatusBuilding(
	ctx context.Context,
	app *models.App,
) error {
	app.Status = models.AppStatusBuilding

	saveErr := app.Save(ctx)
	if saveErr != nil {
		return fmt.Errorf("failed to update app status: %w", saveErr)
	}

	return nil
}

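// buildImage clones the repository and builds a Docker image tagged
// upaas-<app>:<deployment-id>, streaming build output to the deployment log.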
func (svc *Service) buildImage(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) (string, error) {
	workDir, cleanup, err := svc.cloneRepository(ctx, app, deployment)
	if err != nil {
		return "", err
	}

	defer cleanup()

	imageTag := fmt.Sprintf("upaas-%s:%d", app.Name, deployment.ID)

	// Create log writer that flushes build output to deployment logs every second
	logWriter := newDeploymentLogWriter(ctx, deployment)
	defer logWriter.Close()

	// BuildImage creates a tar archive from the local filesystem,
	// so it needs the container path where files exist, not the host path.
	imageID, err := svc.docker.BuildImage(ctx, docker.BuildImageOptions{
		ContextDir:     workDir,
		DockerfilePath: app.DockerfilePath,
		Tags:           []string{imageTag},
		LogWriter:      logWriter,
	})
	if err != nil {
		svc.notify.NotifyBuildFailed(ctx, app, deployment, err)
		svc.failDeployment(
			ctx,
			app,
			deployment,
			fmt.Errorf("failed to build image: %w", err),
		)

		return "", fmt.Errorf("failed to build image: %w", err)
	}

	deployment.ImageID = sql.NullString{String: imageID, Valid: true}
	_ = deployment.AppendLog(ctx, "Image built: "+imageID)

	return imageID, nil
}

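// cloneRepository clones the app's repository into a per-deployment build directory
// and returns the work directory along with a cleanup function.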
func (svc *Service) cloneRepository(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) (string, func(), error) {
	// Use a subdirectory of DataDir for builds since it's mounted from the host
	// and accessible to Docker for bind mounts (unlike /tmp inside the container).
	// Structure: builds/<appname>/<deployment-id>-<random>/
	//   deploy_key  <- SSH key for cloning
	//   work/       <- cloned repository
	appBuildsDir := filepath.Join(svc.config.DataDir, "builds", app.Name)

	err := os.MkdirAll(appBuildsDir, buildsDirPermissions)
	if err != nil {
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to create builds dir: %w", err))

		return "", nil, fmt.Errorf("failed to create builds dir: %w", err)
	}

	buildDir, err := os.MkdirTemp(appBuildsDir, fmt.Sprintf("%d-*", deployment.ID))
	if err != nil {
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to create temp dir: %w", err))

		return "", nil, fmt.Errorf("failed to create temp dir: %w", err)
	}

	cleanup := func() { _ = os.RemoveAll(buildDir) }

	// Calculate the host path for Docker bind mounts.
	// When upaas runs in a container, DataDir is the container path but Docker
	// needs the host path. HostDataDir provides the host-side equivalent.
	// CloneRepo needs both: container path for writing files, host path for Docker mounts.
	hostBuildDir := svc.containerToHostPath(buildDir)

	// Get commit SHA from deployment if available
	var commitSHA string

	if deployment.CommitSHA.Valid {
		commitSHA = deployment.CommitSHA.String
	}

	cloneResult, cloneErr := svc.docker.CloneRepo(
		ctx,
		app.RepoURL,
		app.Branch,
		commitSHA,
		app.SSHPrivateKey,
		buildDir,
		hostBuildDir,
	)
	if cloneErr != nil {
		cleanup()
		svc.failDeployment(ctx, app, deployment, fmt.Errorf("failed to clone repo: %w", cloneErr))

		return "", nil, fmt.Errorf("failed to clone repo: %w", cloneErr)
	}

	svc.processCloneResult(ctx, app, deployment, cloneResult, commitSHA)

	// Return the 'work' subdirectory where the repo was cloned
	workDir := filepath.Join(buildDir, "work")

	return workDir, cleanup, nil
}

// processCloneResult handles the result of a git clone operation.
// It sets the commit SHA on the deployment if not already set, and logs the result.
func (svc *Service) processCloneResult(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
	cloneResult *docker.CloneResult,
	originalCommitSHA string,
) {
	// Set commit SHA from clone result if not already set (e.g., manual deploys)
	if cloneResult != nil && cloneResult.CommitSHA != "" && !deployment.CommitSHA.Valid {
		deployment.CommitSHA = sql.NullString{String: cloneResult.CommitSHA, Valid: true}
		_ = deployment.Save(ctx)
	}

	// Log clone success with git output
	actualCommitSHA := originalCommitSHA
	if cloneResult != nil && cloneResult.CommitSHA != "" {
		actualCommitSHA = cloneResult.CommitSHA
	}

	if actualCommitSHA != "" {
		_ = deployment.AppendLog(ctx, "Repository cloned at commit "+actualCommitSHA)
	} else {
		_ = deployment.AppendLog(ctx, "Repository cloned (branch: "+app.Branch+")")
	}

	if cloneResult != nil && cloneResult.Output != "" {
		_ = deployment.AppendLog(ctx, cloneResult.Output)
	}
}

// containerToHostPath converts a container-local path to the equivalent host path.
// This is needed when upaas runs inside a container but needs to pass paths to Docker.
func (svc *Service) containerToHostPath(containerPath string) string {
	if svc.config.HostDataDir == svc.config.DataDir {
		return containerPath
	}

	// Get relative path from DataDir
	relPath, err := filepath.Rel(svc.config.DataDir, containerPath)
	if err != nil {
		// Fall back to original path if we can't compute relative path
		return containerPath
	}

	return filepath.Join(svc.config.HostDataDir, relPath)
}

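// updateDeploymentDeploying marks the deployment as deploying and persists it.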
func (svc *Service) updateDeploymentDeploying(
	ctx context.Context,
	deployment *models.Deployment,
) error {
	deployment.Status = models.DeploymentStatusDeploying

	saveErr := deployment.Save(ctx)
	if saveErr != nil {
		return fmt.Errorf("failed to update deployment status: %w", saveErr)
	}

	return nil
}

// removeOldContainer stops and removes the old container before deploying a new one.
func (svc *Service) removeOldContainer(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) {
	containerInfo, err := svc.docker.FindContainerByAppID(ctx, app.ID)
	if err != nil || containerInfo == nil {
		return
	}

	svc.log.Info("removing old container", "id", containerInfo.ID)

	if containerInfo.Running {
		stopErr := svc.docker.StopContainer(ctx, containerInfo.ID)
		if stopErr != nil {
			svc.log.Warn("failed to stop old container", "error", stopErr)
		}
	}

	removeErr := svc.docker.RemoveContainer(ctx, containerInfo.ID, true)
	if removeErr != nil {
		svc.log.Warn("failed to remove old container", "error", removeErr)
	}

	_ = deployment.AppendLog(ctx, "Old container removed: "+containerInfo.ID[:12])
}

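// createAndStartContainer creates the new app container and starts it,
// marking the deployment as failed on error.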
func (svc *Service) createAndStartContainer(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
	_ string,
) (string, error) {
	containerOpts, err := svc.buildContainerOptions(ctx, app, deployment.ID)
	if err != nil {
		svc.failDeployment(ctx, app, deployment, err)

		return "", err
	}

	containerID, err := svc.docker.CreateContainer(ctx, containerOpts)
	if err != nil {
		svc.notify.NotifyDeployFailed(ctx, app, deployment, err)
		svc.failDeployment(
			ctx,
			app,
			deployment,
			fmt.Errorf("failed to create container: %w", err),
		)

		return "", fmt.Errorf("failed to create container: %w", err)
	}

	deployment.ContainerID = sql.NullString{String: containerID, Valid: true}
	_ = deployment.AppendLog(ctx, "Container created: "+containerID)

	startErr := svc.docker.StartContainer(ctx, containerID)
	if startErr != nil {
		svc.notify.NotifyDeployFailed(ctx, app, deployment, startErr)
		svc.failDeployment(
			ctx,
			app,
			deployment,
			fmt.Errorf("failed to start container: %w", startErr),
		)

		return "", fmt.Errorf("failed to start container: %w", startErr)
	}

	_ = deployment.AppendLog(ctx, "Container started")

	return containerID, nil
}

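// buildContainerOptions assembles container creation options from the app's
// env vars, labels, volumes, ports, and network.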
func (svc *Service) buildContainerOptions(
	ctx context.Context,
	app *models.App,
	deploymentID int64,
) (docker.CreateContainerOptions, error) {
	envVars, err := app.GetEnvVars(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get env vars: %w", err)
	}

	labels, err := app.GetLabels(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get labels: %w", err)
	}

	volumes, err := app.GetVolumes(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get volumes: %w", err)
	}

	ports, err := app.GetPorts(ctx)
	if err != nil {
		return docker.CreateContainerOptions{}, fmt.Errorf("failed to get ports: %w", err)
	}

	envMap := make(map[string]string, len(envVars))
	for _, envVar := range envVars {
		envMap[envVar.Key] = envVar.Value
	}

	network := ""
	if app.DockerNetwork.Valid {
		network = app.DockerNetwork.String
	}

	return docker.CreateContainerOptions{
		Name:    "upaas-" + app.Name,
		Image:   fmt.Sprintf("upaas-%s:%d", app.Name, deploymentID),
		Env:     envMap,
		Labels:  buildLabelMap(app, labels),
		Volumes: buildVolumeMounts(volumes),
		Ports:   buildPortMappings(ports),
		Network: network,
	}, nil
}

func buildLabelMap(app *models.App, labels []*models.Label) map[string]string {
	labelMap := make(map[string]string, len(labels)+upaasLabelCount)
	for _, label := range labels {
		labelMap[label.Key] = label.Value
	}

	// Add the upaas.id label to identify this container
	labelMap[docker.LabelUpaasID] = app.ID

	return labelMap
}

func buildVolumeMounts(volumes []*models.Volume) []docker.VolumeMount {
	mounts := make([]docker.VolumeMount, 0, len(volumes))
	for _, vol := range volumes {
		mounts = append(mounts, docker.VolumeMount{
			HostPath:      vol.HostPath,
			ContainerPath: vol.ContainerPath,
			ReadOnly:      vol.ReadOnly,
		})
	}

	return mounts
}

func buildPortMappings(ports []*models.Port) []docker.PortMapping {
	mappings := make([]docker.PortMapping, 0, len(ports))
	for _, port := range ports {
		mappings = append(mappings, docker.PortMapping{
			HostPort:      port.HostPort,
			ContainerPort: port.ContainerPort,
			Protocol:      string(port.Protocol),
		})
	}

	return mappings
}

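// updateAppRunning records the new image ID and marks the app as running.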
func (svc *Service) updateAppRunning(
	ctx context.Context,
	app *models.App,
	imageID string,
) error {
	app.ImageID = sql.NullString{String: imageID, Valid: true}
	app.Status = models.AppStatusRunning

	saveErr := app.Save(ctx)
	if saveErr != nil {
		return fmt.Errorf("failed to update app: %w", saveErr)
	}

	return nil
}

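// checkHealthAfterDelay waits 60 seconds, then marks the deployment as succeeded
// or failed based on the container's health status.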
func (svc *Service) checkHealthAfterDelay(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
) {
	svc.log.Info(
		"waiting 60 seconds to check container health",
		"app", app.Name,
	)
	time.Sleep(healthCheckDelaySeconds * time.Second)

	// Reload app to get current state
	reloadedApp, err := models.FindApp(ctx, svc.db, app.ID)
	if err != nil || reloadedApp == nil {
		svc.log.Error("failed to reload app for health check", "error", err)

		return
	}

	containerInfo, containerErr := svc.docker.FindContainerByAppID(ctx, app.ID)
	if containerErr != nil || containerInfo == nil {
		return
	}

	healthy, err := svc.docker.IsContainerHealthy(ctx, containerInfo.ID)
	if err != nil {
		svc.log.Error("failed to check container health", "error", err)
		svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, err)
		_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
		svc.writeLogsToFile(reloadedApp, deployment)
		reloadedApp.Status = models.AppStatusError
		_ = reloadedApp.Save(ctx)

		return
	}

	if healthy {
		svc.log.Info("container healthy after 60 seconds", "app", reloadedApp.Name)
		svc.notify.NotifyDeploySuccess(ctx, reloadedApp, deployment)
		_ = deployment.MarkFinished(ctx, models.DeploymentStatusSuccess)
		svc.writeLogsToFile(reloadedApp, deployment)
	} else {
		svc.log.Warn(
			"container unhealthy after 60 seconds",
			"app", reloadedApp.Name,
		)
		svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, ErrContainerUnhealthy)
		_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
		svc.writeLogsToFile(reloadedApp, deployment)
		reloadedApp.Status = models.AppStatusError
		_ = reloadedApp.Save(ctx)
	}
}

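// failDeployment logs the error, appends it to the deployment log, marks the
// deployment as failed, writes logs to disk, and sets the app status to error.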
func (svc *Service) failDeployment(
	ctx context.Context,
	app *models.App,
	deployment *models.Deployment,
	deployErr error,
) {
	svc.log.Error("deployment failed", "app", app.Name, "error", deployErr)
	_ = deployment.AppendLog(ctx, "ERROR: "+deployErr.Error())
	_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
	svc.writeLogsToFile(app, deployment)
	app.Status = models.AppStatusError
	_ = app.Save(ctx)
}

// writeLogsToFile writes the deployment logs to a file on disk.
// Structure: DataDir/logs/<hostname>/<appname>/<appname>_<sha>_<timestamp>.log.txt
func (svc *Service) writeLogsToFile(app *models.App, deployment *models.Deployment) {
	if !deployment.Logs.Valid || deployment.Logs.String == "" {
		return
	}

	logPath := svc.GetLogFilePath(app, deployment)
	if logPath == "" {
		return
	}

	// Ensure directory exists
	logDir := filepath.Dir(logPath)

	err := os.MkdirAll(logDir, logsDirPermissions)
	if err != nil {
		svc.log.Error("failed to create logs directory", "error", err, "path", logDir)

		return
	}

	err = os.WriteFile(logPath, []byte(deployment.Logs.String), logFilePermissions)
	if err != nil {
		svc.log.Error("failed to write log file", "error", err, "path", logPath)

		return
	}

	svc.log.Info("wrote deployment logs to file", "path", logPath)
}