From d39d939c5b8d4f285753ee89da2d864db33906d9 Mon Sep 17 00:00:00 2001 From: user Date: Tue, 17 Mar 2026 21:51:52 -0700 Subject: [PATCH] feat: concurrent manifest downloads in ListSnapshots Replace serial getManifestSize() calls with bounded concurrent downloads using errgroup. For each remote snapshot not in the local DB, manifest downloads now run in parallel (up to 10 concurrent) instead of one at a time. Changes: - Use errgroup with SetLimit(10) for bounded concurrency - Collect remote-only snapshot IDs first, pre-add entries with zero size - Download manifests concurrently, patch sizes from results - Remove now-unused getManifestSize helper (logic inlined into goroutines) - Promote golang.org/x/sync from indirect to direct dependency closes #8 --- go.mod | 2 +- internal/vaultik/snapshot.go | 91 +++++++++++++++++++++++++++--------- 2 files changed, 69 insertions(+), 24 deletions(-) diff --git a/go.mod b/go.mod index 32b149b..45ada46 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/spf13/cobra v1.10.1 github.com/stretchr/testify v1.11.1 go.uber.org/fx v1.24.0 + golang.org/x/sync v0.18.0 golang.org/x/term v0.37.0 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.38.0 @@ -266,7 +267,6 @@ require ( golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/net v0.47.0 // indirect golang.org/x/oauth2 v0.33.0 // indirect - golang.org/x/sync v0.18.0 // indirect golang.org/x/sys v0.38.0 // indirect golang.org/x/text v0.31.0 // indirect golang.org/x/time v0.14.0 // indirect diff --git a/internal/vaultik/snapshot.go b/internal/vaultik/snapshot.go index 21e796d..d9eca91 100644 --- a/internal/vaultik/snapshot.go +++ b/internal/vaultik/snapshot.go @@ -8,6 +8,7 @@ import ( "regexp" "sort" "strings" + "sync" "text/tabwriter" "time" @@ -16,6 +17,7 @@ import ( "git.eeqj.de/sneak/vaultik/internal/snapshot" "git.eeqj.de/sneak/vaultik/internal/types" "github.com/dustin/go-humanize" + "golang.org/x/sync/errgroup" ) // SnapshotCreateOptions contains options for the snapshot create command @@ -438,6 +440,9 @@ func (v *Vaultik) reconcileLocalWithRemote(remoteSnapshots map[string]bool) (map func (v *Vaultik) buildSnapshotInfoList(remoteSnapshots map[string]bool, localSnapshotMap map[string]*database.Snapshot) ([]SnapshotInfo, error) { snapshots := make([]SnapshotInfo, 0, len(remoteSnapshots)) + // remoteOnly collects snapshot IDs that need a manifest download. + var remoteOnly []string + for snapshotID := range remoteSnapshots { if localSnap, exists := localSnapshotMap[snapshotID]; exists && localSnap.CompletedAt != nil { totalSize, err := v.Repositories.Snapshots.GetSnapshotTotalCompressedSize(v.ctx, snapshotID) @@ -458,16 +463,73 @@ func (v *Vaultik) buildSnapshotInfoList(remoteSnapshots map[string]bool, localSn continue } - totalSize, err := v.getManifestSize(snapshotID) - if err != nil { - return nil, fmt.Errorf("failed to get manifest size for %s: %w", snapshotID, err) - } - + // Pre-add with zero size; will be filled by concurrent downloads. snapshots = append(snapshots, SnapshotInfo{ ID: types.SnapshotID(snapshotID), Timestamp: timestamp, - CompressedSize: totalSize, + CompressedSize: 0, }) + remoteOnly = append(remoteOnly, snapshotID) + } + } + + // Download manifests concurrently for remote-only snapshots. + if len(remoteOnly) > 0 { + // maxConcurrentManifestDownloads bounds parallel manifest fetches to + // avoid overwhelming the S3 endpoint while still being much faster + // than serial downloads. + const maxConcurrentManifestDownloads = 10 + + type manifestResult struct { + snapshotID string + size int64 + } + + var ( + mu sync.Mutex + results []manifestResult + ) + + g, gctx := errgroup.WithContext(v.ctx) + g.SetLimit(maxConcurrentManifestDownloads) + + for _, sid := range remoteOnly { + g.Go(func() error { + manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", sid) + reader, err := v.Storage.Get(gctx, manifestPath) + if err != nil { + return fmt.Errorf("downloading manifest for %s: %w", sid, err) + } + defer func() { _ = reader.Close() }() + + manifest, err := snapshot.DecodeManifest(reader) + if err != nil { + return fmt.Errorf("decoding manifest for %s: %w", sid, err) + } + + mu.Lock() + results = append(results, manifestResult{ + snapshotID: sid, + size: manifest.TotalCompressedSize, + }) + mu.Unlock() + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, fmt.Errorf("fetching manifest sizes: %w", err) + } + + // Build a lookup from results and patch the pre-added entries. + sizeMap := make(map[string]int64, len(results)) + for _, r := range results { + sizeMap[r.snapshotID] = r.size + } + for i := range snapshots { + if sz, ok := sizeMap[string(snapshots[i].ID)]; ok { + snapshots[i].CompressedSize = sz + } } } @@ -788,23 +850,6 @@ func (v *Vaultik) outputVerifyJSON(result *VerifyResult) error { // Helper methods that were previously on SnapshotApp -func (v *Vaultik) getManifestSize(snapshotID string) (int64, error) { - manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID) - - reader, err := v.Storage.Get(v.ctx, manifestPath) - if err != nil { - return 0, fmt.Errorf("downloading manifest: %w", err) - } - defer func() { _ = reader.Close() }() - - manifest, err := snapshot.DecodeManifest(reader) - if err != nil { - return 0, fmt.Errorf("decoding manifest: %w", err) - } - - return manifest.TotalCompressedSize, nil -} - func (v *Vaultik) downloadManifest(snapshotID string) (*snapshot.Manifest, error) { manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)