// Package blob handles the creation of blobs - the final storage units for Vaultik.
// A blob is a large file (up to 10GB) containing many compressed and encrypted chunks
// from multiple source files. Blobs are content-addressed, meaning their filename
// is derived from the SHA256 hash of their compressed and encrypted content.
//
// The blob creation process:
// 1. Chunks are accumulated from multiple files
// 2. The collection is compressed using zstd
// 3. The compressed data is encrypted using age
// 4. The encrypted blob is hashed to create its content-addressed name
// 5. The blob is uploaded to S3 using the hash as the filename
//
// This design optimizes storage efficiency by batching many small chunks into
// larger blobs, reducing the number of S3 operations and associated costs.
// A usage sketch appears below, following the Packer type definitions.
package blob

import (
	"context"
	"database/sql"
	"encoding/hex"
	"fmt"
	"io"
	"sync"
	"time"

	"git.eeqj.de/sneak/vaultik/internal/blobgen"
	"git.eeqj.de/sneak/vaultik/internal/database"
	"git.eeqj.de/sneak/vaultik/internal/log"
	"github.com/google/uuid"
	"github.com/spf13/afero"
)

// BlobHandler is a callback function invoked when a blob is finalized and ready for upload.
// The handler receives a BlobWithReader containing the blob metadata and a reader for
// the compressed and encrypted blob content. The handler is responsible for uploading
// the blob to storage and cleaning up any temporary files.
type BlobHandler func(blob *BlobWithReader) error

// PackerConfig holds configuration for creating a Packer.
// MaxBlobSize, Recipients, and Fs are required. BlobHandler is optional, and
// Repositories may be nil, in which case blob metadata is not tracked in the database.
type PackerConfig struct {
	MaxBlobSize      int64                  // Maximum size of a blob before forcing finalization
	CompressionLevel int                    // Zstd compression level (1-19, higher = better compression)
	Recipients       []string               // Age recipients for encryption
	Repositories     *database.Repositories // Database repositories for tracking blob metadata
	BlobHandler      BlobHandler            // Optional callback when blob is ready for upload
	Fs               afero.Fs               // Filesystem for temporary files
}

// Packer accumulates chunks and packs them into blobs.
// It handles compression, encryption, and coordination with the database
// to track blob metadata. Packer is thread-safe.
type Packer struct {
	maxBlobSize      int64
	compressionLevel int
	recipients       []string               // Age recipients for encryption
	blobHandler      BlobHandler            // Called when blob is ready
	repos            *database.Repositories // For creating blob records
	fs               afero.Fs               // Filesystem for temporary files

	// Mutex for thread-safe blob creation
	mu sync.Mutex

	// Current blob being packed
	currentBlob   *blobInProgress
	finishedBlobs []*FinishedBlob // Only used if no handler provided
}

// blobInProgress represents a blob being assembled
type blobInProgress struct {
	id        string          // UUID of the blob
	chunks    []*chunkInfo    // Track chunk metadata
	chunkSet  map[string]bool // Track unique chunks in this blob
	tempFile  afero.File      // Temporary file for encrypted compressed data
	writer    *blobgen.Writer // Unified compression/encryption/hashing writer
	startTime time.Time
	size      int64 // Current uncompressed size
}
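// Usage sketch for the Packer defined above. This is illustrative only: the
// recipient string, repos, chunks, and uploadToStorage are placeholder
// assumptions supplied by the caller, not identifiers provided by this package.
//
//	packer, err := NewPacker(PackerConfig{
//		MaxBlobSize:      10 << 30, // the 10GB upper bound mentioned in the package docs
//		CompressionLevel: 3,
//		Recipients:       []string{"age1exampleplaceholderrecipient"},
//		Repositories:     repos, // may be nil; blob metadata is then not recorded
//		Fs:               afero.NewOsFs(),
//		BlobHandler: func(b *BlobWithReader) error {
//			// Upload b.Reader under the content-addressed name b.Hash, then
//			// close and remove b.TempFile (the handler owns it; see the
//			// handler sketch at the end of this file).
//			return uploadToStorage(b)
//		},
//	})
//	if err != nil {
//		return err
//	}
//	if err := packer.PackChunks(chunks); err != nil { // chunks is []*ChunkRef
//		return err
//	}
//	// PackChunks flushes on completion. When using AddChunk directly, call
//	// Flush at the end and handle ErrBlobSizeLimitExceeded by calling
//	// FinalizeBlob and retrying the rejected chunk.
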
// ChunkRef represents a chunk to be added to a blob.
// The Hash is the content-addressed identifier (SHA256) of the chunk,
// and Data contains the raw chunk bytes. After adding to a blob,
// the Data can be safely discarded as it's written to the blob immediately.
type ChunkRef struct {
	Hash string // SHA256 hash of the chunk data
	Data []byte // Raw chunk content
}

// chunkInfo tracks chunk metadata in a blob
type chunkInfo struct {
	Hash   string
	Offset int64
	Size   int64
}

// FinishedBlob represents a completed blob ready for storage
type FinishedBlob struct {
	ID           string
	Hash         string
	Data         []byte // Compressed, encrypted blob data; only populated when no BlobHandler is set
	Chunks       []*BlobChunkRef
	CreatedTS    time.Time
	Uncompressed int64
	Compressed   int64
}

// BlobChunkRef represents a chunk's position within a blob
type BlobChunkRef struct {
	ChunkHash string
	Offset    int64
	Length    int64
}

// BlobWithReader wraps a FinishedBlob with its data reader
type BlobWithReader struct {
	*FinishedBlob
	Reader   io.ReadSeeker
	TempFile afero.File // Optional, only set for disk-based blobs
}

// NewPacker creates a new blob packer that accumulates chunks into blobs.
// The packer will automatically finalize blobs when they reach MaxBlobSize.
// Returns an error if required configuration fields are missing or invalid.
func NewPacker(cfg PackerConfig) (*Packer, error) {
	if len(cfg.Recipients) == 0 {
		return nil, fmt.Errorf("recipients are required - blobs must be encrypted")
	}
	if cfg.MaxBlobSize <= 0 {
		return nil, fmt.Errorf("max blob size must be positive")
	}
	if cfg.Fs == nil {
		return nil, fmt.Errorf("filesystem is required")
	}

	return &Packer{
		maxBlobSize:      cfg.MaxBlobSize,
		compressionLevel: cfg.CompressionLevel,
		recipients:       cfg.Recipients,
		blobHandler:      cfg.BlobHandler,
		repos:            cfg.Repositories,
		fs:               cfg.Fs,
		finishedBlobs:    make([]*FinishedBlob, 0),
	}, nil
}

// SetBlobHandler sets the handler to be called when a blob is finalized.
// The handler is responsible for uploading the blob to storage.
// If no handler is set, finalized blobs are stored in memory and can be
// retrieved with GetFinishedBlobs().
func (p *Packer) SetBlobHandler(handler BlobHandler) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.blobHandler = handler
}

// AddChunk adds a chunk to the current blob being packed.
// If adding the chunk would push the current, non-empty blob past MaxBlobSize,
// ErrBlobSizeLimitExceeded is returned; the caller should finalize the current
// blob and retry the chunk.
// The chunk data is written immediately and can be garbage collected after this call.
// Thread-safe.
func (p *Packer) AddChunk(chunk *ChunkRef) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Initialize new blob if needed
	if p.currentBlob == nil {
		if err := p.startNewBlob(); err != nil {
			return fmt.Errorf("starting new blob: %w", err)
		}
	}

	// Check if adding this chunk would exceed blob size limit
	// Use conservative estimate: assume no compression
	// Skip size check if chunk already exists in blob
	if !p.currentBlob.chunkSet[chunk.Hash] {
		currentSize := p.currentBlob.size
		newSize := currentSize + int64(len(chunk.Data))
		if newSize > p.maxBlobSize && len(p.currentBlob.chunks) > 0 {
			// Return error indicating size limit would be exceeded
			return ErrBlobSizeLimitExceeded
		}
	}

	// Add chunk to current blob
	if err := p.addChunkToCurrentBlob(chunk); err != nil {
		return err
	}

	return nil
}

// Flush finalizes any in-progress blob, compressing, encrypting, and hashing it.
// This should be called after all chunks have been added to ensure no data is lost.
// If a BlobHandler is set, it will be called with the finalized blob.
// Thread-safe.
func (p *Packer) Flush() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.currentBlob != nil && len(p.currentBlob.chunks) > 0 {
		if err := p.finalizeCurrentBlob(); err != nil {
			return fmt.Errorf("finalizing blob: %w", err)
		}
	}

	return nil
}
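// Sketch of the in-memory path used when no BlobHandler is configured. This is
// illustrative only: recipients, chunks, and storeBlob are placeholder
// assumptions supplied by the caller, not identifiers provided by this package.
//
//	packer, err := NewPacker(PackerConfig{
//		MaxBlobSize:      1 << 30,
//		CompressionLevel: 3,
//		Recipients:       recipients,
//		Fs:               afero.NewMemMapFs(),
//	})
//	if err != nil {
//		return err
//	}
//	if err := packer.PackChunks(chunks); err != nil {
//		return err
//	}
//	// Without a handler, each finalized blob (including its full Data) is
//	// held in memory until retrieved here.
//	for _, b := range packer.GetFinishedBlobs() {
//		if err := storeBlob(b.Hash, b.Data); err != nil {
//			return err
//		}
//	}
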
// FinalizeBlob finalizes the current blob being assembled.
// This compresses the accumulated chunks, encrypts the result, and computes
// the content-addressed hash. The finalized blob is either passed to the
// BlobHandler (if set) or stored internally.
// After finalization, the caller must retry any chunk whose addition returned
// ErrBlobSizeLimitExceeded.
// Thread-safe.
func (p *Packer) FinalizeBlob() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.currentBlob == nil {
		return nil
	}

	return p.finalizeCurrentBlob()
}

// GetFinishedBlobs returns all completed blobs and clears the internal list.
// This is only used when no BlobHandler is set. After calling this method,
// the caller is responsible for uploading the blobs to storage.
// Thread-safe.
func (p *Packer) GetFinishedBlobs() []*FinishedBlob {
	p.mu.Lock()
	defer p.mu.Unlock()

	blobs := p.finishedBlobs
	p.finishedBlobs = make([]*FinishedBlob, 0)
	return blobs
}

// startNewBlob initializes a new blob (must be called with lock held)
func (p *Packer) startNewBlob() error {
	// Generate UUID for the blob
	blobID := uuid.New().String()

	// Create blob record in database
	if p.repos != nil {
		blob := &database.Blob{
			ID:               blobID,
			Hash:             "temp-placeholder-" + blobID, // Temporary placeholder until finalized
			CreatedTS:        time.Now().UTC(),
			FinishedTS:       nil,
			UncompressedSize: 0,
			CompressedSize:   0,
			UploadedTS:       nil,
		}
		err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
			return p.repos.Blobs.Create(ctx, tx, blob)
		})
		if err != nil {
			return fmt.Errorf("creating blob record: %w", err)
		}
	}

	// Create temporary file
	tempFile, err := afero.TempFile(p.fs, "", "vaultik-blob-*.tmp")
	if err != nil {
		return fmt.Errorf("creating temp file: %w", err)
	}

	// Create blobgen writer for unified compression/encryption/hashing
	writer, err := blobgen.NewWriter(tempFile, p.compressionLevel, p.recipients)
	if err != nil {
		_ = tempFile.Close()
		_ = p.fs.Remove(tempFile.Name())
		return fmt.Errorf("creating blobgen writer: %w", err)
	}

	p.currentBlob = &blobInProgress{
		id:        blobID,
		chunks:    make([]*chunkInfo, 0),
		chunkSet:  make(map[string]bool),
		startTime: time.Now().UTC(),
		tempFile:  tempFile,
		writer:    writer,
		size:      0,
	}

	log.Debug("Created new blob container",
		"blob_id", blobID,
		"temp_file", tempFile.Name())

	return nil
}

// addChunkToCurrentBlob adds a chunk to the current blob (must be called with lock held)
func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error {
	// Skip if chunk already in current blob
	if p.currentBlob.chunkSet[chunk.Hash] {
		log.Debug("Skipping duplicate chunk already in current blob",
			"chunk_hash", chunk.Hash)
		return nil
	}

	// Track offset before writing
	offset := p.currentBlob.size

	// Write to the blobgen writer (compression -> encryption -> disk)
	if _, err := p.currentBlob.writer.Write(chunk.Data); err != nil {
		return fmt.Errorf("writing to blob stream: %w", err)
	}

	// Track chunk info
	chunkSize := int64(len(chunk.Data))
	chunkInfo := &chunkInfo{
		Hash:   chunk.Hash,
		Offset: offset,
		Size:   chunkSize,
	}
	p.currentBlob.chunks = append(p.currentBlob.chunks, chunkInfo)
	p.currentBlob.chunkSet[chunk.Hash] = true

	// Store blob-chunk association in database immediately
	if p.repos != nil {
		blobChunk := &database.BlobChunk{
			BlobID:    p.currentBlob.id,
			ChunkHash: chunk.Hash,
			Offset:    offset,
			Length:    chunkSize,
		}
		err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
			return p.repos.BlobChunks.Create(ctx, tx, blobChunk)
		})
		if err != nil {
			log.Error("Failed to store blob-chunk association in database",
				"error", err,
				"blob_id", p.currentBlob.id,
				"chunk_hash", chunk.Hash)
			// Continue anyway - we can reconstruct this later if needed
		}
	}

	// Update total size
	p.currentBlob.size += chunkSize

	log.Debug("Added chunk to blob container",
		"blob_id", p.currentBlob.id,
		"chunk_hash", chunk.Hash,
		"chunk_size", len(chunk.Data),
		"offset", offset,
		"blob_chunks", len(p.currentBlob.chunks),
		"uncompressed_size", p.currentBlob.size)

	return nil
}

// finalizeCurrentBlob completes the current blob (must be called with lock held)
func (p *Packer) finalizeCurrentBlob() error {
	if p.currentBlob == nil {
		return nil
	}

	// Close blobgen writer to flush all data
	if err := p.currentBlob.writer.Close(); err != nil {
		p.cleanupTempFile()
		return fmt.Errorf("closing blobgen writer: %w", err)
	}

	// Sync file to ensure all data is written
	if err := p.currentBlob.tempFile.Sync(); err != nil {
		p.cleanupTempFile()
		return fmt.Errorf("syncing temp file: %w", err)
	}

	// Get the final size of the compressed, encrypted data
	finalSize, err := p.currentBlob.tempFile.Seek(0, io.SeekCurrent)
	if err != nil {
		p.cleanupTempFile()
		return fmt.Errorf("getting file size: %w", err)
	}

	// Reset to beginning for reading
	if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
		p.cleanupTempFile()
		return fmt.Errorf("seeking to start: %w", err)
	}

	// Get hash from blobgen writer (of final encrypted data)
	finalHash := p.currentBlob.writer.Sum256()
	blobHash := hex.EncodeToString(finalHash)

	// Create chunk references with offsets
	chunkRefs := make([]*BlobChunkRef, 0, len(p.currentBlob.chunks))
	for _, chunk := range p.currentBlob.chunks {
		chunkRefs = append(chunkRefs, &BlobChunkRef{
			ChunkHash: chunk.Hash,
			Offset:    chunk.Offset,
			Length:    chunk.Size,
		})
	}

	// Update blob record in database with hash and sizes
	if p.repos != nil {
		err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
			return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash, p.currentBlob.size, finalSize)
		})
		if err != nil {
			p.cleanupTempFile()
			return fmt.Errorf("updating blob record: %w", err)
		}
	}

	// Create finished blob
	finished := &FinishedBlob{
		ID:           p.currentBlob.id,
		Hash:         blobHash,
		Data:         nil, // We don't load data into memory anymore
		Chunks:       chunkRefs,
		CreatedTS:    p.currentBlob.startTime,
		Uncompressed: p.currentBlob.size,
		Compressed:   finalSize,
	}

	compressionRatio := float64(finished.Compressed) / float64(finished.Uncompressed)
	log.Info("Finalized blob (compressed and encrypted)",
		"hash", blobHash,
		"chunks", len(chunkRefs),
		"uncompressed", finished.Uncompressed,
		"compressed", finished.Compressed,
		"ratio", fmt.Sprintf("%.2f", compressionRatio),
		"duration", time.Since(p.currentBlob.startTime))

	// Call blob handler if set
	if p.blobHandler != nil {
		// Reset file position for handler
		if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
			p.cleanupTempFile()
			return fmt.Errorf("seeking for handler: %w", err)
		}

		// Create a blob reader that includes the data stream
		blobWithReader := &BlobWithReader{
			FinishedBlob: finished,
			Reader:       p.currentBlob.tempFile,
			TempFile:     p.currentBlob.tempFile,
		}

		if err := p.blobHandler(blobWithReader); err != nil {
			p.cleanupTempFile()
			return fmt.Errorf("blob handler failed: %w", err)
		}

		// Note: blob handler is responsible for closing/cleaning up temp file
		p.currentBlob = nil
	} else {
		log.Debug("No blob handler callback configured", "blob_hash", blobHash[:8]+"...")
		// No handler, need to read data for legacy behavior
		if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
			p.cleanupTempFile()
			return fmt.Errorf("seeking to read data: %w", err)
		}
		data, err := io.ReadAll(p.currentBlob.tempFile)
		if err != nil {
			p.cleanupTempFile()
			return fmt.Errorf("reading blob data: %w", err)
		}
		finished.Data = data
		p.finishedBlobs = append(p.finishedBlobs, finished)

		// Cleanup
		p.cleanupTempFile()
		p.currentBlob = nil
	}

	return nil
}

// cleanupTempFile closes and removes the current blob's temporary file
func (p *Packer) cleanupTempFile() {
	if p.currentBlob != nil && p.currentBlob.tempFile != nil {
		name := p.currentBlob.tempFile.Name()
		_ = p.currentBlob.tempFile.Close()
		_ = p.fs.Remove(name)
	}
}

// PackChunks is a convenience method to pack multiple chunks at once.
// When a chunk would exceed the blob size limit, the current blob is finalized
// and the chunk is retried; any remaining data is flushed at the end.
func (p *Packer) PackChunks(chunks []*ChunkRef) error {
	for _, chunk := range chunks {
		err := p.AddChunk(chunk)
		if err == ErrBlobSizeLimitExceeded {
			// Finalize current blob and retry
			if err := p.FinalizeBlob(); err != nil {
				return fmt.Errorf("finalizing blob before retry: %w", err)
			}
			// Retry the chunk
			if err := p.AddChunk(chunk); err != nil {
				return fmt.Errorf("adding chunk %s after finalize: %w", chunk.Hash, err)
			}
		} else if err != nil {
			return fmt.Errorf("adding chunk %s: %w", chunk.Hash, err)
		}
	}

	return p.Flush()
}
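// Sketch of a BlobHandler that satisfies the upload-and-cleanup contract
// described above. This is illustrative only: uploader and fs are placeholder
// assumptions (fs must be the same afero.Fs passed in PackerConfig so the temp
// file can be removed), and uploader.Put is not a real client API.
//
//	packer.SetBlobHandler(func(b *BlobWithReader) error {
//		// Stream the compressed, encrypted blob to storage under its
//		// content-addressed name.
//		if err := uploader.Put(context.Background(), b.Hash, b.Reader); err != nil {
//			return err
//		}
//		// On success the packer does not touch the temp file again; the
//		// handler owns closing and removing it.
//		if b.TempFile != nil {
//			name := b.TempFile.Name()
//			_ = b.TempFile.Close()
//			_ = fs.Remove(name)
//		}
//		return nil
//	})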