feat: add observability improvements (metrics, audit log, structured logging)
All checks were successful
Check / check (pull_request) Successful in 1m45s

- Add Prometheus metrics package (internal/metrics) with deployment,
  container health, webhook, HTTP request, and audit counters/histograms
- Add audit_log SQLite table via migration 007
- Add AuditEntry model with CRUD operations and query methods
- Add audit service (internal/service/audit) for recording user actions
- Instrument deploy service with deployment duration, count, and
  in-flight metrics; container health gauge updates on deploy completion
- Instrument webhook service with event counters by app/type/matched
- Instrument HTTP middleware with request count, duration, and response
  size metrics; also log response bytes in structured request logs
- Add audit logging to all key handler operations: login/logout, app
  CRUD, deploy, cancel, rollback, restart/stop/start, webhook receipt,
  and initial setup
- Add GET /api/audit endpoint for querying recent audit entries
- Make /metrics endpoint always available (optionally auth-protected)
- Add comprehensive tests for metrics, audit model, and audit service
- Update existing test infrastructure with metrics and audit dependencies
- Update README with Observability section documenting all metrics,
  audit log, and structured logging
This commit is contained in:
clawbot
2026-03-17 02:23:44 -07:00
parent fd110e69db
commit f558e2cdd8
21 changed files with 1399 additions and 42 deletions

View File

@@ -0,0 +1,153 @@
// Package audit provides audit logging for user actions.
package audit
import (
"context"
"database/sql"
"log/slog"
"net"
"net/http"
"strings"
"go.uber.org/fx"
"sneak.berlin/go/upaas/internal/database"
"sneak.berlin/go/upaas/internal/logger"
"sneak.berlin/go/upaas/internal/metrics"
"sneak.berlin/go/upaas/internal/models"
)
// ServiceParams contains dependencies for Service.
type ServiceParams struct {
fx.In
Logger *logger.Logger
Database *database.Database
Metrics *metrics.Metrics
}
// Service provides audit logging functionality.
type Service struct {
log *slog.Logger
db *database.Database
metrics *metrics.Metrics
}
// New creates a new audit Service.
func New(_ fx.Lifecycle, params ServiceParams) (*Service, error) {
return &Service{
log: params.Logger.Get(),
db: params.Database,
metrics: params.Metrics,
}, nil
}
// LogEntry records an audit event.
type LogEntry struct {
UserID int64
Username string
Action models.AuditAction
ResourceType models.AuditResourceType
ResourceID string
Detail string
RemoteIP string
}
// Log records an audit log entry and increments the audit metrics counter.
func (svc *Service) Log(ctx context.Context, entry LogEntry) {
auditEntry := models.NewAuditEntry(svc.db)
auditEntry.Username = entry.Username
auditEntry.Action = entry.Action
auditEntry.ResourceType = entry.ResourceType
if entry.UserID != 0 {
auditEntry.UserID = sql.NullInt64{Int64: entry.UserID, Valid: true}
}
if entry.ResourceID != "" {
auditEntry.ResourceID = sql.NullString{String: entry.ResourceID, Valid: true}
}
if entry.Detail != "" {
auditEntry.Detail = sql.NullString{String: entry.Detail, Valid: true}
}
if entry.RemoteIP != "" {
auditEntry.RemoteIP = sql.NullString{String: entry.RemoteIP, Valid: true}
}
err := auditEntry.Save(ctx)
if err != nil {
svc.log.Error("failed to save audit entry",
"error", err,
"action", entry.Action,
"username", entry.Username,
)
return
}
svc.metrics.AuditEventsTotal.WithLabelValues(string(entry.Action)).Inc()
svc.log.Info("audit",
"action", entry.Action,
"username", entry.Username,
"resource_type", entry.ResourceType,
"resource_id", entry.ResourceID,
)
}
// LogFromRequest records an audit log entry, extracting the remote IP from
// the HTTP request.
func (svc *Service) LogFromRequest(
ctx context.Context,
request *http.Request,
entry LogEntry,
) {
entry.RemoteIP = extractRemoteIP(request)
svc.Log(ctx, entry)
}
// extractRemoteIP extracts the client IP from the request, preferring
// X-Real-IP and X-Forwarded-For headers from trusted proxies.
func extractRemoteIP(r *http.Request) string {
// Check X-Real-IP first
if ip := strings.TrimSpace(r.Header.Get("X-Real-IP")); ip != "" {
return ip
}
// Check X-Forwarded-For (leftmost = client)
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
if parts := strings.SplitN(xff, ",", 2); len(parts) > 0 { //nolint:mnd // split limit
if ip := strings.TrimSpace(parts[0]); ip != "" {
return ip
}
}
}
// Fall back to RemoteAddr
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return r.RemoteAddr
}
return host
}
// Recent returns the most recent audit log entries.
func (svc *Service) Recent(
ctx context.Context,
limit int,
) ([]*models.AuditEntry, error) {
return models.FindAuditEntries(ctx, svc.db, limit)
}
// ForResource returns audit log entries for a specific resource.
func (svc *Service) ForResource(
ctx context.Context,
resourceType models.AuditResourceType,
resourceID string,
limit int,
) ([]*models.AuditEntry, error) {
return models.FindAuditEntriesByResource(ctx, svc.db, resourceType, resourceID, limit)
}

