Merge feature/upload-progress-output
This commit is contained in:
@@ -1169,7 +1169,11 @@ func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobW
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob)
|
_, _ = fmt.Fprintf(s.output, "Uploading blob: %s (%s)\n",
|
||||||
|
finishedBlob.Hash[:12]+"...",
|
||||||
|
humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||||
|
|
||||||
|
progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob, startTime)
|
||||||
|
|
||||||
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
||||||
log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err)
|
log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err)
|
||||||
@@ -1201,10 +1205,14 @@ func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobW
|
|||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeUploadProgressCallback creates a progress callback for blob uploads
|
// makeUploadProgressCallback creates a progress callback for blob uploads.
|
||||||
func (s *Scanner) makeUploadProgressCallback(ctx context.Context, finishedBlob *blob.FinishedBlob) func(int64) error {
|
// It updates the live progress reporter ~twice/sec for ETAs and prints a
|
||||||
|
// human-readable status line to s.output at most every 15 seconds.
|
||||||
|
func (s *Scanner) makeUploadProgressCallback(ctx context.Context, finishedBlob *blob.FinishedBlob, uploadStart time.Time) func(int64) error {
|
||||||
lastProgressTime := time.Now()
|
lastProgressTime := time.Now()
|
||||||
lastProgressBytes := int64(0)
|
lastProgressBytes := int64(0)
|
||||||
|
lastStdoutTime := time.Now()
|
||||||
|
const stdoutInterval = 15 * time.Second
|
||||||
|
|
||||||
return func(uploaded int64) error {
|
return func(uploaded int64) error {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
@@ -1218,6 +1226,27 @@ func (s *Scanner) makeUploadProgressCallback(ctx context.Context, finishedBlob *
|
|||||||
lastProgressTime = now
|
lastProgressTime = now
|
||||||
lastProgressBytes = uploaded
|
lastProgressBytes = uploaded
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Periodic stdout status line so the user knows the upload is alive.
|
||||||
|
if now.Sub(lastStdoutTime) >= stdoutInterval {
|
||||||
|
totalElapsed := now.Sub(uploadStart)
|
||||||
|
pct := float64(uploaded) / float64(finishedBlob.Compressed) * 100
|
||||||
|
avgSpeed := float64(uploaded) / totalElapsed.Seconds()
|
||||||
|
var eta time.Duration
|
||||||
|
if avgSpeed > 0 {
|
||||||
|
eta = time.Duration(float64(finishedBlob.Compressed-uploaded)/avgSpeed) * time.Second
|
||||||
|
}
|
||||||
|
_, _ = fmt.Fprintf(s.output, " uploading %s: %s/%s (%.0f%%), %s/sec, %s elapsed, ETA %s\n",
|
||||||
|
finishedBlob.Hash[:12]+"...",
|
||||||
|
humanize.Bytes(uint64(uploaded)),
|
||||||
|
humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||||
|
pct,
|
||||||
|
humanize.Bytes(uint64(avgSpeed)),
|
||||||
|
totalElapsed.Round(time.Second),
|
||||||
|
eta.Round(time.Second))
|
||||||
|
lastStdoutTime = now
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
|
|||||||
Reference in New Issue
Block a user