feat: concurrent manifest downloads in ListSnapshots #50
2
go.mod
2
go.mod
@@ -24,6 +24,7 @@ require (
|
|||||||
github.com/spf13/cobra v1.10.1
|
github.com/spf13/cobra v1.10.1
|
||||||
github.com/stretchr/testify v1.11.1
|
github.com/stretchr/testify v1.11.1
|
||||||
go.uber.org/fx v1.24.0
|
go.uber.org/fx v1.24.0
|
||||||
|
golang.org/x/sync v0.18.0
|
||||||
golang.org/x/term v0.37.0
|
golang.org/x/term v0.37.0
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
modernc.org/sqlite v1.38.0
|
modernc.org/sqlite v1.38.0
|
||||||
@@ -266,7 +267,6 @@ require (
|
|||||||
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect
|
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect
|
||||||
golang.org/x/net v0.47.0 // indirect
|
golang.org/x/net v0.47.0 // indirect
|
||||||
golang.org/x/oauth2 v0.33.0 // indirect
|
golang.org/x/oauth2 v0.33.0 // indirect
|
||||||
golang.org/x/sync v0.18.0 // indirect
|
|
||||||
golang.org/x/sys v0.38.0 // indirect
|
golang.org/x/sys v0.38.0 // indirect
|
||||||
golang.org/x/text v0.31.0 // indirect
|
golang.org/x/text v0.31.0 // indirect
|
||||||
golang.org/x/time v0.14.0 // indirect
|
golang.org/x/time v0.14.0 // indirect
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -16,6 +17,7 @@ import (
|
|||||||
"git.eeqj.de/sneak/vaultik/internal/snapshot"
|
"git.eeqj.de/sneak/vaultik/internal/snapshot"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/types"
|
"git.eeqj.de/sneak/vaultik/internal/types"
|
||||||
"github.com/dustin/go-humanize"
|
"github.com/dustin/go-humanize"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SnapshotCreateOptions contains options for the snapshot create command
|
// SnapshotCreateOptions contains options for the snapshot create command
|
||||||
@@ -438,6 +440,9 @@ func (v *Vaultik) reconcileLocalWithRemote(remoteSnapshots map[string]bool) (map
|
|||||||
func (v *Vaultik) buildSnapshotInfoList(remoteSnapshots map[string]bool, localSnapshotMap map[string]*database.Snapshot) ([]SnapshotInfo, error) {
|
func (v *Vaultik) buildSnapshotInfoList(remoteSnapshots map[string]bool, localSnapshotMap map[string]*database.Snapshot) ([]SnapshotInfo, error) {
|
||||||
snapshots := make([]SnapshotInfo, 0, len(remoteSnapshots))
|
snapshots := make([]SnapshotInfo, 0, len(remoteSnapshots))
|
||||||
|
|
||||||
|
// remoteOnly collects snapshot IDs that need a manifest download.
|
||||||
|
var remoteOnly []string
|
||||||
|
|
||||||
for snapshotID := range remoteSnapshots {
|
for snapshotID := range remoteSnapshots {
|
||||||
if localSnap, exists := localSnapshotMap[snapshotID]; exists && localSnap.CompletedAt != nil {
|
if localSnap, exists := localSnapshotMap[snapshotID]; exists && localSnap.CompletedAt != nil {
|
||||||
totalSize, err := v.Repositories.Snapshots.GetSnapshotTotalCompressedSize(v.ctx, snapshotID)
|
totalSize, err := v.Repositories.Snapshots.GetSnapshotTotalCompressedSize(v.ctx, snapshotID)
|
||||||
@@ -458,16 +463,73 @@ func (v *Vaultik) buildSnapshotInfoList(remoteSnapshots map[string]bool, localSn
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
totalSize, err := v.getManifestSize(snapshotID)
|
// Pre-add with zero size; will be filled by concurrent downloads.
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to get manifest size for %s: %w", snapshotID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshots = append(snapshots, SnapshotInfo{
|
snapshots = append(snapshots, SnapshotInfo{
|
||||||
ID: types.SnapshotID(snapshotID),
|
ID: types.SnapshotID(snapshotID),
|
||||||
Timestamp: timestamp,
|
Timestamp: timestamp,
|
||||||
CompressedSize: totalSize,
|
CompressedSize: 0,
|
||||||
})
|
})
|
||||||
|
remoteOnly = append(remoteOnly, snapshotID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Download manifests concurrently for remote-only snapshots.
|
||||||
|
if len(remoteOnly) > 0 {
|
||||||
|
// maxConcurrentManifestDownloads bounds parallel manifest fetches to
|
||||||
|
// avoid overwhelming the S3 endpoint while still being much faster
|
||||||
|
// than serial downloads.
|
||||||
|
const maxConcurrentManifestDownloads = 10
|
||||||
|
|
||||||
|
type manifestResult struct {
|
||||||
|
snapshotID string
|
||||||
|
size int64
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
mu sync.Mutex
|
||||||
|
results []manifestResult
|
||||||
|
)
|
||||||
|
|
||||||
|
g, gctx := errgroup.WithContext(v.ctx)
|
||||||
|
g.SetLimit(maxConcurrentManifestDownloads)
|
||||||
|
|
||||||
|
for _, sid := range remoteOnly {
|
||||||
|
g.Go(func() error {
|
||||||
|
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", sid)
|
||||||
|
reader, err := v.Storage.Get(gctx, manifestPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("downloading manifest for %s: %w", sid, err)
|
||||||
|
}
|
||||||
|
defer func() { _ = reader.Close() }()
|
||||||
|
|
||||||
|
manifest, err := snapshot.DecodeManifest(reader)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("decoding manifest for %s: %w", sid, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
results = append(results, manifestResult{
|
||||||
|
snapshotID: sid,
|
||||||
|
size: manifest.TotalCompressedSize,
|
||||||
|
})
|
||||||
|
mu.Unlock()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := g.Wait(); err != nil {
|
||||||
|
return nil, fmt.Errorf("fetching manifest sizes: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build a lookup from results and patch the pre-added entries.
|
||||||
|
sizeMap := make(map[string]int64, len(results))
|
||||||
|
for _, r := range results {
|
||||||
|
sizeMap[r.snapshotID] = r.size
|
||||||
|
}
|
||||||
|
for i := range snapshots {
|
||||||
|
if sz, ok := sizeMap[string(snapshots[i].ID)]; ok {
|
||||||
|
snapshots[i].CompressedSize = sz
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -788,23 +850,6 @@ func (v *Vaultik) outputVerifyJSON(result *VerifyResult) error {
|
|||||||
|
|
||||||
// Helper methods that were previously on SnapshotApp
|
// Helper methods that were previously on SnapshotApp
|
||||||
|
|
||||||
func (v *Vaultik) getManifestSize(snapshotID string) (int64, error) {
|
|
||||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
|
||||||
|
|
||||||
reader, err := v.Storage.Get(v.ctx, manifestPath)
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("downloading manifest: %w", err)
|
|
||||||
}
|
|
||||||
defer func() { _ = reader.Close() }()
|
|
||||||
|
|
||||||
manifest, err := snapshot.DecodeManifest(reader)
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("decoding manifest: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return manifest.TotalCompressedSize, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *Vaultik) downloadManifest(snapshotID string) (*snapshot.Manifest, error) {
|
func (v *Vaultik) downloadManifest(snapshotID string) (*snapshot.Manifest, error) {
|
||||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user