View File

@@ -0,0 +1,196 @@
package audit_test
import (
"context"
"log/slog"
"net/http"
"net/http/httptest"
"os"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
"sneak.berlin/go/upaas/internal/config"
"sneak.berlin/go/upaas/internal/database"
"sneak.berlin/go/upaas/internal/globals"
"sneak.berlin/go/upaas/internal/logger"
"sneak.berlin/go/upaas/internal/metrics"
"sneak.berlin/go/upaas/internal/models"
"sneak.berlin/go/upaas/internal/service/audit"
)
func setupTestAuditService(t *testing.T) (*audit.Service, *database.Database) {
t.Helper()
globals.SetAppname("upaas-test")
globals.SetVersion("test")
tmpDir := t.TempDir()
cfg := &config.Config{
DataDir: tmpDir,
}
log := slog.New(slog.NewTextHandler(os.Stderr, nil))
logWrapper := logger.NewForTest(log)
db, err := database.New(fx.Lifecycle(nil), database.Params{
Logger: logWrapper,
Config: cfg,
})
require.NoError(t, err)
reg := prometheus.NewRegistry()
metricsInstance := metrics.NewForTest(reg)
svc, err := audit.New(fx.Lifecycle(nil), audit.ServiceParams{
Logger: logWrapper,
Database: db,
Metrics: metricsInstance,
})
require.NoError(t, err)
return svc, db
}
func TestAuditServiceLog(t *testing.T) {
t.Parallel()
svc, db := setupTestAuditService(t)
ctx := context.Background()
svc.Log(ctx, audit.LogEntry{
UserID: 1,
Username: "admin",
Action: models.AuditActionLogin,
ResourceType: models.AuditResourceSession,
Detail: "user logged in",
RemoteIP: "127.0.0.1",
})
entries, err := models.FindAuditEntries(ctx, db, 10)
require.NoError(t, err)
require.Len(t, entries, 1)
assert.Equal(t, "admin", entries[0].Username)
assert.Equal(t, models.AuditActionLogin, entries[0].Action)
assert.Equal(t, "127.0.0.1", entries[0].RemoteIP.String)
}
func TestAuditServiceLogFromRequest(t *testing.T) {
t.Parallel()
svc, db := setupTestAuditService(t)
ctx := context.Background()
request := httptest.NewRequest(http.MethodPost, "/apps", nil)
request.RemoteAddr = "10.0.0.1:12345"
svc.LogFromRequest(ctx, request, audit.LogEntry{
Username: "admin",
Action: models.AuditActionAppCreate,
ResourceType: models.AuditResourceApp,
ResourceID: "app-1",
Detail: "created app",
})
entries, err := models.FindAuditEntries(ctx, db, 10)
require.NoError(t, err)
require.Len(t, entries, 1)
assert.Equal(t, "10.0.0.1", entries[0].RemoteIP.String)
assert.Equal(t, "app-1", entries[0].ResourceID.String)
}
func TestAuditServiceLogFromRequestWithXRealIP(t *testing.T) {
t.Parallel()
svc, db := setupTestAuditService(t)
ctx := context.Background()
request := httptest.NewRequest(http.MethodPost, "/apps", nil)
request.Header.Set("X-Real-IP", "203.0.113.50")
svc.LogFromRequest(ctx, request, audit.LogEntry{
Username: "admin",
Action: models.AuditActionAppCreate,
ResourceType: models.AuditResourceApp,
})
entries, err := models.FindAuditEntries(ctx, db, 10)
require.NoError(t, err)
require.Len(t, entries, 1)
assert.Equal(t, "203.0.113.50", entries[0].RemoteIP.String)
}
func TestAuditServiceRecent(t *testing.T) {
t.Parallel()
svc, _ := setupTestAuditService(t)
ctx := context.Background()
for range 5 {
svc.Log(ctx, audit.LogEntry{
Username: "admin",
Action: models.AuditActionLogin,
ResourceType: models.AuditResourceSession,
})
}
entries, err := svc.Recent(ctx, 3)
require.NoError(t, err)
assert.Len(t, entries, 3)
}
func TestAuditServiceForResource(t *testing.T) {
t.Parallel()
svc, _ := setupTestAuditService(t)
ctx := context.Background()
// Log entries for different resources.
svc.Log(ctx, audit.LogEntry{
Username: "admin",
Action: models.AuditActionAppCreate,
ResourceType: models.AuditResourceApp,
ResourceID: "app-1",
})
svc.Log(ctx, audit.LogEntry{
Username: "admin",
Action: models.AuditActionAppDeploy,
ResourceType: models.AuditResourceApp,
ResourceID: "app-1",
})
svc.Log(ctx, audit.LogEntry{
Username: "admin",
Action: models.AuditActionAppCreate,
ResourceType: models.AuditResourceApp,
ResourceID: "app-2",
})
entries, err := svc.ForResource(ctx, models.AuditResourceApp, "app-1", 10)
require.NoError(t, err)
assert.Len(t, entries, 2)
}
func TestAuditServiceLogWithNoOptionalFields(t *testing.T) {
t.Parallel()
svc, db := setupTestAuditService(t)
ctx := context.Background()
svc.Log(ctx, audit.LogEntry{
Username: "system",
Action: models.AuditActionWebhookReceive,
ResourceType: models.AuditResourceWebhook,
})
entries, err := models.FindAuditEntries(ctx, db, 10)
require.NoError(t, err)
require.Len(t, entries, 1)
assert.False(t, entries[0].UserID.Valid)
assert.False(t, entries[0].ResourceID.Valid)
assert.False(t, entries[0].Detail.Valid)
assert.False(t, entries[0].RemoteIP.Valid)
}

