From 588e84da9cc03463110811b0c70055b6a1f48f8c Mon Sep 17 00:00:00 2001 From: user Date: Tue, 17 Mar 2026 21:51:52 -0700 Subject: [PATCH] feat: concurrent manifest downloads in ListSnapshots Replace serial getManifestSize() calls with bounded concurrent downloads using errgroup. For each remote snapshot without a completed record in the local DB, manifest downloads now run in parallel (up to 10 concurrent) instead of one at a time. Changes: - Use errgroup with SetLimit(10) for bounded concurrency - Collect remote-only snapshot IDs first, pre-add entries with zero size - Download manifests concurrently, patch sizes from results - Remove now-unused getManifestSize helper (logic inlined into goroutines) - Promote golang.org/x/sync from indirect to direct dependency closes #8 --- go.mod | 2 +- internal/vaultik/snapshot.go | 99 +++++++++++++++++++++++++----------- 2 files changed, 71 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 32b149b..45ada46 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/spf13/cobra v1.10.1 github.com/stretchr/testify v1.11.1 go.uber.org/fx v1.24.0 + golang.org/x/sync v0.18.0 golang.org/x/term v0.37.0 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.38.0 @@ -266,7 +267,6 @@ require ( golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/net v0.47.0 // indirect golang.org/x/oauth2 v0.33.0 // indirect - golang.org/x/sync v0.18.0 // indirect golang.org/x/sys v0.38.0 // indirect golang.org/x/text v0.31.0 // indirect golang.org/x/time v0.14.0 // indirect diff --git a/internal/vaultik/snapshot.go b/internal/vaultik/snapshot.go index e0d93b2..55d03c4 100644 --- a/internal/vaultik/snapshot.go +++ b/internal/vaultik/snapshot.go @@ -8,6 +8,7 @@ import ( "regexp" "sort" "strings" + "sync" "text/tabwriter" "time" @@ -16,6 +17,7 @@ import ( "git.eeqj.de/sneak/vaultik/internal/snapshot" "git.eeqj.de/sneak/vaultik/internal/types" "github.com/dustin/go-humanize" + "golang.org/x/sync/errgroup" ) // SnapshotCreateOptions 
contains options for the snapshot create command @@ -388,17 +390,19 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error { } } - // Build final snapshot list + // Build final snapshot list. + // Separate local (cheap DB lookup) from remote-only (needs manifest download). snapshots := make([]SnapshotInfo, 0, len(remoteSnapshots)) + // remoteOnly collects snapshot IDs that need a manifest download. + var remoteOnly []string + for snapshotID := range remoteSnapshots { - // Check if we have this snapshot locally if localSnap, exists := localSnapshotMap[snapshotID]; exists && localSnap.CompletedAt != nil { // Get total compressed size of all blobs referenced by this snapshot totalSize, err := v.Repositories.Snapshots.GetSnapshotTotalCompressedSize(v.ctx, snapshotID) if err != nil { log.Warn("Failed to get total compressed size", "id", snapshotID, "error", err) - // Fall back to stored blob size totalSize = localSnap.BlobSize } @@ -408,24 +412,78 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error { CompressedSize: totalSize, }) } else { - // Remote snapshot not in local DB - fetch manifest to get size timestamp, err := parseSnapshotTimestamp(snapshotID) if err != nil { log.Warn("Failed to parse snapshot timestamp", "id", snapshotID, "error", err) continue } - - // Try to download manifest to get size - totalSize, err := v.getManifestSize(snapshotID) - if err != nil { - return fmt.Errorf("failed to get manifest size for %s: %w", snapshotID, err) - } - + // Pre-add with zero size; will be filled by concurrent downloads. snapshots = append(snapshots, SnapshotInfo{ ID: types.SnapshotID(snapshotID), Timestamp: timestamp, - CompressedSize: totalSize, + CompressedSize: 0, }) + remoteOnly = append(remoteOnly, snapshotID) + } + } + + // Download manifests concurrently for remote-only snapshots. 
+ if len(remoteOnly) > 0 { + // maxConcurrentManifestDownloads bounds parallel manifest fetches to + // avoid overwhelming the S3 endpoint while still being much faster + // than serial downloads. + const maxConcurrentManifestDownloads = 10 + + type manifestResult struct { + snapshotID string + size int64 + } + + var ( + mu sync.Mutex + results []manifestResult + ) + + g, gctx := errgroup.WithContext(v.ctx) + g.SetLimit(maxConcurrentManifestDownloads) + + for _, sid := range remoteOnly { + g.Go(func() error { + manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", sid) + reader, err := v.Storage.Get(gctx, manifestPath) + if err != nil { + return fmt.Errorf("downloading manifest for %s: %w", sid, err) + } + defer func() { _ = reader.Close() }() + + manifest, err := snapshot.DecodeManifest(reader) + if err != nil { + return fmt.Errorf("decoding manifest for %s: %w", sid, err) + } + + mu.Lock() + results = append(results, manifestResult{ + snapshotID: sid, + size: manifest.TotalCompressedSize, + }) + mu.Unlock() + return nil + }) + } + + if err := g.Wait(); err != nil { + return fmt.Errorf("fetching manifest sizes: %w", err) + } + + // Build a lookup from results and patch the pre-added entries. 
+ sizeMap := make(map[string]int64, len(results)) + for _, r := range results { + sizeMap[r.snapshotID] = r.size + } + for i := range snapshots { + if sz, ok := sizeMap[string(snapshots[i].ID)]; ok { + snapshots[i].CompressedSize = sz + } } } @@ -731,23 +789,6 @@ func (v *Vaultik) outputVerifyJSON(result *VerifyResult) error { // Helper methods that were previously on SnapshotApp -func (v *Vaultik) getManifestSize(snapshotID string) (int64, error) { - manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID) - - reader, err := v.Storage.Get(v.ctx, manifestPath) - if err != nil { - return 0, fmt.Errorf("downloading manifest: %w", err) - } - defer func() { _ = reader.Close() }() - - manifest, err := snapshot.DecodeManifest(reader) - if err != nil { - return 0, fmt.Errorf("decoding manifest: %w", err) - } - - return manifest.TotalCompressedSize, nil -} - func (v *Vaultik) downloadManifest(snapshotID string) (*snapshot.Manifest, error) { manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)