- Implement exclude patterns with anchored pattern support (see the sketch after this list):
  - Patterns starting with / only match from root of source dir
  - Unanchored patterns match anywhere in path
  - Support for glob patterns (*.log, .*, **/*.pack)
  - Directory patterns skip entire subtrees
  - Add gobwas/glob dependency for pattern matching
  - Add 16 comprehensive tests for exclude functionality
- Add snapshot prune command to clean orphaned data:
  - Removes incomplete snapshots from database
  - Cleans orphaned files, chunks, and blobs
  - Runs automatically at backup start for consistency
- Add snapshot remove command for deleting snapshots
- Add VAULTIK_AGE_SECRET_KEY environment variable support
- Fix duplicate fx module provider in restore command
- Change snapshot ID format to hostname_YYYY-MM-DDTHH:MM:SSZ
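The following is a minimal, illustrative sketch of the anchored/unanchored exclude semantics described above, using the gobwas/glob dependency named in the commit. It is not the project's actual implementation; the matchesExclude helper and the suffix-matching strategy for unanchored patterns are assumptions made for illustration only.

package main

import (
	"fmt"
	"strings"

	"github.com/gobwas/glob"
)

// matchesExclude is a hypothetical helper: it reports whether relPath (relative
// to the source dir, without a leading slash) is excluded by pattern. Patterns
// beginning with "/" are anchored to the source root; others match anywhere.
func matchesExclude(pattern, relPath string) bool {
	if strings.HasPrefix(pattern, "/") {
		// Anchored: drop the leading "/" and match against the whole relative path.
		return glob.MustCompile(strings.TrimPrefix(pattern, "/"), '/').Match(relPath)
	}
	// Unanchored: try the full path, then every suffix starting at a path component.
	g := glob.MustCompile(pattern, '/')
	if g.Match(relPath) {
		return true
	}
	parts := strings.Split(relPath, "/")
	for i := 1; i < len(parts); i++ {
		if g.Match(strings.Join(parts[i:], "/")) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(matchesExclude("*.log", "var/log/syslog.log")) // true: unanchored, matches at any depth
	fmt.Println(matchesExclude("/cache", "cache"))             // true: anchored at the source root
	fmt.Println(matchesExclude("/cache", "home/cache"))        // false: anchored pattern, not at root
}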
154 lines
5.0 KiB
Go
package chunker

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
)

// Chunk represents a single chunk of data produced by the content-defined chunking algorithm.
// Each chunk is identified by its SHA256 hash and contains the raw data along with
// its position and size information from the original file.
type Chunk struct {
	Hash   string // Content hash of the chunk
	Data   []byte // Chunk data
	Offset int64  // Offset in the original file
	Size   int64  // Size of the chunk
}

// Chunker provides content-defined chunking using the FastCDC algorithm.
// It splits data into variable-sized chunks based on content patterns, ensuring
// that identical data sequences produce identical chunks regardless of their
// position in the file. This enables efficient deduplication.
type Chunker struct {
	avgChunkSize int
	minChunkSize int
	maxChunkSize int
}

// NewChunker creates a new chunker with the specified average chunk size.
// The actual chunk sizes will vary between avgChunkSize/4 and avgChunkSize*4
// as recommended by the FastCDC algorithm. Typical values for avgChunkSize
// are 64KB (65536), 256KB (262144), or 1MB (1048576).
func NewChunker(avgChunkSize int64) *Chunker {
	// FastCDC recommends min = avg/4 and max = avg*4
	return &Chunker{
		avgChunkSize: int(avgChunkSize),
		minChunkSize: int(avgChunkSize / 4),
		maxChunkSize: int(avgChunkSize * 4),
	}
}

// ChunkReader splits the reader into content-defined chunks and returns all chunks at once.
// This method loads all chunk data into memory, so it should only be used for
// reasonably sized inputs. For large files or streams, use ChunkReaderStreaming instead.
// Returns an error if chunking fails or if reading from the input fails.
func (c *Chunker) ChunkReader(r io.Reader) ([]Chunk, error) {
	chunker := AcquireReusableChunker(r, c.minChunkSize, c.avgChunkSize, c.maxChunkSize)
	defer chunker.Release()

	var chunks []Chunk
	offset := int64(0)

	for {
		chunk, err := chunker.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("reading chunk: %w", err)
		}

		// Calculate hash
		hash := sha256.Sum256(chunk.Data)

		// Make a copy of the data since the chunker reuses the buffer
		chunkData := make([]byte, len(chunk.Data))
		copy(chunkData, chunk.Data)

		chunks = append(chunks, Chunk{
			Hash:   hex.EncodeToString(hash[:]),
			Data:   chunkData,
			Offset: offset,
			Size:   int64(len(chunk.Data)),
		})

		offset += int64(len(chunk.Data))
	}

	return chunks, nil
}

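// exampleCountUniqueChunks is an illustrative usage sketch, not part of the
// original file: it shows how a caller might combine NewChunker and ChunkReader
// to measure deduplication on an input that is small enough to hold in memory.
// The 256 KiB average size is an arbitrary choice for the example.
func exampleCountUniqueChunks(r io.Reader) (unique, total int, err error) {
	c := NewChunker(262144) // 256 KiB average => 64 KiB min, 1 MiB max chunks
	chunks, err := c.ChunkReader(r)
	if err != nil {
		return 0, 0, err
	}
	seen := make(map[string]struct{})
	for _, chunk := range chunks {
		if _, ok := seen[chunk.Hash]; !ok {
			// First time this chunk content has been seen; it would be stored once.
			seen[chunk.Hash] = struct{}{}
			unique++
		}
	}
	return unique, len(chunks), nil
}
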
// ChunkCallback is a function called for each chunk as it's processed.
// The callback receives a Chunk containing the hash, data, offset, and size.
// If the callback returns an error, chunk processing stops and the error is propagated.
type ChunkCallback func(chunk Chunk) error

// ChunkReaderStreaming splits the reader into chunks and calls the callback for each chunk.
// This is the preferred method for processing large files or streams as it doesn't
// accumulate all chunks in memory. The callback is invoked for each chunk as it's
// produced, allowing for streaming processing and immediate storage or transmission.
// Returns the SHA256 hash of the entire file content and an error if chunking fails,
// reading fails, or if the callback returns an error.
func (c *Chunker) ChunkReaderStreaming(r io.Reader, callback ChunkCallback) (string, error) {
	// Create a tee reader to calculate full file hash while chunking
	fileHasher := sha256.New()
	teeReader := io.TeeReader(r, fileHasher)

	chunker := AcquireReusableChunker(teeReader, c.minChunkSize, c.avgChunkSize, c.maxChunkSize)
	defer chunker.Release()

	offset := int64(0)

	for {
		chunk, err := chunker.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return "", fmt.Errorf("reading chunk: %w", err)
		}

		// Calculate chunk hash
		hash := sha256.Sum256(chunk.Data)

		// Pass the data directly - caller must process it before we call Next() again
		// (chunker reuses its internal buffer, but since we process synchronously
		// and completely before continuing, no copy is needed)
		if err := callback(Chunk{
			Hash:   hex.EncodeToString(hash[:]),
			Data:   chunk.Data,
			Offset: offset,
			Size:   int64(len(chunk.Data)),
		}); err != nil {
			return "", fmt.Errorf("callback error: %w", err)
		}

		offset += int64(len(chunk.Data))
	}

	// Return the full file hash
	return hex.EncodeToString(fileHasher.Sum(nil)), nil
}

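// exampleStreamFile is an illustrative usage sketch, not part of the original
// file: it streams a large file through ChunkReaderStreaming so that only one
// chunk is held in memory at a time. The storeChunk parameter is a hypothetical
// persistence hook supplied by the caller; the returned string is the SHA256 of
// the whole file as computed by ChunkReaderStreaming.
func exampleStreamFile(c *Chunker, path string, storeChunk func(hash string, data []byte) error) (string, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", fmt.Errorf("opening file: %w", err)
	}
	defer func() { _ = f.Close() }()

	// The callback must finish with chunk.Data before it returns, because the
	// underlying buffer is reused for the next chunk.
	return c.ChunkReaderStreaming(f, func(chunk Chunk) error {
		return storeChunk(chunk.Hash, chunk.Data)
	})
}
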
// ChunkFile splits a file into content-defined chunks by reading the entire file.
// This is a convenience method that opens the file and passes it to ChunkReader.
// For large files, consider using ChunkReaderStreaming with a file handle instead.
// Returns an error if the file cannot be opened or if chunking fails.
func (c *Chunker) ChunkFile(path string) ([]Chunk, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("opening file: %w", err)
	}
	defer func() {
		if err := file.Close(); err != nil && err.Error() != "invalid argument" {
			// Log error or handle as needed
			_ = err
		}
	}()

	return c.ChunkReader(file)
}