vaultik/internal/blob/packer.go

// Package blob handles the creation of blobs - the final storage units for Vaultik.
// A blob is a large file (up to 10GB) containing many compressed and encrypted chunks
// from multiple source files. Blobs are content-addressed, meaning their filename
// is derived from the SHA256 hash of their compressed and encrypted content.
//
// The blob creation process:
// 1. Chunks are accumulated from multiple files
// 2. The collection is compressed using zstd
// 3. The compressed data is encrypted using age
// 4. The encrypted blob is hashed to create its content-addressed name
// 5. The blob is uploaded to S3 using the hash as the filename
//
// This design optimizes storage efficiency by batching many small chunks into
// larger blobs, reducing the number of S3 operations and associated costs.
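//
// A minimal usage sketch (illustrative only; enc, repos, chunks, and
// uploadToS3 are placeholders supplied by the caller, and error handling is
// abbreviated):
//
//	packer, err := NewPacker(PackerConfig{
//		MaxBlobSize:      10 << 30, // e.g. 10 GiB
//		CompressionLevel: 3,
//		Encryptor:        enc,   // an age-based Encryptor implementation
//		Repositories:     repos, // may be nil; blob metadata is then not recorded
//		BlobHandler:      uploadToS3,
//	})
//	if err != nil {
//		return err
//	}
//	if err := packer.PackChunks(chunks); err != nil {
//		return err
//	}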
package blob
import (
"context"
"crypto/sha256"
"database/sql"
"encoding/hex"
"fmt"
"hash"
"io"
"math/bits"
"os"
"runtime"
"sync"
"time"
"git.eeqj.de/sneak/vaultik/internal/database"
"git.eeqj.de/sneak/vaultik/internal/log"
"github.com/google/uuid"
"github.com/klauspost/compress/zstd"
)
// BlobHandler is a callback function invoked when a blob is finalized and ready for upload.
// The handler receives a BlobWithReader containing the blob metadata and a reader for
// the compressed and encrypted blob content. The handler is responsible for uploading
// the blob to storage and cleaning up any temporary files.
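//
// A handler sketch (uploader and ctx are placeholders, not part of this
// package; the object key is the blob's content hash, matching the package's
// content-addressed naming):
//
//	handler := func(b *BlobWithReader) error {
//		defer func() {
//			if b.TempFile != nil {
//				_ = b.TempFile.Close()
//				_ = os.Remove(b.TempFile.Name())
//			}
//		}()
//		return uploader.PutObject(ctx, b.Hash, b.Reader)
//	}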
type BlobHandler func(blob *BlobWithReader) error
// PackerConfig holds configuration for creating a Packer.
// Encryptor and a positive MaxBlobSize are required; BlobHandler and
// Repositories are optional.
type PackerConfig struct {
MaxBlobSize int64 // Maximum uncompressed blob size; AddChunk reports ErrBlobSizeLimitExceeded beyond this
CompressionLevel int // Zstd compression level (1-19, higher = better compression)
Encryptor Encryptor // Age encryptor for blob encryption (required)
Repositories *database.Repositories // Database repositories for tracking blob metadata
BlobHandler BlobHandler // Optional callback when blob is ready for upload
}
// Packer accumulates chunks and packs them into blobs.
// It handles compression, encryption, and coordination with the database
// to track blob metadata. Packer is thread-safe.
type Packer struct {
maxBlobSize int64
compressionLevel int
encryptor Encryptor // Required - blobs are always encrypted
blobHandler BlobHandler // Called when blob is ready
repos *database.Repositories // For creating blob records
// Mutex for thread-safe blob creation
mu sync.Mutex
// Current blob being packed
currentBlob *blobInProgress
finishedBlobs []*FinishedBlob // Only used if no handler provided
}
// Encryptor is the interface the packer uses to encrypt blob data before it
// is written to disk.
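//
// A minimal sketch of a conforming implementation using filippo.io/age
// (vaultik's actual age encryptor is supplied by the caller via PackerConfig;
// this is for illustration only):
//
//	type ageEncryptor struct{ recipients []age.Recipient }
//
//	func (e *ageEncryptor) EncryptWriter(dst io.Writer) (io.WriteCloser, error) {
//		return age.Encrypt(dst, e.recipients...)
//	}
//
//	func (e *ageEncryptor) Encrypt(data []byte) ([]byte, error) {
//		var buf bytes.Buffer
//		w, err := age.Encrypt(&buf, e.recipients...)
//		if err != nil {
//			return nil, err
//		}
//		if _, err := w.Write(data); err != nil {
//			return nil, err
//		}
//		if err := w.Close(); err != nil {
//			return nil, err
//		}
//		return buf.Bytes(), nil
//	}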
type Encryptor interface {
Encrypt(data []byte) ([]byte, error)
EncryptWriter(dst io.Writer) (io.WriteCloser, error)
}
// blobInProgress represents a blob being assembled
type blobInProgress struct {
id string // UUID of the blob
chunks []*chunkInfo // Track chunk metadata
chunkSet map[string]bool // Track unique chunks in this blob
tempFile *os.File // Temporary file for encrypted compressed data
hasher hash.Hash // For computing hash of final encrypted data
compressor io.WriteCloser // Compression writer
encryptor io.WriteCloser // Encryption writer (encryption is always enabled)
finalWriter io.Writer // The final writer in the chain
startTime time.Time
size int64 // Current uncompressed size
compressedSize int64 // Current compressed size (estimated)
}
// ChunkRef represents a chunk to be added to a blob.
// The Hash is the content-addressed identifier (SHA256) of the chunk,
// and Data contains the raw chunk bytes. After adding to a blob,
// the Data can be safely discarded as it's written to the blob immediately.
type ChunkRef struct {
Hash string // SHA256 hash of the chunk data
Data []byte // Raw chunk content
}
// chunkInfo tracks chunk metadata in a blob
type chunkInfo struct {
Hash string
Offset int64
Size int64
}
// FinishedBlob represents a completed blob ready for storage
type FinishedBlob struct {
ID string
Hash string
Data []byte // Compressed data
Chunks []*BlobChunkRef
CreatedTS time.Time
Uncompressed int64
Compressed int64
}
// BlobChunkRef represents a chunk's position within a blob
type BlobChunkRef struct {
ChunkHash string
Offset int64
Length int64
}
// BlobWithReader wraps a FinishedBlob with its data reader
type BlobWithReader struct {
*FinishedBlob
Reader io.ReadSeeker
TempFile *os.File // Optional, only set for disk-based blobs
}
// NewPacker creates a new blob packer that accumulates chunks into blobs.
// The packer will automatically finalize blobs when they reach MaxBlobSize.
// Returns an error if required configuration fields are missing or invalid.
func NewPacker(cfg PackerConfig) (*Packer, error) {
if cfg.Encryptor == nil {
return nil, fmt.Errorf("encryptor is required - blobs must be encrypted")
}
if cfg.MaxBlobSize <= 0 {
return nil, fmt.Errorf("max blob size must be positive")
}
return &Packer{
maxBlobSize: cfg.MaxBlobSize,
compressionLevel: cfg.CompressionLevel,
encryptor: cfg.Encryptor,
blobHandler: cfg.BlobHandler,
repos: cfg.Repositories,
finishedBlobs: make([]*FinishedBlob, 0),
}, nil
}
// SetBlobHandler sets the handler to be called when a blob is finalized.
// The handler is responsible for uploading the blob to storage.
// If no handler is set, finalized blobs are stored in memory and can be
// retrieved with GetFinishedBlobs().
func (p *Packer) SetBlobHandler(handler BlobHandler) {
p.mu.Lock()
defer p.mu.Unlock()
p.blobHandler = handler
}
// AddChunk adds a chunk to the current blob being packed.
// If adding the chunk would exceed MaxBlobSize, returns ErrBlobSizeLimitExceeded.
// In this case, the caller should finalize the current blob and retry.
// The chunk data is written immediately and can be garbage collected after this call.
// Thread-safe.
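//
// Callers typically handle the size-limit error roughly as follows (PackChunks
// implements the same pattern):
//
//	if err := packer.AddChunk(chunk); err == ErrBlobSizeLimitExceeded {
//		if err := packer.FinalizeBlob(); err != nil {
//			return err
//		}
//		if err := packer.AddChunk(chunk); err != nil {
//			return err
//		}
//	} else if err != nil {
//		return err
//	}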
func (p *Packer) AddChunk(chunk *ChunkRef) error {
p.mu.Lock()
defer p.mu.Unlock()
// Initialize new blob if needed
if p.currentBlob == nil {
if err := p.startNewBlob(); err != nil {
return fmt.Errorf("starting new blob: %w", err)
}
}
// Check if adding this chunk would exceed blob size limit
// Use conservative estimate: assume no compression
// Skip size check if chunk already exists in blob
if !p.currentBlob.chunkSet[chunk.Hash] {
currentSize := p.currentBlob.size
newSize := currentSize + int64(len(chunk.Data))
if newSize > p.maxBlobSize && len(p.currentBlob.chunks) > 0 {
// Return error indicating size limit would be exceeded
return ErrBlobSizeLimitExceeded
}
}
// Add chunk to current blob
if err := p.addChunkToCurrentBlob(chunk); err != nil {
return err
}
return nil
}
// Flush finalizes any in-progress blob, compressing, encrypting, and hashing it.
// This should be called after all chunks have been added to ensure no data is lost.
// If a BlobHandler is set, it will be called with the finalized blob.
// Thread-safe.
func (p *Packer) Flush() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.currentBlob != nil && len(p.currentBlob.chunks) > 0 {
if err := p.finalizeCurrentBlob(); err != nil {
return fmt.Errorf("finalizing blob: %w", err)
}
}
return nil
}
// FinalizeBlob finalizes the current blob being assembled.
// This compresses the accumulated chunks, encrypts the result, and computes
// the content-addressed hash. The finalized blob is either passed to the
// BlobHandler (if set) or stored internally.
// Callers must retry any chunk whose AddChunk call returned ErrBlobSizeLimitExceeded.
// Thread-safe.
func (p *Packer) FinalizeBlob() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.currentBlob == nil {
return nil
}
return p.finalizeCurrentBlob()
}
// GetFinishedBlobs returns all completed blobs and clears the internal list.
// This is only used when no BlobHandler is set. After calling this method,
// the caller is responsible for uploading the blobs to storage.
// Thread-safe.
func (p *Packer) GetFinishedBlobs() []*FinishedBlob {
p.mu.Lock()
defer p.mu.Unlock()
blobs := p.finishedBlobs
p.finishedBlobs = make([]*FinishedBlob, 0)
return blobs
}
// startNewBlob initializes a new blob (must be called with lock held)
func (p *Packer) startNewBlob() error {
// Generate UUID for the blob
blobID := uuid.New().String()
// Create blob record in database
if p.repos != nil {
blob := &database.Blob{
ID: blobID,
Hash: "temp-placeholder-" + blobID, // Temporary placeholder until finalized
CreatedTS: time.Now().UTC(),
FinishedTS: nil,
UncompressedSize: 0,
CompressedSize: 0,
UploadedTS: nil,
}
err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
return p.repos.Blobs.Create(ctx, tx, blob)
})
if err != nil {
return fmt.Errorf("creating blob record: %w", err)
}
}
// Create temporary file
tempFile, err := os.CreateTemp("", "vaultik-blob-*.tmp")
if err != nil {
return fmt.Errorf("creating temp file: %w", err)
}
p.currentBlob = &blobInProgress{
id: blobID,
chunks: make([]*chunkInfo, 0),
chunkSet: make(map[string]bool),
startTime: time.Now().UTC(),
tempFile: tempFile,
hasher: sha256.New(),
size: 0,
compressedSize: 0,
}
// Build writer chain: compressor -> encryptor -> hasher+file.
// Encryption sits closest to disk, so only encrypted data ever touches the temp file.
// Final destination: write to both file and hasher
finalWriter := io.MultiWriter(tempFile, p.currentBlob.hasher)
// Set up encryption (required - closest to disk)
encWriter, err := p.encryptor.EncryptWriter(finalWriter)
if err != nil {
_ = tempFile.Close()
_ = os.Remove(tempFile.Name())
return fmt.Errorf("creating encryption writer: %w", err)
}
p.currentBlob.encryptor = encWriter
currentWriter := encWriter
// Set up compression (processes data before encryption)
encoderLevel := zstd.EncoderLevel(p.compressionLevel)
if p.compressionLevel < 1 {
encoderLevel = zstd.SpeedDefault
} else if p.compressionLevel > 9 {
encoderLevel = zstd.SpeedBestCompression
}
// Calculate window size based on blob size
windowSize := p.maxBlobSize / 100
if windowSize < (1 << 20) { // Min 1MB
windowSize = 1 << 20
} else if windowSize > (128 << 20) { // Max 128MB
windowSize = 128 << 20
}
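// Round down to a power of two, as the zstd encoder's WithWindowSize option
// requires. For example, a 10 GiB MaxBlobSize gives ~107 MB above, which
// rounds down to a 64 MiB window.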
windowSize = 1 << uint(63-bits.LeadingZeros64(uint64(windowSize)))
compWriter, err := zstd.NewWriter(currentWriter,
zstd.WithEncoderLevel(encoderLevel),
zstd.WithEncoderConcurrency(runtime.NumCPU()),
zstd.WithWindowSize(int(windowSize)),
)
if err != nil {
if p.currentBlob.encryptor != nil {
_ = p.currentBlob.encryptor.Close()
}
_ = tempFile.Close()
_ = os.Remove(tempFile.Name())
return fmt.Errorf("creating compression writer: %w", err)
}
p.currentBlob.compressor = compWriter
p.currentBlob.finalWriter = compWriter
log.Debug("Started new blob", "blob_id", blobID, "temp_file", tempFile.Name())
return nil
}
// addChunkToCurrentBlob adds a chunk to the current blob (must be called with lock held)
func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error {
// Skip if chunk already in current blob
if p.currentBlob.chunkSet[chunk.Hash] {
log.Debug("Skipping duplicate chunk in blob", "chunk_hash", chunk.Hash)
return nil
}
// Track offset before writing
offset := p.currentBlob.size
// Write to the final writer (compression -> encryption -> disk)
if _, err := p.currentBlob.finalWriter.Write(chunk.Data); err != nil {
return fmt.Errorf("writing to blob stream: %w", err)
}
// Track chunk info
chunkSize := int64(len(chunk.Data))
chunkInfo := &chunkInfo{
Hash: chunk.Hash,
Offset: offset,
Size: chunkSize,
}
p.currentBlob.chunks = append(p.currentBlob.chunks, chunkInfo)
p.currentBlob.chunkSet[chunk.Hash] = true
// Store blob-chunk association in database immediately
if p.repos != nil {
blobChunk := &database.BlobChunk{
BlobID: p.currentBlob.id,
ChunkHash: chunk.Hash,
Offset: offset,
Length: chunkSize,
}
err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
return p.repos.BlobChunks.Create(ctx, tx, blobChunk)
})
if err != nil {
log.Error("Failed to store blob-chunk association", "error", err,
"blob_id", p.currentBlob.id, "chunk_hash", chunk.Hash)
// Continue anyway - we can reconstruct this later if needed
}
}
// Update total size
p.currentBlob.size += chunkSize
log.Debug("Added chunk to blob",
"blob_id", p.currentBlob.id,
"chunk_hash", chunk.Hash,
"chunk_size", len(chunk.Data),
"offset", offset,
"blob_chunks", len(p.currentBlob.chunks),
"uncompressed_size", p.currentBlob.size)
return nil
}
// finalizeCurrentBlob completes the current blob (must be called with lock held)
func (p *Packer) finalizeCurrentBlob() error {
if p.currentBlob == nil {
return nil
}
// Close compression writer to flush all data
if err := p.currentBlob.compressor.Close(); err != nil {
p.cleanupTempFile()
return fmt.Errorf("closing compression writer: %w", err)
}
// Close encryption writer
if err := p.currentBlob.encryptor.Close(); err != nil {
p.cleanupTempFile()
return fmt.Errorf("closing encryption writer: %w", err)
}
// Sync file to ensure all data is written
if err := p.currentBlob.tempFile.Sync(); err != nil {
p.cleanupTempFile()
return fmt.Errorf("syncing temp file: %w", err)
}
// Get the final size (encrypted if applicable)
finalSize, err := p.currentBlob.tempFile.Seek(0, io.SeekCurrent)
if err != nil {
p.cleanupTempFile()
return fmt.Errorf("getting file size: %w", err)
}
// Reset to beginning for reading
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
p.cleanupTempFile()
return fmt.Errorf("seeking to start: %w", err)
}
// Get hash from hasher (of final encrypted data)
finalHash := p.currentBlob.hasher.Sum(nil)
blobHash := hex.EncodeToString(finalHash)
// Create chunk references with offsets
chunkRefs := make([]*BlobChunkRef, 0, len(p.currentBlob.chunks))
for _, chunk := range p.currentBlob.chunks {
chunkRefs = append(chunkRefs, &BlobChunkRef{
ChunkHash: chunk.Hash,
Offset: chunk.Offset,
Length: chunk.Size,
})
}
// Update blob record in database with hash and sizes
if p.repos != nil {
err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash,
p.currentBlob.size, finalSize)
})
if err != nil {
p.cleanupTempFile()
return fmt.Errorf("updating blob record: %w", err)
}
}
// Create finished blob
finished := &FinishedBlob{
ID: p.currentBlob.id,
Hash: blobHash,
Data: nil, // We don't load data into memory anymore
Chunks: chunkRefs,
CreatedTS: p.currentBlob.startTime,
Uncompressed: p.currentBlob.size,
Compressed: finalSize,
}
compressionRatio := float64(finished.Compressed) / float64(finished.Uncompressed)
log.Info("Finalized blob",
"hash", blobHash,
"chunks", len(chunkRefs),
"uncompressed", finished.Uncompressed,
"compressed", finished.Compressed,
"ratio", fmt.Sprintf("%.2f", compressionRatio),
"duration", time.Since(p.currentBlob.startTime))
// Call blob handler if set
if p.blobHandler != nil {
log.Debug("Calling blob handler", "blob_hash", blobHash[:8]+"...")
// Reset file position for handler
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
p.cleanupTempFile()
return fmt.Errorf("seeking for handler: %w", err)
}
// Create a blob reader that includes the data stream
blobWithReader := &BlobWithReader{
FinishedBlob: finished,
Reader: p.currentBlob.tempFile,
TempFile: p.currentBlob.tempFile,
}
if err := p.blobHandler(blobWithReader); err != nil {
p.cleanupTempFile()
return fmt.Errorf("blob handler failed: %w", err)
}
// Note: blob handler is responsible for closing/cleaning up temp file
p.currentBlob = nil
} else {
log.Debug("No blob handler set", "blob_hash", blobHash[:8]+"...")
// No handler, need to read data for legacy behavior
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
p.cleanupTempFile()
return fmt.Errorf("seeking to read data: %w", err)
}
data, err := io.ReadAll(p.currentBlob.tempFile)
if err != nil {
p.cleanupTempFile()
return fmt.Errorf("reading blob data: %w", err)
}
finished.Data = data
p.finishedBlobs = append(p.finishedBlobs, finished)
// Cleanup
p.cleanupTempFile()
p.currentBlob = nil
}
return nil
}
// cleanupTempFile removes the temporary file
func (p *Packer) cleanupTempFile() {
if p.currentBlob != nil && p.currentBlob.tempFile != nil {
name := p.currentBlob.tempFile.Name()
_ = p.currentBlob.tempFile.Close()
_ = os.Remove(name)
}
}
// PackChunks is a convenience method that adds multiple chunks in order,
// finalizing the current blob and retrying whenever the size limit is
// reached, and flushing any remaining data at the end.
func (p *Packer) PackChunks(chunks []*ChunkRef) error {
for _, chunk := range chunks {
err := p.AddChunk(chunk)
if err == ErrBlobSizeLimitExceeded {
// Finalize current blob and retry
if err := p.FinalizeBlob(); err != nil {
return fmt.Errorf("finalizing blob before retry: %w", err)
}
// Retry the chunk
if err := p.AddChunk(chunk); err != nil {
return fmt.Errorf("adding chunk %s after finalize: %w", chunk.Hash, err)
}
} else if err != nil {
return fmt.Errorf("adding chunk %s: %w", chunk.Hash, err)
}
}
return p.Flush()
}