- Created new internal/vaultik package with unified Vaultik struct
- Moved all command methods (snapshot, info, prune, verify) from CLI to vaultik package
- Implemented single constructor that handles crypto capabilities automatically
- Added CanDecrypt() method to check whether decryption is available
- Updated all CLI commands to use the new vaultik.Vaultik struct
- Removed old fragmented App structs and WithCrypto wrapper
- Fixed context management: Vaultik now owns its context lifecycle
- Cleaned up package imports and dependencies

This creates a cleaner separation between CLI/Cobra code and business logic, with all vaultik operations now centralized in the internal/vaultik package.
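
A rough sketch of the caller shape this describes (hedged: only vaultik.Vaultik and CanDecrypt() are named in this change; the constructor name New, the cfg argument, and the Snapshot method below are illustrative assumptions):

	// Hypothetical usage after the refactor; names other than Vaultik and
	// CanDecrypt are not taken from this commit.
	v, err := vaultik.New(cfg) // single constructor, detects crypto capabilities
	if err != nil {
		log.Fatal(err)
	}
	if v.CanDecrypt() {
		// private key material is available, so decrypt-dependent paths work
	}
	if err := v.Snapshot(); err != nil { // Vaultik owns its context lifecycle
		log.Fatal(err)
	}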

496 lines · 16 KiB · Go

// Package blob handles the creation of blobs - the final storage units for Vaultik.
// A blob is a large file (up to 10GB) containing many compressed and encrypted chunks
// from multiple source files. Blobs are content-addressed, meaning their filename
// is derived from the SHA256 hash of their compressed and encrypted content.
//
// The blob creation process:
// 1. Chunks are accumulated from multiple files
// 2. The collection is compressed using zstd
// 3. The compressed data is encrypted using age
// 4. The encrypted blob is hashed to create its content-addressed name
// 5. The blob is uploaded to S3 using the hash as the filename
//
// This design optimizes storage efficiency by batching many small chunks into
// larger blobs, reducing the number of S3 operations and associated costs.
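//
// A minimal usage sketch (illustrative only: the handler variable and the
// field values shown are assumptions, not fixed by this package):
//
//	packer, err := NewPacker(PackerConfig{
//		MaxBlobSize:      10 << 30, // 10GB cap, per the description above
//		CompressionLevel: 3,
//		Recipients:       []string{"age1..."},
//		BlobHandler:      uploadHandler, // a func(*BlobWithReader) error
//	})
//	if err != nil {
//		return err
//	}
//	if err := packer.AddChunk(&ChunkRef{Hash: hash, Data: data}); err != nil {
//		return err
//	}
//	if err := packer.Flush(); err != nil {
//		return err
//	}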
package blob

import (
	"context"
	"database/sql"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"sync"
	"time"

	"git.eeqj.de/sneak/vaultik/internal/blobgen"
	"git.eeqj.de/sneak/vaultik/internal/database"
	"git.eeqj.de/sneak/vaultik/internal/log"
	"github.com/google/uuid"
)

// BlobHandler is a callback function invoked when a blob is finalized and ready
// for upload. The handler receives a BlobWithReader containing the blob metadata
// and a reader for the compressed and encrypted blob content. The handler is
// responsible for uploading the blob to storage and cleaning up any temporary
// files.
type BlobHandler func(blob *BlobWithReader) error
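
// Example (sketch only, not part of this package): a BlobHandler that stores
// finished blobs in a caller-supplied local directory (destDir), named by
// content hash. A production handler would upload to S3 instead. Once the
// handler is invoked, it owns the temp file and must clean it up.
//
//	handler := func(b *BlobWithReader) error {
//		defer func() {
//			_ = b.TempFile.Close()
//			_ = os.Remove(b.TempFile.Name())
//		}()
//		out, err := os.Create(filepath.Join(destDir, b.Hash))
//		if err != nil {
//			return err
//		}
//		defer func() { _ = out.Close() }()
//		_, err = io.Copy(out, b.Reader)
//		return err
//	}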

// PackerConfig holds configuration for creating a Packer.
// Recipients, MaxBlobSize, and CompressionLevel are required; BlobHandler and
// Repositories are optional (without Repositories, blob metadata is not
// recorded in the database).
type PackerConfig struct {
	MaxBlobSize      int64                  // Maximum size of a blob before forcing finalization
	CompressionLevel int                    // Zstd compression level (1-19, higher = better compression)
	Recipients       []string               // Age recipients for encryption
	Repositories     *database.Repositories // Database repositories for tracking blob metadata
	BlobHandler      BlobHandler            // Optional callback when blob is ready for upload
}

// Packer accumulates chunks and packs them into blobs.
// It handles compression, encryption, and coordination with the database
// to track blob metadata. Packer is thread-safe.
type Packer struct {
	maxBlobSize      int64
	compressionLevel int
	recipients       []string               // Age recipients for encryption
	blobHandler      BlobHandler            // Called when blob is ready
	repos            *database.Repositories // For creating blob records

	// Mutex for thread-safe blob creation
	mu sync.Mutex

	// Current blob being packed
	currentBlob   *blobInProgress
	finishedBlobs []*FinishedBlob // Only used if no handler provided
}

// blobInProgress represents a blob being assembled
type blobInProgress struct {
	id        string          // UUID of the blob
	chunks    []*chunkInfo    // Track chunk metadata
	chunkSet  map[string]bool // Track unique chunks in this blob
	tempFile  *os.File        // Temporary file for encrypted compressed data
	writer    *blobgen.Writer // Unified compression/encryption/hashing writer
	startTime time.Time
	size      int64 // Current uncompressed size
}

// ChunkRef represents a chunk to be added to a blob.
// The Hash is the content-addressed identifier (SHA256) of the chunk,
// and Data contains the raw chunk bytes. After adding to a blob,
// the Data can be safely discarded as it is written to the blob immediately.
type ChunkRef struct {
	Hash string // SHA256 hash of the chunk data
	Data []byte // Raw chunk content
}

// chunkInfo tracks chunk metadata in a blob
type chunkInfo struct {
	Hash   string
	Offset int64
	Size   int64
}

// FinishedBlob represents a completed blob ready for storage
type FinishedBlob struct {
	ID           string
	Hash         string
	Data         []byte // Compressed data
	Chunks       []*BlobChunkRef
	CreatedTS    time.Time
	Uncompressed int64
	Compressed   int64
}

// BlobChunkRef represents a chunk's position within a blob
type BlobChunkRef struct {
	ChunkHash string
	Offset    int64
	Length    int64
}

// BlobWithReader wraps a FinishedBlob with its data reader
type BlobWithReader struct {
	*FinishedBlob
	Reader   io.ReadSeeker
	TempFile *os.File // Optional, only set for disk-based blobs
}

// NewPacker creates a new blob packer that accumulates chunks into blobs.
// Blobs are finalized when they reach MaxBlobSize: AddChunk signals this with
// ErrBlobSizeLimitExceeded, and PackChunks handles the finalize-and-retry
// cycle automatically.
// Returns an error if required configuration fields are missing or invalid.
func NewPacker(cfg PackerConfig) (*Packer, error) {
	if len(cfg.Recipients) == 0 {
		return nil, fmt.Errorf("recipients are required - blobs must be encrypted")
	}
	if cfg.MaxBlobSize <= 0 {
		return nil, fmt.Errorf("max blob size must be positive")
	}
	return &Packer{
		maxBlobSize:      cfg.MaxBlobSize,
		compressionLevel: cfg.CompressionLevel,
		recipients:       cfg.Recipients,
		blobHandler:      cfg.BlobHandler,
		repos:            cfg.Repositories,
		finishedBlobs:    make([]*FinishedBlob, 0),
	}, nil
}

// SetBlobHandler sets the handler to be called when a blob is finalized.
// The handler is responsible for uploading the blob to storage.
// If no handler is set, finalized blobs are stored in memory and can be
// retrieved with GetFinishedBlobs().
func (p *Packer) SetBlobHandler(handler BlobHandler) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.blobHandler = handler
}

// AddChunk adds a chunk to the current blob being packed.
// If adding the chunk would push a non-empty blob past MaxBlobSize, it returns
// ErrBlobSizeLimitExceeded; the caller should finalize the current blob and
// retry the chunk. Chunks already present in the current blob are skipped.
// The chunk data is written immediately and can be garbage collected after
// this call. Thread-safe.
func (p *Packer) AddChunk(chunk *ChunkRef) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Initialize new blob if needed
	if p.currentBlob == nil {
		if err := p.startNewBlob(); err != nil {
			return fmt.Errorf("starting new blob: %w", err)
		}
	}

	// Check if adding this chunk would exceed the blob size limit.
	// Use a conservative estimate: assume no compression.
	// Skip the size check if the chunk already exists in this blob.
	if !p.currentBlob.chunkSet[chunk.Hash] {
		currentSize := p.currentBlob.size
		newSize := currentSize + int64(len(chunk.Data))

		if newSize > p.maxBlobSize && len(p.currentBlob.chunks) > 0 {
			// Return error indicating the size limit would be exceeded
			return ErrBlobSizeLimitExceeded
		}
	}

	// Add chunk to current blob
	return p.addChunkToCurrentBlob(chunk)
}
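
// The caller-side retry pattern looks like this (PackChunks below implements
// exactly this loop; shown here only as a sketch):
//
//	if err := p.AddChunk(c); err == ErrBlobSizeLimitExceeded {
//		if err := p.FinalizeBlob(); err != nil {
//			return err
//		}
//		if err := p.AddChunk(c); err != nil {
//			return err
//		}
//	} else if err != nil {
//		return err
//	}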

// Flush finalizes any in-progress blob, compressing, encrypting, and hashing it.
// This should be called after all chunks have been added to ensure no data is lost.
// If a BlobHandler is set, it will be called with the finalized blob.
// Thread-safe.
func (p *Packer) Flush() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.currentBlob != nil && len(p.currentBlob.chunks) > 0 {
		if err := p.finalizeCurrentBlob(); err != nil {
			return fmt.Errorf("finalizing blob: %w", err)
		}
	}

	return nil
}

// FinalizeBlob finalizes the current blob being assembled.
// This compresses the accumulated chunks, encrypts the result, and computes
// the content-addressed hash. The finalized blob is either passed to the
// BlobHandler (if set) or stored internally. After finalizing, the caller
// should retry any chunk whose addition triggered ErrBlobSizeLimitExceeded.
// Thread-safe.
func (p *Packer) FinalizeBlob() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Nothing to do for a nil or empty blob; finalizing an empty blob would
	// write an empty encrypted file and skew the compression-ratio logging.
	if p.currentBlob == nil || len(p.currentBlob.chunks) == 0 {
		return nil
	}

	return p.finalizeCurrentBlob()
}

// GetFinishedBlobs returns all completed blobs and clears the internal list.
// This is only used when no BlobHandler is set. After calling this method,
// the caller is responsible for uploading the blobs to storage.
// Thread-safe.
func (p *Packer) GetFinishedBlobs() []*FinishedBlob {
	p.mu.Lock()
	defer p.mu.Unlock()

	blobs := p.finishedBlobs
	p.finishedBlobs = make([]*FinishedBlob, 0)
	return blobs
}
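
// Sketch of the handler-less drain pattern (illustrative only; the storage
// step is a placeholder, not an API of this package):
//
//	if err := packer.Flush(); err != nil {
//		return err
//	}
//	for _, b := range packer.GetFinishedBlobs() {
//		// b.Data holds the encrypted blob; store it under the name b.Hash
//	}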

// startNewBlob initializes a new blob (must be called with lock held)
func (p *Packer) startNewBlob() error {
	// Generate UUID for the blob
	blobID := uuid.New().String()

	// Create blob record in database
	if p.repos != nil {
		blob := &database.Blob{
			ID:               blobID,
			Hash:             "temp-placeholder-" + blobID, // Temporary placeholder until finalized
			CreatedTS:        time.Now().UTC(),
			FinishedTS:       nil,
			UncompressedSize: 0,
			CompressedSize:   0,
			UploadedTS:       nil,
		}
		err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
			return p.repos.Blobs.Create(ctx, tx, blob)
		})
		if err != nil {
			return fmt.Errorf("creating blob record: %w", err)
		}
	}

	// Create temporary file
	tempFile, err := os.CreateTemp("", "vaultik-blob-*.tmp")
	if err != nil {
		return fmt.Errorf("creating temp file: %w", err)
	}

	// Create blobgen writer for unified compression/encryption/hashing
	writer, err := blobgen.NewWriter(tempFile, p.compressionLevel, p.recipients)
	if err != nil {
		_ = tempFile.Close()
		_ = os.Remove(tempFile.Name())
		return fmt.Errorf("creating blobgen writer: %w", err)
	}

	p.currentBlob = &blobInProgress{
		id:        blobID,
		chunks:    make([]*chunkInfo, 0),
		chunkSet:  make(map[string]bool),
		startTime: time.Now().UTC(),
		tempFile:  tempFile,
		writer:    writer,
		size:      0,
	}

	log.Debug("Created new blob container", "blob_id", blobID, "temp_file", tempFile.Name())
	return nil
}

// addChunkToCurrentBlob adds a chunk to the current blob (must be called with lock held)
func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error {
	// Skip if chunk already in current blob
	if p.currentBlob.chunkSet[chunk.Hash] {
		log.Debug("Skipping duplicate chunk already in current blob", "chunk_hash", chunk.Hash)
		return nil
	}

	// Track offset before writing
	offset := p.currentBlob.size

	// Write to the blobgen writer (compression -> encryption -> disk)
	if _, err := p.currentBlob.writer.Write(chunk.Data); err != nil {
		return fmt.Errorf("writing to blob stream: %w", err)
	}

	// Track chunk info
	chunkSize := int64(len(chunk.Data))
	info := &chunkInfo{
		Hash:   chunk.Hash,
		Offset: offset,
		Size:   chunkSize,
	}
	p.currentBlob.chunks = append(p.currentBlob.chunks, info)
	p.currentBlob.chunkSet[chunk.Hash] = true

	// Store blob-chunk association in database immediately
	if p.repos != nil {
		blobChunk := &database.BlobChunk{
			BlobID:    p.currentBlob.id,
			ChunkHash: chunk.Hash,
			Offset:    offset,
			Length:    chunkSize,
		}
		err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
			return p.repos.BlobChunks.Create(ctx, tx, blobChunk)
		})
		if err != nil {
			log.Error("Failed to store blob-chunk association in database", "error", err,
				"blob_id", p.currentBlob.id, "chunk_hash", chunk.Hash)
			// Continue anyway - we can reconstruct this later if needed
		}
	}

	// Update total size
	p.currentBlob.size += chunkSize

	log.Debug("Added chunk to blob container",
		"blob_id", p.currentBlob.id,
		"chunk_hash", chunk.Hash,
		"chunk_size", len(chunk.Data),
		"offset", offset,
		"blob_chunks", len(p.currentBlob.chunks),
		"uncompressed_size", p.currentBlob.size)

	return nil
}

// finalizeCurrentBlob completes the current blob (must be called with lock held)
func (p *Packer) finalizeCurrentBlob() error {
	if p.currentBlob == nil {
		return nil
	}

	// Close blobgen writer to flush all data
	if err := p.currentBlob.writer.Close(); err != nil {
		p.cleanupTempFile()
		return fmt.Errorf("closing blobgen writer: %w", err)
	}

	// Sync file to ensure all data is written
	if err := p.currentBlob.tempFile.Sync(); err != nil {
		p.cleanupTempFile()
		return fmt.Errorf("syncing temp file: %w", err)
	}

	// Get the final size of the compressed, encrypted data
	finalSize, err := p.currentBlob.tempFile.Seek(0, io.SeekCurrent)
	if err != nil {
		p.cleanupTempFile()
		return fmt.Errorf("getting file size: %w", err)
	}

	// Reset to beginning for reading
	if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
		p.cleanupTempFile()
		return fmt.Errorf("seeking to start: %w", err)
	}

	// Get hash from blobgen writer (of the final encrypted data)
	finalHash := p.currentBlob.writer.Sum256()
	blobHash := hex.EncodeToString(finalHash)

	// Create chunk references with offsets
	chunkRefs := make([]*BlobChunkRef, 0, len(p.currentBlob.chunks))
	for _, chunk := range p.currentBlob.chunks {
		chunkRefs = append(chunkRefs, &BlobChunkRef{
			ChunkHash: chunk.Hash,
			Offset:    chunk.Offset,
			Length:    chunk.Size,
		})
	}

	// Update blob record in database with hash and sizes
	if p.repos != nil {
		err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
			return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash,
				p.currentBlob.size, finalSize)
		})
		if err != nil {
			p.cleanupTempFile()
			return fmt.Errorf("updating blob record: %w", err)
		}
	}

	// Create finished blob
	finished := &FinishedBlob{
		ID:           p.currentBlob.id,
		Hash:         blobHash,
		Data:         nil, // Data stays on disk; it is no longer loaded into memory here
		Chunks:       chunkRefs,
		CreatedTS:    p.currentBlob.startTime,
		Uncompressed: p.currentBlob.size,
		Compressed:   finalSize,
	}

	compressionRatio := float64(finished.Compressed) / float64(finished.Uncompressed)
	log.Info("Finalized blob (compressed and encrypted)",
		"hash", blobHash,
		"chunks", len(chunkRefs),
		"uncompressed", finished.Uncompressed,
		"compressed", finished.Compressed,
		"ratio", fmt.Sprintf("%.2f", compressionRatio),
		"duration", time.Since(p.currentBlob.startTime))

	// Call blob handler if set
	if p.blobHandler != nil {
		// Reset file position for handler
		if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
			p.cleanupTempFile()
			return fmt.Errorf("seeking for handler: %w", err)
		}

		// Create a blob reader that includes the data stream
		blobWithReader := &BlobWithReader{
			FinishedBlob: finished,
			Reader:       p.currentBlob.tempFile,
			TempFile:     p.currentBlob.tempFile,
		}

		if err := p.blobHandler(blobWithReader); err != nil {
			p.cleanupTempFile()
			return fmt.Errorf("blob handler failed: %w", err)
		}
		// Note: the blob handler is responsible for closing/cleaning up the temp file
		p.currentBlob = nil
	} else {
		log.Debug("No blob handler callback configured", "blob_hash", blobHash[:8]+"...")
		// No handler: read the data into memory for the legacy GetFinishedBlobs path
		if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
			p.cleanupTempFile()
			return fmt.Errorf("seeking to read data: %w", err)
		}

		data, err := io.ReadAll(p.currentBlob.tempFile)
		if err != nil {
			p.cleanupTempFile()
			return fmt.Errorf("reading blob data: %w", err)
		}
		finished.Data = data

		p.finishedBlobs = append(p.finishedBlobs, finished)

		// Cleanup
		p.cleanupTempFile()
		p.currentBlob = nil
	}

	return nil
}
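
// Because blobs are content-addressed, a stored object can be verified
// independently by re-hashing it (sketch only; assumes blobgen.Writer's
// Sum256 covers exactly the bytes written to the temp file, and blobPath
// is wherever the blob ended up in storage):
//
//	f, err := os.Open(blobPath)
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	h := sha256.New()
//	if _, err := io.Copy(h, f); err != nil {
//		return err
//	}
//	verified := hex.EncodeToString(h.Sum(nil)) == blobHash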
// cleanupTempFile removes the temporary file
func (p *Packer) cleanupTempFile() {
	if p.currentBlob != nil && p.currentBlob.tempFile != nil {
		name := p.currentBlob.tempFile.Name()
		_ = p.currentBlob.tempFile.Close()
		_ = os.Remove(name)
	}
}

// PackChunks is a convenience method to pack multiple chunks at once.
// It transparently finalizes the current blob and retries whenever a chunk
// would exceed the blob size limit, then flushes any remaining data.
func (p *Packer) PackChunks(chunks []*ChunkRef) error {
	for _, chunk := range chunks {
		err := p.AddChunk(chunk)
		if err == ErrBlobSizeLimitExceeded {
			// Finalize current blob and retry
			if err := p.FinalizeBlob(); err != nil {
				return fmt.Errorf("finalizing blob before retry: %w", err)
			}
			// Retry the chunk
			if err := p.AddChunk(chunk); err != nil {
				return fmt.Errorf("adding chunk %s after finalize: %w", chunk.Hash, err)
			}
		} else if err != nil {
			return fmt.Errorf("adding chunk %s: %w", chunk.Hash, err)
		}
	}

	return p.Flush()
}