Fix file content change handling and improve log messages
- Delete old file_chunks and chunk_files when file content changes - Add DeleteByFileID method to ChunkFileRepository - Add tests to verify old chunks are properly disassociated - Make log messages more precise throughout scanner and snapshot - Support metadata-only snapshots when no files have changed - Add periodic status output during scan and snapshot operations - Improve scan summary output with clearer information
This commit is contained in:
parent
d3afa65420
commit
bb2292de7f
236
internal/backup/file_change_test.go
Normal file
236
internal/backup/file_change_test.go
Normal file
@ -0,0 +1,236 @@
|
|||||||
|
package backup_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.eeqj.de/sneak/vaultik/internal/backup"
|
||||||
|
"git.eeqj.de/sneak/vaultik/internal/database"
|
||||||
|
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||||
|
"github.com/spf13/afero"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestFileContentChange verifies that when a file's content changes,
|
||||||
|
// the old chunks are properly disassociated
|
||||||
|
func TestFileContentChange(t *testing.T) {
|
||||||
|
// Initialize logger for tests
|
||||||
|
log.Initialize(log.Config{})
|
||||||
|
|
||||||
|
// Create in-memory filesystem
|
||||||
|
fs := afero.NewMemMapFs()
|
||||||
|
|
||||||
|
// Create initial file
|
||||||
|
err := afero.WriteFile(fs, "/test.txt", []byte("Initial content"), 0644)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Create test database
|
||||||
|
db, err := database.NewTestDB()
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
if err := db.Close(); err != nil {
|
||||||
|
t.Errorf("failed to close database: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
repos := database.NewRepositories(db)
|
||||||
|
|
||||||
|
// Create scanner
|
||||||
|
scanner := backup.NewScanner(backup.ScannerConfig{
|
||||||
|
FS: fs,
|
||||||
|
ChunkSize: int64(1024 * 16), // 16KB chunks for testing
|
||||||
|
Repositories: repos,
|
||||||
|
MaxBlobSize: int64(1024 * 1024), // 1MB blobs
|
||||||
|
CompressionLevel: 3,
|
||||||
|
AgeRecipients: []string{"age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"}, // Test public key
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create first snapshot
|
||||||
|
ctx := context.Background()
|
||||||
|
snapshotID1 := "snapshot1"
|
||||||
|
err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||||
|
snapshot := &database.Snapshot{
|
||||||
|
ID: snapshotID1,
|
||||||
|
Hostname: "test-host",
|
||||||
|
VaultikVersion: "test",
|
||||||
|
StartedAt: time.Now(),
|
||||||
|
}
|
||||||
|
return repos.Snapshots.Create(ctx, tx, snapshot)
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// First scan - should create chunks for initial content
|
||||||
|
result1, err := scanner.Scan(ctx, "/", snapshotID1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Logf("First scan: %d files scanned", result1.FilesScanned)
|
||||||
|
|
||||||
|
// Get file chunks from first scan
|
||||||
|
fileChunks1, err := repos.FileChunks.GetByPath(ctx, "/test.txt")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, fileChunks1, 1) // Small file = 1 chunk
|
||||||
|
oldChunkHash := fileChunks1[0].ChunkHash
|
||||||
|
|
||||||
|
// Get chunk files from first scan
|
||||||
|
chunkFiles1, err := repos.ChunkFiles.GetByFilePath(ctx, "/test.txt")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, chunkFiles1, 1)
|
||||||
|
|
||||||
|
// Modify the file
|
||||||
|
time.Sleep(10 * time.Millisecond) // Ensure mtime changes
|
||||||
|
err = afero.WriteFile(fs, "/test.txt", []byte("Modified content with different data"), 0644)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Create second snapshot
|
||||||
|
snapshotID2 := "snapshot2"
|
||||||
|
err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||||
|
snapshot := &database.Snapshot{
|
||||||
|
ID: snapshotID2,
|
||||||
|
Hostname: "test-host",
|
||||||
|
VaultikVersion: "test",
|
||||||
|
StartedAt: time.Now(),
|
||||||
|
}
|
||||||
|
return repos.Snapshots.Create(ctx, tx, snapshot)
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Second scan - should create new chunks and remove old associations
|
||||||
|
result2, err := scanner.Scan(ctx, "/", snapshotID2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Logf("Second scan: %d files scanned", result2.FilesScanned)
|
||||||
|
|
||||||
|
// Get file chunks from second scan
|
||||||
|
fileChunks2, err := repos.FileChunks.GetByPath(ctx, "/test.txt")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, fileChunks2, 1) // Still 1 chunk but different hash
|
||||||
|
newChunkHash := fileChunks2[0].ChunkHash
|
||||||
|
|
||||||
|
// Verify the chunk hashes are different
|
||||||
|
assert.NotEqual(t, oldChunkHash, newChunkHash, "Chunk hash should change when content changes")
|
||||||
|
|
||||||
|
// Get chunk files from second scan
|
||||||
|
chunkFiles2, err := repos.ChunkFiles.GetByFilePath(ctx, "/test.txt")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, chunkFiles2, 1)
|
||||||
|
assert.Equal(t, newChunkHash, chunkFiles2[0].ChunkHash)
|
||||||
|
|
||||||
|
// Verify old chunk still exists (it's still valid data)
|
||||||
|
oldChunk, err := repos.Chunks.GetByHash(ctx, oldChunkHash)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotNil(t, oldChunk)
|
||||||
|
|
||||||
|
// Verify new chunk exists
|
||||||
|
newChunk, err := repos.Chunks.GetByHash(ctx, newChunkHash)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotNil(t, newChunk)
|
||||||
|
|
||||||
|
// Verify that chunk_files for old chunk no longer references this file
|
||||||
|
oldChunkFiles, err := repos.ChunkFiles.GetByChunkHash(ctx, oldChunkHash)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for _, cf := range oldChunkFiles {
|
||||||
|
file, err := repos.Files.GetByID(ctx, cf.FileID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotEqual(t, "/data/test.txt", file.Path, "Old chunk should not be associated with the modified file")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMultipleFileChanges verifies handling of multiple file changes in one scan
|
||||||
|
func TestMultipleFileChanges(t *testing.T) {
|
||||||
|
// Initialize logger for tests
|
||||||
|
log.Initialize(log.Config{})
|
||||||
|
|
||||||
|
// Create in-memory filesystem
|
||||||
|
fs := afero.NewMemMapFs()
|
||||||
|
|
||||||
|
// Create initial files
|
||||||
|
files := map[string]string{
|
||||||
|
"/file1.txt": "Content 1",
|
||||||
|
"/file2.txt": "Content 2",
|
||||||
|
"/file3.txt": "Content 3",
|
||||||
|
}
|
||||||
|
|
||||||
|
for path, content := range files {
|
||||||
|
err := afero.WriteFile(fs, path, []byte(content), 0644)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create test database
|
||||||
|
db, err := database.NewTestDB()
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
if err := db.Close(); err != nil {
|
||||||
|
t.Errorf("failed to close database: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
repos := database.NewRepositories(db)
|
||||||
|
|
||||||
|
// Create scanner
|
||||||
|
scanner := backup.NewScanner(backup.ScannerConfig{
|
||||||
|
FS: fs,
|
||||||
|
ChunkSize: int64(1024 * 16), // 16KB chunks for testing
|
||||||
|
Repositories: repos,
|
||||||
|
MaxBlobSize: int64(1024 * 1024), // 1MB blobs
|
||||||
|
CompressionLevel: 3,
|
||||||
|
AgeRecipients: []string{"age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"}, // Test public key
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create first snapshot
|
||||||
|
ctx := context.Background()
|
||||||
|
snapshotID1 := "snapshot1"
|
||||||
|
err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||||
|
snapshot := &database.Snapshot{
|
||||||
|
ID: snapshotID1,
|
||||||
|
Hostname: "test-host",
|
||||||
|
VaultikVersion: "test",
|
||||||
|
StartedAt: time.Now(),
|
||||||
|
}
|
||||||
|
return repos.Snapshots.Create(ctx, tx, snapshot)
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// First scan
|
||||||
|
result1, err := scanner.Scan(ctx, "/", snapshotID1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
// 4 files because root directory is also counted
|
||||||
|
assert.Equal(t, 4, result1.FilesScanned)
|
||||||
|
|
||||||
|
// Modify two files
|
||||||
|
time.Sleep(10 * time.Millisecond) // Ensure mtime changes
|
||||||
|
err = afero.WriteFile(fs, "/file1.txt", []byte("Modified content 1"), 0644)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = afero.WriteFile(fs, "/file3.txt", []byte("Modified content 3"), 0644)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Create second snapshot
|
||||||
|
snapshotID2 := "snapshot2"
|
||||||
|
err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||||
|
snapshot := &database.Snapshot{
|
||||||
|
ID: snapshotID2,
|
||||||
|
Hostname: "test-host",
|
||||||
|
VaultikVersion: "test",
|
||||||
|
StartedAt: time.Now(),
|
||||||
|
}
|
||||||
|
return repos.Snapshots.Create(ctx, tx, snapshot)
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Second scan
|
||||||
|
result2, err := scanner.Scan(ctx, "/", snapshotID2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
// 4 files because root directory is also counted
|
||||||
|
assert.Equal(t, 4, result2.FilesScanned)
|
||||||
|
|
||||||
|
// Verify each file has exactly one set of chunks
|
||||||
|
for path := range files {
|
||||||
|
fileChunks, err := repos.FileChunks.GetByPath(ctx, path)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, fileChunks, 1, "File %s should have exactly 1 chunk association", path)
|
||||||
|
|
||||||
|
chunkFiles, err := repos.ChunkFiles.GetByFilePath(ctx, path)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, chunkFiles, 1, "File %s should have exactly 1 chunk-file association", path)
|
||||||
|
}
|
||||||
|
}
|
@ -163,12 +163,28 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
|||||||
"files_skipped", result.FilesSkipped,
|
"files_skipped", result.FilesSkipped,
|
||||||
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
|
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
|
||||||
|
|
||||||
|
// Print detailed scan summary
|
||||||
|
fmt.Printf("\n=== Scan Summary ===\n")
|
||||||
|
fmt.Printf("Total files examined: %d\n", result.FilesScanned)
|
||||||
|
fmt.Printf("Files with content changes: %d\n", len(filesToProcess))
|
||||||
|
fmt.Printf("Files with unchanged content: %d\n", result.FilesSkipped)
|
||||||
|
fmt.Printf("Total size of changed files: %s\n", humanize.Bytes(uint64(totalSizeToProcess)))
|
||||||
|
fmt.Printf("Total size of unchanged files: %s\n", humanize.Bytes(uint64(result.BytesSkipped)))
|
||||||
|
if len(filesToProcess) > 0 {
|
||||||
|
fmt.Printf("\nStarting snapshot of %d changed files...\n\n", len(filesToProcess))
|
||||||
|
} else {
|
||||||
|
fmt.Printf("\nNo file contents have changed.\n")
|
||||||
|
fmt.Printf("Creating metadata-only snapshot to capture current state...\n\n")
|
||||||
|
}
|
||||||
|
|
||||||
// Phase 2: Process files and create chunks
|
// Phase 2: Process files and create chunks
|
||||||
if len(filesToProcess) > 0 {
|
if len(filesToProcess) > 0 {
|
||||||
log.Info("Phase 2/3: Creating snapshot (chunking, compressing, encrypting, and uploading blobs)")
|
log.Info("Phase 2/3: Creating snapshot (chunking, compressing, encrypting, and uploading blobs)")
|
||||||
if err := s.processPhase(ctx, filesToProcess, result); err != nil {
|
if err := s.processPhase(ctx, filesToProcess, result); err != nil {
|
||||||
return nil, fmt.Errorf("process phase failed: %w", err)
|
return nil, fmt.Errorf("process phase failed: %w", err)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
log.Info("Phase 2/3: Skipping (no file contents changed, metadata-only snapshot)")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get final stats from packer
|
// Get final stats from packer
|
||||||
@ -181,7 +197,7 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
|||||||
if s.snapshotID != "" {
|
if s.snapshotID != "" {
|
||||||
uploadCount, err := s.repos.Uploads.GetCountBySnapshot(ctx, s.snapshotID)
|
uploadCount, err := s.repos.Uploads.GetCountBySnapshot(ctx, s.snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Failed to get upload count from database", "error", err)
|
log.Warn("Failed to query upload count from database", "error", err)
|
||||||
} else {
|
} else {
|
||||||
result.BlobsCreated = int(uploadCount)
|
result.BlobsCreated = int(uploadCount)
|
||||||
}
|
}
|
||||||
@ -196,11 +212,17 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
|||||||
var filesToProcess []*FileToProcess
|
var filesToProcess []*FileToProcess
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
|
|
||||||
|
// Set up periodic status output
|
||||||
|
lastStatusTime := time.Now()
|
||||||
|
statusInterval := 15 * time.Second
|
||||||
|
var filesScanned int64
|
||||||
|
var bytesScanned int64
|
||||||
|
|
||||||
log.Debug("Starting directory walk", "path", path)
|
log.Debug("Starting directory walk", "path", path)
|
||||||
err := afero.Walk(s.fs, path, func(path string, info os.FileInfo, err error) error {
|
err := afero.Walk(s.fs, path, func(path string, info os.FileInfo, err error) error {
|
||||||
log.Debug("Walking file", "path", path)
|
log.Debug("Scanning filesystem entry", "path", path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("Error walking path", "path", path, "error", err)
|
log.Debug("Error accessing filesystem entry", "path", path, "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -232,6 +254,25 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
|||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update scan statistics
|
||||||
|
if info.Mode().IsRegular() {
|
||||||
|
filesScanned++
|
||||||
|
bytesScanned += info.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Output periodic status
|
||||||
|
if time.Since(lastStatusTime) >= statusInterval {
|
||||||
|
mu.Lock()
|
||||||
|
changedCount := len(filesToProcess)
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
|
fmt.Printf("Scan progress: %d files examined, %s total size, %d files changed\n",
|
||||||
|
filesScanned,
|
||||||
|
humanize.Bytes(uint64(bytesScanned)),
|
||||||
|
changedCount)
|
||||||
|
lastStatusTime = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -244,6 +285,13 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
|||||||
|
|
||||||
// processPhase processes the files that need backing up
|
// processPhase processes the files that need backing up
|
||||||
func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
|
func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
|
||||||
|
// Set up periodic status output
|
||||||
|
lastStatusTime := time.Now()
|
||||||
|
statusInterval := 15 * time.Second
|
||||||
|
startTime := time.Now()
|
||||||
|
filesProcessed := 0
|
||||||
|
totalFiles := len(filesToProcess)
|
||||||
|
|
||||||
// Process each file
|
// Process each file
|
||||||
for _, fileToProcess := range filesToProcess {
|
for _, fileToProcess := range filesToProcess {
|
||||||
// Update progress
|
// Update progress
|
||||||
@ -260,6 +308,26 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc
|
|||||||
if s.progress != nil {
|
if s.progress != nil {
|
||||||
s.progress.GetStats().FilesProcessed.Add(1)
|
s.progress.GetStats().FilesProcessed.Add(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
filesProcessed++
|
||||||
|
|
||||||
|
// Output periodic status
|
||||||
|
if time.Since(lastStatusTime) >= statusInterval {
|
||||||
|
elapsed := time.Since(startTime)
|
||||||
|
remaining := totalFiles - filesProcessed
|
||||||
|
var eta time.Duration
|
||||||
|
if filesProcessed > 0 {
|
||||||
|
eta = elapsed / time.Duration(filesProcessed) * time.Duration(remaining)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Snapshot progress: %d/%d files processed, %d chunks created, %d blobs uploaded",
|
||||||
|
filesProcessed, totalFiles, result.ChunksCreated, result.BlobsCreated)
|
||||||
|
if remaining > 0 && eta > 0 {
|
||||||
|
fmt.Printf(", ETA: %s", eta.Round(time.Second))
|
||||||
|
}
|
||||||
|
fmt.Println()
|
||||||
|
lastStatusTime = time.Now()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Final flush (outside any transaction)
|
// Final flush (outside any transaction)
|
||||||
@ -338,7 +406,7 @@ func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check if file has changed since last backup (no transaction needed for read)
|
// Check if file has changed since last backup (no transaction needed for read)
|
||||||
log.Debug("Checking if file exists in database", "path", path)
|
log.Debug("Querying database for existing file record", "path", path)
|
||||||
existingFile, err := s.repos.Files.GetByPath(ctx, path)
|
existingFile, err := s.repos.Files.GetByPath(ctx, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, fmt.Errorf("checking existing file: %w", err)
|
return nil, false, fmt.Errorf("checking existing file: %w", err)
|
||||||
@ -347,7 +415,7 @@ func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo,
|
|||||||
fileChanged := existingFile == nil || s.hasFileChanged(existingFile, file)
|
fileChanged := existingFile == nil || s.hasFileChanged(existingFile, file)
|
||||||
|
|
||||||
// Update file metadata and add to snapshot in a single transaction
|
// Update file metadata and add to snapshot in a single transaction
|
||||||
log.Debug("Updating file metadata and adding to snapshot", "path", path, "changed", fileChanged, "snapshot", s.snapshotID)
|
log.Debug("Updating file record in database and adding to snapshot", "path", path, "changed", fileChanged, "snapshot", s.snapshotID)
|
||||||
err = s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
err = s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||||
// First create/update the file
|
// First create/update the file
|
||||||
if err := s.repos.Files.Create(ctx, tx, file); err != nil {
|
if err := s.repos.Files.Create(ctx, tx, file); err != nil {
|
||||||
@ -362,7 +430,7 @@ func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
log.Debug("File added to snapshot", "path", path)
|
log.Debug("File record added to snapshot association", "path", path)
|
||||||
|
|
||||||
result.FilesScanned++
|
result.FilesScanned++
|
||||||
|
|
||||||
@ -383,11 +451,11 @@ func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo,
|
|||||||
stats.BytesSkipped.Add(info.Size())
|
stats.BytesSkipped.Add(info.Size())
|
||||||
}
|
}
|
||||||
// File hasn't changed, but we still need to associate existing chunks with this snapshot
|
// File hasn't changed, but we still need to associate existing chunks with this snapshot
|
||||||
log.Debug("File hasn't changed, associating existing chunks", "path", path)
|
log.Debug("File content unchanged, reusing existing chunks and blobs", "path", path)
|
||||||
if err := s.associateExistingChunks(ctx, path); err != nil {
|
if err := s.associateExistingChunks(ctx, path); err != nil {
|
||||||
return nil, false, fmt.Errorf("associating existing chunks: %w", err)
|
return nil, false, fmt.Errorf("associating existing chunks: %w", err)
|
||||||
}
|
}
|
||||||
log.Debug("Existing chunks associated", "path", path)
|
log.Debug("Existing chunks and blobs associated with snapshot", "path", path)
|
||||||
} else {
|
} else {
|
||||||
// File changed or is not a regular file
|
// File changed or is not a regular file
|
||||||
result.BytesScanned += info.Size()
|
result.BytesScanned += info.Size()
|
||||||
@ -428,29 +496,29 @@ func (s *Scanner) associateExistingChunks(ctx context.Context, path string) erro
|
|||||||
log.Debug("associateExistingChunks start", "path", path)
|
log.Debug("associateExistingChunks start", "path", path)
|
||||||
|
|
||||||
// Get existing file chunks (no transaction needed for read)
|
// Get existing file chunks (no transaction needed for read)
|
||||||
log.Debug("Getting existing file chunks", "path", path)
|
log.Debug("Querying database for file's chunk associations", "path", path)
|
||||||
fileChunks, err := s.repos.FileChunks.GetByFile(ctx, path)
|
fileChunks, err := s.repos.FileChunks.GetByFile(ctx, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("getting existing file chunks: %w", err)
|
return fmt.Errorf("getting existing file chunks: %w", err)
|
||||||
}
|
}
|
||||||
log.Debug("Got file chunks", "path", path, "count", len(fileChunks))
|
log.Debug("Retrieved file chunk associations from database", "path", path, "count", len(fileChunks))
|
||||||
|
|
||||||
// Collect unique blob IDs that need to be added to snapshot
|
// Collect unique blob IDs that need to be added to snapshot
|
||||||
blobsToAdd := make(map[string]string) // blob ID -> blob hash
|
blobsToAdd := make(map[string]string) // blob ID -> blob hash
|
||||||
for i, fc := range fileChunks {
|
for i, fc := range fileChunks {
|
||||||
log.Debug("Processing chunk", "path", path, "chunk_index", i, "chunk_hash", fc.ChunkHash)
|
log.Debug("Looking up blob containing chunk", "path", path, "chunk_index", i, "chunk_hash", fc.ChunkHash)
|
||||||
|
|
||||||
// Find which blob contains this chunk (no transaction needed for read)
|
// Find which blob contains this chunk (no transaction needed for read)
|
||||||
log.Debug("Finding blob for chunk", "chunk_hash", fc.ChunkHash)
|
log.Debug("Querying database for blob containing chunk", "chunk_hash", fc.ChunkHash)
|
||||||
blobChunk, err := s.repos.BlobChunks.GetByChunkHash(ctx, fc.ChunkHash)
|
blobChunk, err := s.repos.BlobChunks.GetByChunkHash(ctx, fc.ChunkHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("finding blob for chunk %s: %w", fc.ChunkHash, err)
|
return fmt.Errorf("finding blob for chunk %s: %w", fc.ChunkHash, err)
|
||||||
}
|
}
|
||||||
if blobChunk == nil {
|
if blobChunk == nil {
|
||||||
log.Warn("Chunk exists but not in any blob", "chunk", fc.ChunkHash, "file", path)
|
log.Warn("Chunk record exists in database but not associated with any blob", "chunk", fc.ChunkHash, "file", path)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Debug("Found blob for chunk", "chunk_hash", fc.ChunkHash, "blob_id", blobChunk.BlobID)
|
log.Debug("Found blob record containing chunk", "chunk_hash", fc.ChunkHash, "blob_id", blobChunk.BlobID)
|
||||||
|
|
||||||
// Track blob ID for later processing
|
// Track blob ID for later processing
|
||||||
if _, exists := blobsToAdd[blobChunk.BlobID]; !exists {
|
if _, exists := blobsToAdd[blobChunk.BlobID]; !exists {
|
||||||
@ -465,7 +533,7 @@ func (s *Scanner) associateExistingChunks(ctx context.Context, path string) erro
|
|||||||
return fmt.Errorf("getting blob %s: %w", blobID, err)
|
return fmt.Errorf("getting blob %s: %w", blobID, err)
|
||||||
}
|
}
|
||||||
if blob == nil {
|
if blob == nil {
|
||||||
log.Warn("Blob record not found", "blob_id", blobID)
|
log.Warn("Blob record missing from database", "blob_id", blobID)
|
||||||
delete(blobsToAdd, blobID)
|
delete(blobsToAdd, blobID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -474,14 +542,14 @@ func (s *Scanner) associateExistingChunks(ctx context.Context, path string) erro
|
|||||||
|
|
||||||
// Add blobs to snapshot using short transactions
|
// Add blobs to snapshot using short transactions
|
||||||
for blobID, blobHash := range blobsToAdd {
|
for blobID, blobHash := range blobsToAdd {
|
||||||
log.Debug("Adding blob to snapshot", "blob_id", blobID, "blob_hash", blobHash, "snapshot", s.snapshotID)
|
log.Debug("Adding blob reference to snapshot association", "blob_id", blobID, "blob_hash", blobHash, "snapshot", s.snapshotID)
|
||||||
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||||
return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, blobID, blobHash)
|
return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, blobID, blobHash)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("adding existing blob to snapshot: %w", err)
|
return fmt.Errorf("adding existing blob to snapshot: %w", err)
|
||||||
}
|
}
|
||||||
log.Debug("Added blob to snapshot", "blob_id", blobID)
|
log.Debug("Created snapshot-blob association in database", "blob_id", blobID)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("associateExistingChunks complete", "path", path, "blobs_processed", len(blobsToAdd))
|
log.Debug("associateExistingChunks complete", "path", path, "blobs_processed", len(blobsToAdd))
|
||||||
@ -490,7 +558,7 @@ func (s *Scanner) associateExistingChunks(ctx context.Context, path string) erro
|
|||||||
|
|
||||||
// handleBlobReady is called by the packer when a blob is finalized
|
// handleBlobReady is called by the packer when a blob is finalized
|
||||||
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||||
log.Debug("Blob handler called", "blob_hash", blobWithReader.Hash[:8]+"...")
|
log.Debug("Invoking blob upload handler", "blob_hash", blobWithReader.Hash[:8]+"...")
|
||||||
|
|
||||||
startTime := time.Now().UTC()
|
startTime := time.Now().UTC()
|
||||||
finishedBlob := blobWithReader.FinishedBlob
|
finishedBlob := blobWithReader.FinishedBlob
|
||||||
@ -547,7 +615,7 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
|||||||
|
|
||||||
// Log upload stats
|
// Log upload stats
|
||||||
uploadSpeed := float64(finishedBlob.Compressed) * 8 / uploadDuration.Seconds() // bits per second
|
uploadSpeed := float64(finishedBlob.Compressed) * 8 / uploadDuration.Seconds() // bits per second
|
||||||
log.Info("Uploaded blob to S3",
|
log.Info("Successfully uploaded blob to S3 storage",
|
||||||
"path", blobPath,
|
"path", blobPath,
|
||||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||||
"duration", uploadDuration,
|
"duration", uploadDuration,
|
||||||
@ -639,9 +707,9 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("Processing chunk",
|
log.Debug("Processing content-defined chunk from file",
|
||||||
"file", fileToProcess.Path,
|
"file", fileToProcess.Path,
|
||||||
"chunk", chunkIndex,
|
"chunk_index", chunkIndex,
|
||||||
"hash", chunk.Hash,
|
"hash", chunk.Hash,
|
||||||
"size", chunk.Size)
|
"size", chunk.Size)
|
||||||
|
|
||||||
@ -737,13 +805,22 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
|||||||
return fmt.Errorf("chunking file: %w", err)
|
return fmt.Errorf("chunking file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("Completed chunking file",
|
log.Debug("Completed snapshotting file",
|
||||||
"path", fileToProcess.Path,
|
"path", fileToProcess.Path,
|
||||||
"file_hash", fileHash,
|
"file_hash", fileHash,
|
||||||
"chunks", len(chunks))
|
"chunks", len(chunks))
|
||||||
|
|
||||||
// Store file-chunk associations and chunk-file mappings in database
|
// Store file-chunk associations and chunk-file mappings in database
|
||||||
err = s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
err = s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||||
|
// First, delete all existing file_chunks and chunk_files for this file
|
||||||
|
// This ensures old chunks are no longer associated when file content changes
|
||||||
|
if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil {
|
||||||
|
return fmt.Errorf("deleting old file chunks: %w", err)
|
||||||
|
}
|
||||||
|
if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil {
|
||||||
|
return fmt.Errorf("deleting old chunk files: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
for _, ci := range chunks {
|
for _, ci := range chunks {
|
||||||
// Create file-chunk mapping
|
// Create file-chunk mapping
|
||||||
if err := s.repos.FileChunks.Create(txCtx, tx, &ci.fileChunk); err != nil {
|
if err := s.repos.FileChunks.Create(txCtx, tx, &ci.fileChunk); err != nil {
|
||||||
|
@ -215,12 +215,12 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
|||||||
log.Debug("Database copy complete", "size", getFileSize(tempDBPath))
|
log.Debug("Database copy complete", "size", getFileSize(tempDBPath))
|
||||||
|
|
||||||
// Step 2: Clean the temp database to only contain current snapshot data
|
// Step 2: Clean the temp database to only contain current snapshot data
|
||||||
log.Debug("Cleaning temporary snapshot database to contain only current snapshot", "snapshot_id", snapshotID, "db_path", tempDBPath)
|
log.Debug("Cleaning temporary database to contain only current snapshot data", "snapshot_id", snapshotID, "db_path", tempDBPath)
|
||||||
stats, err := sm.cleanSnapshotDB(ctx, tempDBPath, snapshotID)
|
stats, err := sm.cleanSnapshotDB(ctx, tempDBPath, snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cleaning snapshot database: %w", err)
|
return fmt.Errorf("cleaning snapshot database: %w", err)
|
||||||
}
|
}
|
||||||
log.Info("Snapshot database cleanup complete",
|
log.Info("Temporary database cleanup complete",
|
||||||
"db_path", tempDBPath,
|
"db_path", tempDBPath,
|
||||||
"size_after_clean", humanize.Bytes(uint64(getFileSize(tempDBPath))),
|
"size_after_clean", humanize.Bytes(uint64(getFileSize(tempDBPath))),
|
||||||
"files", stats.FileCount,
|
"files", stats.FileCount,
|
||||||
@ -642,20 +642,20 @@ func (sm *SnapshotManager) CleanupIncompleteSnapshots(ctx context.Context, hostn
|
|||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Metadata doesn't exist in S3 - this is an incomplete snapshot
|
// Metadata doesn't exist in S3 - this is an incomplete snapshot
|
||||||
log.Info("Cleaning up incomplete snapshot", "snapshot_id", snapshot.ID, "started_at", snapshot.StartedAt)
|
log.Info("Cleaning up incomplete snapshot record", "snapshot_id", snapshot.ID, "started_at", snapshot.StartedAt)
|
||||||
|
|
||||||
// Delete the snapshot and all its associations
|
// Delete the snapshot and all its associations
|
||||||
if err := sm.deleteSnapshot(ctx, snapshot.ID); err != nil {
|
if err := sm.deleteSnapshot(ctx, snapshot.ID); err != nil {
|
||||||
return fmt.Errorf("deleting incomplete snapshot %s: %w", snapshot.ID, err)
|
return fmt.Errorf("deleting incomplete snapshot %s: %w", snapshot.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Deleted incomplete snapshot", "snapshot_id", snapshot.ID)
|
log.Info("Deleted incomplete snapshot record and associated data", "snapshot_id", snapshot.ID)
|
||||||
} else {
|
} else {
|
||||||
// Metadata exists - this snapshot was completed but database wasn't updated
|
// Metadata exists - this snapshot was completed but database wasn't updated
|
||||||
// This shouldn't happen in normal operation, but mark it complete
|
// This shouldn't happen in normal operation, but mark it complete
|
||||||
log.Warn("Found snapshot with metadata but incomplete in DB", "snapshot_id", snapshot.ID)
|
log.Warn("Found snapshot with S3 metadata but incomplete in database", "snapshot_id", snapshot.ID)
|
||||||
if err := sm.repos.Snapshots.MarkComplete(ctx, nil, snapshot.ID); err != nil {
|
if err := sm.repos.Snapshots.MarkComplete(ctx, nil, snapshot.ID); err != nil {
|
||||||
log.Error("Failed to mark snapshot complete", "snapshot_id", snapshot.ID, "error", err)
|
log.Error("Failed to mark snapshot as complete in database", "snapshot_id", snapshot.ID, "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -681,7 +681,7 @@ func (sm *SnapshotManager) deleteSnapshot(ctx context.Context, snapshotID string
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Clean up orphaned data
|
// Clean up orphaned data
|
||||||
log.Debug("Cleaning up orphaned data in main database")
|
log.Debug("Cleaning up orphaned records in main database")
|
||||||
if err := sm.cleanupOrphanedData(ctx); err != nil {
|
if err := sm.cleanupOrphanedData(ctx); err != nil {
|
||||||
return fmt.Errorf("cleaning up orphaned data: %w", err)
|
return fmt.Errorf("cleaning up orphaned data: %w", err)
|
||||||
}
|
}
|
||||||
@ -698,28 +698,28 @@ func (sm *SnapshotManager) cleanupOrphanedData(ctx context.Context) error {
|
|||||||
// 4. Delete orphaned chunks (now safe after all blob_chunks are gone)
|
// 4. Delete orphaned chunks (now safe after all blob_chunks are gone)
|
||||||
|
|
||||||
// Delete orphaned files (files not in any snapshot)
|
// Delete orphaned files (files not in any snapshot)
|
||||||
log.Debug("Deleting orphaned files")
|
log.Debug("Deleting orphaned file records from database")
|
||||||
if err := sm.repos.Files.DeleteOrphaned(ctx); err != nil {
|
if err := sm.repos.Files.DeleteOrphaned(ctx); err != nil {
|
||||||
return fmt.Errorf("deleting orphaned files: %w", err)
|
return fmt.Errorf("deleting orphaned files: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete orphaned blobs (blobs not in any snapshot)
|
// Delete orphaned blobs (blobs not in any snapshot)
|
||||||
// This will cascade delete blob_chunks for deleted blobs
|
// This will cascade delete blob_chunks for deleted blobs
|
||||||
log.Debug("Deleting orphaned blobs")
|
log.Debug("Deleting orphaned blob records from database")
|
||||||
if err := sm.repos.Blobs.DeleteOrphaned(ctx); err != nil {
|
if err := sm.repos.Blobs.DeleteOrphaned(ctx); err != nil {
|
||||||
return fmt.Errorf("deleting orphaned blobs: %w", err)
|
return fmt.Errorf("deleting orphaned blobs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete orphaned blob_chunks entries
|
// Delete orphaned blob_chunks entries
|
||||||
// This handles cases where the blob still exists but chunks were deleted
|
// This handles cases where the blob still exists but chunks were deleted
|
||||||
log.Debug("Deleting orphaned blob_chunks")
|
log.Debug("Deleting orphaned blob_chunks associations from database")
|
||||||
if err := sm.repos.BlobChunks.DeleteOrphaned(ctx); err != nil {
|
if err := sm.repos.BlobChunks.DeleteOrphaned(ctx); err != nil {
|
||||||
return fmt.Errorf("deleting orphaned blob_chunks: %w", err)
|
return fmt.Errorf("deleting orphaned blob_chunks: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete orphaned chunks (chunks not referenced by any file)
|
// Delete orphaned chunks (chunks not referenced by any file)
|
||||||
// This must come after cleaning up blob_chunks to avoid foreign key violations
|
// This must come after cleaning up blob_chunks to avoid foreign key violations
|
||||||
log.Debug("Deleting orphaned chunks")
|
log.Debug("Deleting orphaned chunk records from database")
|
||||||
if err := sm.repos.Chunks.DeleteOrphaned(ctx); err != nil {
|
if err := sm.repos.Chunks.DeleteOrphaned(ctx); err != nil {
|
||||||
return fmt.Errorf("deleting orphaned chunks: %w", err)
|
return fmt.Errorf("deleting orphaned chunks: %w", err)
|
||||||
}
|
}
|
||||||
@ -729,44 +729,44 @@ func (sm *SnapshotManager) cleanupOrphanedData(ctx context.Context) error {
|
|||||||
|
|
||||||
// deleteOtherSnapshots deletes all snapshots except the current one
|
// deleteOtherSnapshots deletes all snapshots except the current one
|
||||||
func (sm *SnapshotManager) deleteOtherSnapshots(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
|
func (sm *SnapshotManager) deleteOtherSnapshots(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
|
||||||
log.Debug("[Temp DB Cleanup] Deleting other snapshots", "keeping", currentSnapshotID)
|
log.Debug("[Temp DB Cleanup] Deleting all snapshot records except current", "keeping", currentSnapshotID)
|
||||||
database.LogSQL("Execute", "DELETE FROM snapshots WHERE id != ?", currentSnapshotID)
|
database.LogSQL("Execute", "DELETE FROM snapshots WHERE id != ?", currentSnapshotID)
|
||||||
result, err := tx.ExecContext(ctx, "DELETE FROM snapshots WHERE id != ?", currentSnapshotID)
|
result, err := tx.ExecContext(ctx, "DELETE FROM snapshots WHERE id != ?", currentSnapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("deleting other snapshots: %w", err)
|
return fmt.Errorf("deleting other snapshots: %w", err)
|
||||||
}
|
}
|
||||||
rowsAffected, _ := result.RowsAffected()
|
rowsAffected, _ := result.RowsAffected()
|
||||||
log.Debug("[Temp DB Cleanup] Deleted snapshots", "count", rowsAffected)
|
log.Debug("[Temp DB Cleanup] Deleted snapshot records from database", "count", rowsAffected)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteOrphanedSnapshotAssociations deletes snapshot_files and snapshot_blobs for deleted snapshots
|
// deleteOrphanedSnapshotAssociations deletes snapshot_files and snapshot_blobs for deleted snapshots
|
||||||
func (sm *SnapshotManager) deleteOrphanedSnapshotAssociations(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
|
func (sm *SnapshotManager) deleteOrphanedSnapshotAssociations(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
|
||||||
// Delete orphaned snapshot_files
|
// Delete orphaned snapshot_files
|
||||||
log.Debug("[Temp DB Cleanup] Deleting orphaned snapshot_files")
|
log.Debug("[Temp DB Cleanup] Deleting orphaned snapshot_files associations")
|
||||||
database.LogSQL("Execute", "DELETE FROM snapshot_files WHERE snapshot_id != ?", currentSnapshotID)
|
database.LogSQL("Execute", "DELETE FROM snapshot_files WHERE snapshot_id != ?", currentSnapshotID)
|
||||||
result, err := tx.ExecContext(ctx, "DELETE FROM snapshot_files WHERE snapshot_id != ?", currentSnapshotID)
|
result, err := tx.ExecContext(ctx, "DELETE FROM snapshot_files WHERE snapshot_id != ?", currentSnapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("deleting orphaned snapshot_files: %w", err)
|
return fmt.Errorf("deleting orphaned snapshot_files: %w", err)
|
||||||
}
|
}
|
||||||
rowsAffected, _ := result.RowsAffected()
|
rowsAffected, _ := result.RowsAffected()
|
||||||
log.Debug("[Temp DB Cleanup] Deleted snapshot_files", "count", rowsAffected)
|
log.Debug("[Temp DB Cleanup] Deleted snapshot_files associations", "count", rowsAffected)
|
||||||
|
|
||||||
// Delete orphaned snapshot_blobs
|
// Delete orphaned snapshot_blobs
|
||||||
log.Debug("[Temp DB Cleanup] Deleting orphaned snapshot_blobs")
|
log.Debug("[Temp DB Cleanup] Deleting orphaned snapshot_blobs associations")
|
||||||
database.LogSQL("Execute", "DELETE FROM snapshot_blobs WHERE snapshot_id != ?", currentSnapshotID)
|
database.LogSQL("Execute", "DELETE FROM snapshot_blobs WHERE snapshot_id != ?", currentSnapshotID)
|
||||||
result, err = tx.ExecContext(ctx, "DELETE FROM snapshot_blobs WHERE snapshot_id != ?", currentSnapshotID)
|
result, err = tx.ExecContext(ctx, "DELETE FROM snapshot_blobs WHERE snapshot_id != ?", currentSnapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("deleting orphaned snapshot_blobs: %w", err)
|
return fmt.Errorf("deleting orphaned snapshot_blobs: %w", err)
|
||||||
}
|
}
|
||||||
rowsAffected, _ = result.RowsAffected()
|
rowsAffected, _ = result.RowsAffected()
|
||||||
log.Debug("[Temp DB Cleanup] Deleted snapshot_blobs", "count", rowsAffected)
|
log.Debug("[Temp DB Cleanup] Deleted snapshot_blobs associations", "count", rowsAffected)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteOrphanedFiles deletes files not in the current snapshot
|
// deleteOrphanedFiles deletes files not in the current snapshot
|
||||||
func (sm *SnapshotManager) deleteOrphanedFiles(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
|
func (sm *SnapshotManager) deleteOrphanedFiles(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
|
||||||
log.Debug("[Temp DB Cleanup] Deleting files not in current snapshot")
|
log.Debug("[Temp DB Cleanup] Deleting file records not referenced by current snapshot")
|
||||||
database.LogSQL("Execute", `DELETE FROM files WHERE NOT EXISTS (SELECT 1 FROM snapshot_files WHERE snapshot_files.file_id = files.id AND snapshot_files.snapshot_id = ?)`, currentSnapshotID)
|
database.LogSQL("Execute", `DELETE FROM files WHERE NOT EXISTS (SELECT 1 FROM snapshot_files WHERE snapshot_files.file_id = files.id AND snapshot_files.snapshot_id = ?)`, currentSnapshotID)
|
||||||
result, err := tx.ExecContext(ctx, `
|
result, err := tx.ExecContext(ctx, `
|
||||||
DELETE FROM files
|
DELETE FROM files
|
||||||
@ -779,16 +779,16 @@ func (sm *SnapshotManager) deleteOrphanedFiles(ctx context.Context, tx *sql.Tx,
|
|||||||
return fmt.Errorf("deleting orphaned files: %w", err)
|
return fmt.Errorf("deleting orphaned files: %w", err)
|
||||||
}
|
}
|
||||||
rowsAffected, _ := result.RowsAffected()
|
rowsAffected, _ := result.RowsAffected()
|
||||||
log.Debug("[Temp DB Cleanup] Deleted files", "count", rowsAffected)
|
log.Debug("[Temp DB Cleanup] Deleted file records from database", "count", rowsAffected)
|
||||||
|
|
||||||
// Note: file_chunks will be deleted via CASCADE
|
// Note: file_chunks will be deleted via CASCADE
|
||||||
log.Debug("[Temp DB Cleanup] file_chunks will be deleted via CASCADE")
|
log.Debug("[Temp DB Cleanup] file_chunks associations deleted via CASCADE")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteOrphanedChunkToFileMappings deletes chunk_files entries for deleted files
|
// deleteOrphanedChunkToFileMappings deletes chunk_files entries for deleted files
|
||||||
func (sm *SnapshotManager) deleteOrphanedChunkToFileMappings(ctx context.Context, tx *sql.Tx) error {
|
func (sm *SnapshotManager) deleteOrphanedChunkToFileMappings(ctx context.Context, tx *sql.Tx) error {
|
||||||
log.Debug("[Temp DB Cleanup] Deleting orphaned chunk_files")
|
log.Debug("[Temp DB Cleanup] Deleting orphaned chunk_files associations")
|
||||||
database.LogSQL("Execute", `DELETE FROM chunk_files WHERE NOT EXISTS (SELECT 1 FROM files WHERE files.id = chunk_files.file_id)`)
|
database.LogSQL("Execute", `DELETE FROM chunk_files WHERE NOT EXISTS (SELECT 1 FROM files WHERE files.id = chunk_files.file_id)`)
|
||||||
result, err := tx.ExecContext(ctx, `
|
result, err := tx.ExecContext(ctx, `
|
||||||
DELETE FROM chunk_files
|
DELETE FROM chunk_files
|
||||||
@ -800,13 +800,13 @@ func (sm *SnapshotManager) deleteOrphanedChunkToFileMappings(ctx context.Context
|
|||||||
return fmt.Errorf("deleting orphaned chunk_files: %w", err)
|
return fmt.Errorf("deleting orphaned chunk_files: %w", err)
|
||||||
}
|
}
|
||||||
rowsAffected, _ := result.RowsAffected()
|
rowsAffected, _ := result.RowsAffected()
|
||||||
log.Debug("[Temp DB Cleanup] Deleted chunk_files", "count", rowsAffected)
|
log.Debug("[Temp DB Cleanup] Deleted chunk_files associations", "count", rowsAffected)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteOrphanedBlobs deletes blobs not in the current snapshot
|
// deleteOrphanedBlobs deletes blobs not in the current snapshot
|
||||||
func (sm *SnapshotManager) deleteOrphanedBlobs(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
|
func (sm *SnapshotManager) deleteOrphanedBlobs(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
|
||||||
log.Debug("[Temp DB Cleanup] Deleting blobs not in current snapshot")
|
log.Debug("[Temp DB Cleanup] Deleting blob records not referenced by current snapshot")
|
||||||
database.LogSQL("Execute", `DELETE FROM blobs WHERE NOT EXISTS (SELECT 1 FROM snapshot_blobs WHERE snapshot_blobs.blob_hash = blobs.blob_hash AND snapshot_blobs.snapshot_id = ?)`, currentSnapshotID)
|
database.LogSQL("Execute", `DELETE FROM blobs WHERE NOT EXISTS (SELECT 1 FROM snapshot_blobs WHERE snapshot_blobs.blob_hash = blobs.blob_hash AND snapshot_blobs.snapshot_id = ?)`, currentSnapshotID)
|
||||||
result, err := tx.ExecContext(ctx, `
|
result, err := tx.ExecContext(ctx, `
|
||||||
DELETE FROM blobs
|
DELETE FROM blobs
|
||||||
@ -819,13 +819,13 @@ func (sm *SnapshotManager) deleteOrphanedBlobs(ctx context.Context, tx *sql.Tx,
|
|||||||
return fmt.Errorf("deleting orphaned blobs: %w", err)
|
return fmt.Errorf("deleting orphaned blobs: %w", err)
|
||||||
}
|
}
|
||||||
rowsAffected, _ := result.RowsAffected()
|
rowsAffected, _ := result.RowsAffected()
|
||||||
log.Debug("[Temp DB Cleanup] Deleted blobs not in snapshot", "count", rowsAffected)
|
log.Debug("[Temp DB Cleanup] Deleted blob records from database", "count", rowsAffected)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteOrphanedBlobToChunkMappings deletes blob_chunks entries for deleted blobs
|
// deleteOrphanedBlobToChunkMappings deletes blob_chunks entries for deleted blobs
|
||||||
func (sm *SnapshotManager) deleteOrphanedBlobToChunkMappings(ctx context.Context, tx *sql.Tx) error {
|
func (sm *SnapshotManager) deleteOrphanedBlobToChunkMappings(ctx context.Context, tx *sql.Tx) error {
|
||||||
log.Debug("[Temp DB Cleanup] Deleting orphaned blob_chunks")
|
log.Debug("[Temp DB Cleanup] Deleting orphaned blob_chunks associations")
|
||||||
database.LogSQL("Execute", `DELETE FROM blob_chunks WHERE NOT EXISTS (SELECT 1 FROM blobs WHERE blobs.id = blob_chunks.blob_id)`)
|
database.LogSQL("Execute", `DELETE FROM blob_chunks WHERE NOT EXISTS (SELECT 1 FROM blobs WHERE blobs.id = blob_chunks.blob_id)`)
|
||||||
result, err := tx.ExecContext(ctx, `
|
result, err := tx.ExecContext(ctx, `
|
||||||
DELETE FROM blob_chunks
|
DELETE FROM blob_chunks
|
||||||
@ -837,13 +837,13 @@ func (sm *SnapshotManager) deleteOrphanedBlobToChunkMappings(ctx context.Context
|
|||||||
return fmt.Errorf("deleting orphaned blob_chunks: %w", err)
|
return fmt.Errorf("deleting orphaned blob_chunks: %w", err)
|
||||||
}
|
}
|
||||||
rowsAffected, _ := result.RowsAffected()
|
rowsAffected, _ := result.RowsAffected()
|
||||||
log.Debug("[Temp DB Cleanup] Deleted blob_chunks", "count", rowsAffected)
|
log.Debug("[Temp DB Cleanup] Deleted blob_chunks associations", "count", rowsAffected)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteOrphanedChunks deletes chunks not referenced by any file
|
// deleteOrphanedChunks deletes chunks not referenced by any file
|
||||||
func (sm *SnapshotManager) deleteOrphanedChunks(ctx context.Context, tx *sql.Tx) error {
|
func (sm *SnapshotManager) deleteOrphanedChunks(ctx context.Context, tx *sql.Tx) error {
|
||||||
log.Debug("[Temp DB Cleanup] Deleting orphaned chunks")
|
log.Debug("[Temp DB Cleanup] Deleting orphaned chunk records")
|
||||||
database.LogSQL("Execute", `DELETE FROM chunks WHERE NOT EXISTS (SELECT 1 FROM file_chunks WHERE file_chunks.chunk_hash = chunks.chunk_hash)`)
|
database.LogSQL("Execute", `DELETE FROM chunks WHERE NOT EXISTS (SELECT 1 FROM file_chunks WHERE file_chunks.chunk_hash = chunks.chunk_hash)`)
|
||||||
result, err := tx.ExecContext(ctx, `
|
result, err := tx.ExecContext(ctx, `
|
||||||
DELETE FROM chunks
|
DELETE FROM chunks
|
||||||
@ -855,6 +855,6 @@ func (sm *SnapshotManager) deleteOrphanedChunks(ctx context.Context, tx *sql.Tx)
|
|||||||
return fmt.Errorf("deleting orphaned chunks: %w", err)
|
return fmt.Errorf("deleting orphaned chunks: %w", err)
|
||||||
}
|
}
|
||||||
rowsAffected, _ := result.RowsAffected()
|
rowsAffected, _ := result.RowsAffected()
|
||||||
log.Debug("[Temp DB Cleanup] Deleted chunks", "count", rowsAffected)
|
log.Debug("[Temp DB Cleanup] Deleted chunk records from database", "count", rowsAffected)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -278,7 +278,7 @@ func (p *Packer) startNewBlob() error {
|
|||||||
size: 0,
|
size: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("Started new blob", "blob_id", blobID, "temp_file", tempFile.Name())
|
log.Debug("Created new blob container", "blob_id", blobID, "temp_file", tempFile.Name())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -286,7 +286,7 @@ func (p *Packer) startNewBlob() error {
|
|||||||
func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error {
|
func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error {
|
||||||
// Skip if chunk already in current blob
|
// Skip if chunk already in current blob
|
||||||
if p.currentBlob.chunkSet[chunk.Hash] {
|
if p.currentBlob.chunkSet[chunk.Hash] {
|
||||||
log.Debug("Skipping duplicate chunk in blob", "chunk_hash", chunk.Hash)
|
log.Debug("Skipping duplicate chunk already in current blob", "chunk_hash", chunk.Hash)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -320,7 +320,7 @@ func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error {
|
|||||||
return p.repos.BlobChunks.Create(ctx, tx, blobChunk)
|
return p.repos.BlobChunks.Create(ctx, tx, blobChunk)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Failed to store blob-chunk association", "error", err,
|
log.Error("Failed to store blob-chunk association in database", "error", err,
|
||||||
"blob_id", p.currentBlob.id, "chunk_hash", chunk.Hash)
|
"blob_id", p.currentBlob.id, "chunk_hash", chunk.Hash)
|
||||||
// Continue anyway - we can reconstruct this later if needed
|
// Continue anyway - we can reconstruct this later if needed
|
||||||
}
|
}
|
||||||
@ -329,7 +329,7 @@ func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error {
|
|||||||
// Update total size
|
// Update total size
|
||||||
p.currentBlob.size += chunkSize
|
p.currentBlob.size += chunkSize
|
||||||
|
|
||||||
log.Debug("Added chunk to blob",
|
log.Debug("Added chunk to blob container",
|
||||||
"blob_id", p.currentBlob.id,
|
"blob_id", p.currentBlob.id,
|
||||||
"chunk_hash", chunk.Hash,
|
"chunk_hash", chunk.Hash,
|
||||||
"chunk_size", len(chunk.Data),
|
"chunk_size", len(chunk.Data),
|
||||||
@ -410,7 +410,7 @@ func (p *Packer) finalizeCurrentBlob() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
compressionRatio := float64(finished.Compressed) / float64(finished.Uncompressed)
|
compressionRatio := float64(finished.Compressed) / float64(finished.Uncompressed)
|
||||||
log.Info("Finalized blob",
|
log.Info("Finalized blob (compressed and encrypted)",
|
||||||
"hash", blobHash,
|
"hash", blobHash,
|
||||||
"chunks", len(chunkRefs),
|
"chunks", len(chunkRefs),
|
||||||
"uncompressed", finished.Uncompressed,
|
"uncompressed", finished.Uncompressed,
|
||||||
@ -420,7 +420,7 @@ func (p *Packer) finalizeCurrentBlob() error {
|
|||||||
|
|
||||||
// Call blob handler if set
|
// Call blob handler if set
|
||||||
if p.blobHandler != nil {
|
if p.blobHandler != nil {
|
||||||
log.Debug("Calling blob handler", "blob_hash", blobHash[:8]+"...")
|
log.Debug("Invoking blob handler callback", "blob_hash", blobHash[:8]+"...")
|
||||||
// Reset file position for handler
|
// Reset file position for handler
|
||||||
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
||||||
p.cleanupTempFile()
|
p.cleanupTempFile()
|
||||||
@ -441,7 +441,7 @@ func (p *Packer) finalizeCurrentBlob() error {
|
|||||||
// Note: blob handler is responsible for closing/cleaning up temp file
|
// Note: blob handler is responsible for closing/cleaning up temp file
|
||||||
p.currentBlob = nil
|
p.currentBlob = nil
|
||||||
} else {
|
} else {
|
||||||
log.Debug("No blob handler set", "blob_hash", blobHash[:8]+"...")
|
log.Debug("No blob handler callback configured", "blob_hash", blobHash[:8]+"...")
|
||||||
// No handler, need to read data for legacy behavior
|
// No handler, need to read data for legacy behavior
|
||||||
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
||||||
p.cleanupTempFile()
|
p.cleanupTempFile()
|
||||||
|
@ -114,3 +114,21 @@ func (r *ChunkFileRepository) GetByFileID(ctx context.Context, fileID string) ([
|
|||||||
|
|
||||||
return chunkFiles, rows.Err()
|
return chunkFiles, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteByFileID deletes all chunk_files entries for a given file ID
|
||||||
|
func (r *ChunkFileRepository) DeleteByFileID(ctx context.Context, tx *sql.Tx, fileID string) error {
|
||||||
|
query := `DELETE FROM chunk_files WHERE file_id = ?`
|
||||||
|
|
||||||
|
var err error
|
||||||
|
if tx != nil {
|
||||||
|
_, err = tx.ExecContext(ctx, query, fileID)
|
||||||
|
} else {
|
||||||
|
_, err = r.db.ExecWithLog(ctx, query, fileID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("deleting chunk files: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user