View File

@@ -21,6 +21,7 @@ import (
"sneak.berlin/go/upaas/internal/database"
"sneak.berlin/go/upaas/internal/docker"
"sneak.berlin/go/upaas/internal/logger"
"sneak.berlin/go/upaas/internal/metrics"
"sneak.berlin/go/upaas/internal/models"
"sneak.berlin/go/upaas/internal/service/notify"
)
@@ -208,6 +209,7 @@ type ServiceParams struct {
Database *database.Database
Docker *docker.Client
Notify *notify.Service
Metrics *metrics.Metrics
}
// activeDeploy tracks a running deployment so it can be cancelled.
@@ -222,6 +224,7 @@ type Service struct {
db *database.Database
docker *docker.Client
notify *notify.Service
metrics *metrics.Metrics
config *config.Config
params *ServiceParams
activeDeploys sync.Map // map[string]*activeDeploy - per-app active deployment tracking
@@ -231,12 +234,13 @@ type Service struct {
// New creates a new deploy Service.
func New(lc fx.Lifecycle, params ServiceParams) (*Service, error) {
svc := &Service{
log: params.Logger.Get(),
db: params.Database,
docker: params.Docker,
notify: params.Notify,
config: params.Config,
params: &params,
log: params.Logger.Get(),
db: params.Database,
docker: params.Docker,
notify: params.Notify,
metrics: params.Metrics,
config: params.Config,
params: &params,
}
if lc != nil {
@@ -327,6 +331,11 @@ func (svc *Service) Deploy(
}
defer svc.unlockApp(app.ID)
// Track in-flight deployments
svc.metrics.DeploymentsInFlight.WithLabelValues(app.Name).Inc()
deployStart := time.Now()
// Set up cancellable context and register as active deploy
deployCtx, cancel := context.WithCancel(ctx)
done := make(chan struct{})
@@ -334,6 +343,7 @@ func (svc *Service) Deploy(
svc.activeDeploys.Store(app.ID, ad)
defer func() {
svc.metrics.DeploymentsInFlight.WithLabelValues(app.Name).Dec()
cancel()
close(done)
svc.activeDeploys.Delete(app.ID)
@@ -359,7 +369,7 @@ func (svc *Service) Deploy(
svc.notify.NotifyBuildStart(bgCtx, app, deployment)
return svc.runBuildAndDeploy(deployCtx, bgCtx, app, deployment)
return svc.runBuildAndDeploy(deployCtx, bgCtx, app, deployment, deployStart)
}
// Rollback rolls back an app to its previous image.
@@ -467,15 +477,20 @@ func (svc *Service) runBuildAndDeploy(
bgCtx context.Context,
app *models.App,
deployment *models.Deployment,
deployStart time.Time,
) error {
// Build phase with timeout
imageID, err := svc.buildImageWithTimeout(deployCtx, app, deployment)
if err != nil {
cancelErr := svc.checkCancelled(deployCtx, bgCtx, app, deployment, "")
if cancelErr != nil {
svc.recordDeployMetrics(app.Name, "cancelled", deployStart)
return cancelErr
}
svc.recordDeployMetrics(app.Name, "failed", deployStart)
return err
}
@@ -486,9 +501,13 @@ func (svc *Service) runBuildAndDeploy(
if err != nil {
cancelErr := svc.checkCancelled(deployCtx, bgCtx, app, deployment, imageID)
if cancelErr != nil {
svc.recordDeployMetrics(app.Name, "cancelled", deployStart)
return cancelErr
}
svc.recordDeployMetrics(app.Name, "failed", deployStart)
return err
}
@@ -504,11 +523,19 @@ func (svc *Service) runBuildAndDeploy(
// Use context.WithoutCancel to ensure health check completes even if
// the parent context is cancelled (e.g., HTTP request ends).
go svc.checkHealthAfterDelay(bgCtx, app, deployment)
go svc.checkHealthAfterDelay(bgCtx, app, deployment, deployStart)
return nil
}
// recordDeployMetrics records deployment completion metrics.
func (svc *Service) recordDeployMetrics(appName, status string, start time.Time) {
duration := time.Since(start).Seconds()
svc.metrics.DeploymentsTotal.WithLabelValues(appName, status).Inc()
svc.metrics.DeploymentDuration.WithLabelValues(appName, status).Observe(duration)
}
// buildImageWithTimeout runs the build phase with a timeout.
func (svc *Service) buildImageWithTimeout(
ctx context.Context,
@@ -1163,6 +1190,7 @@ func (svc *Service) checkHealthAfterDelay(
ctx context.Context,
app *models.App,
deployment *models.Deployment,
deployStart time.Time,
) {
svc.log.Info(
"waiting 60 seconds to check container health",
@@ -1189,6 +1217,8 @@ func (svc *Service) checkHealthAfterDelay(
svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, err)
_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
svc.writeLogsToFile(reloadedApp, deployment)
svc.recordDeployMetrics(reloadedApp.Name, "failed", deployStart)
svc.metrics.ContainerHealthy.WithLabelValues(reloadedApp.Name).Set(0)
reloadedApp.Status = models.AppStatusError
_ = reloadedApp.Save(ctx)
@@ -1200,6 +1230,8 @@ func (svc *Service) checkHealthAfterDelay(
svc.notify.NotifyDeploySuccess(ctx, reloadedApp, deployment)
_ = deployment.MarkFinished(ctx, models.DeploymentStatusSuccess)
svc.writeLogsToFile(reloadedApp, deployment)
svc.recordDeployMetrics(reloadedApp.Name, "success", deployStart)
svc.metrics.ContainerHealthy.WithLabelValues(reloadedApp.Name).Set(1)
} else {
svc.log.Warn(
"container unhealthy after 60 seconds",
@@ -1208,6 +1240,8 @@ func (svc *Service) checkHealthAfterDelay(
svc.notify.NotifyDeployFailed(ctx, reloadedApp, deployment, ErrContainerUnhealthy)
_ = deployment.MarkFinished(ctx, models.DeploymentStatusFailed)
svc.writeLogsToFile(reloadedApp, deployment)
svc.recordDeployMetrics(reloadedApp.Name, "failed", deployStart)
svc.metrics.ContainerHealthy.WithLabelValues(reloadedApp.Name).Set(0)
reloadedApp.Status = models.AppStatusError
_ = reloadedApp.Save(ctx)
}

View File

@@ -7,12 +7,14 @@ import (
"encoding/json"
"fmt"
"log/slog"
"strconv"
"go.uber.org/fx"
"sneak.berlin/go/upaas/internal/database"
"sneak.berlin/go/upaas/internal/logger"
"sneak.berlin/go/upaas/internal/metrics"
"sneak.berlin/go/upaas/internal/models"
"sneak.berlin/go/upaas/internal/service/deploy"
)
@@ -24,23 +26,26 @@ type ServiceParams struct {
Logger *logger.Logger
Database *database.Database
Deploy *deploy.Service
Metrics *metrics.Metrics
}
// Service provides webhook handling functionality.
type Service struct {
log *slog.Logger
db *database.Database
deploy *deploy.Service
params *ServiceParams
log *slog.Logger
db *database.Database
deploy *deploy.Service
metrics *metrics.Metrics
params *ServiceParams
}
// New creates a new webhook Service.
func New(_ fx.Lifecycle, params ServiceParams) (*Service, error) {
return &Service{
log: params.Logger.Get(),
db: params.Database,
deploy: params.Deploy,
params: &params,
log: params.Logger.Get(),
db: params.Database,
deploy: params.Deploy,
metrics: params.Metrics,
params: &params,
}, nil
}
@@ -122,6 +127,10 @@ func (svc *Service) HandleWebhook(
"commit", commitSHA,
)
svc.metrics.WebhookEventsTotal.WithLabelValues(
app.Name, eventType, strconv.FormatBool(matched),
).Inc()
// If branch matches, trigger deployment
if matched {
svc.triggerDeployment(ctx, app, event)

View File

@@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
@@ -17,6 +18,7 @@ import (
"sneak.berlin/go/upaas/internal/docker"
"sneak.berlin/go/upaas/internal/globals"
"sneak.berlin/go/upaas/internal/logger"
"sneak.berlin/go/upaas/internal/metrics"
"sneak.berlin/go/upaas/internal/models"
"sneak.berlin/go/upaas/internal/service/deploy"
"sneak.berlin/go/upaas/internal/service/notify"
@@ -63,13 +65,17 @@ func setupTestService(t *testing.T) (*webhook.Service, *database.Database, func(
notifySvc, err := notify.New(fx.Lifecycle(nil), notify.ServiceParams{Logger: deps.logger})
require.NoError(t, err)
metricsInstance := metrics.NewForTest(prometheus.NewRegistry())
deploySvc, err := deploy.New(fx.Lifecycle(nil), deploy.ServiceParams{
Logger: deps.logger, Config: deps.config, Database: deps.db, Docker: dockerClient, Notify: notifySvc,
Metrics: metricsInstance,
})
require.NoError(t, err)
svc, err := webhook.New(fx.Lifecycle(nil), webhook.ServiceParams{
Logger: deps.logger, Database: deps.db, Deploy: deploySvc,
Metrics: metricsInstance,
})
require.NoError(t, err)