package database import ( "context" "database/sql" "fmt" "time" "git.eeqj.de/sneak/vaultik/internal/log" ) type BlobRepository struct { db *DB } func NewBlobRepository(db *DB) *BlobRepository { return &BlobRepository{db: db} } func (r *BlobRepository) Create(ctx context.Context, tx *sql.Tx, blob *Blob) error { query := ` INSERT INTO blobs (id, blob_hash, created_ts, finished_ts, uncompressed_size, compressed_size, uploaded_ts) VALUES (?, ?, ?, ?, ?, ?, ?) ` var finishedTS, uploadedTS *int64 if blob.FinishedTS != nil { ts := blob.FinishedTS.Unix() finishedTS = &ts } if blob.UploadedTS != nil { ts := blob.UploadedTS.Unix() uploadedTS = &ts } var err error if tx != nil { _, err = tx.ExecContext(ctx, query, blob.ID, blob.Hash, blob.CreatedTS.Unix(), finishedTS, blob.UncompressedSize, blob.CompressedSize, uploadedTS) } else { _, err = r.db.ExecWithLog(ctx, query, blob.ID, blob.Hash, blob.CreatedTS.Unix(), finishedTS, blob.UncompressedSize, blob.CompressedSize, uploadedTS) } if err != nil { return fmt.Errorf("inserting blob: %w", err) } return nil } func (r *BlobRepository) GetByHash(ctx context.Context, hash string) (*Blob, error) { query := ` SELECT id, blob_hash, created_ts, finished_ts, uncompressed_size, compressed_size, uploaded_ts FROM blobs WHERE blob_hash = ? ` var blob Blob var createdTSUnix int64 var finishedTSUnix, uploadedTSUnix sql.NullInt64 err := r.db.conn.QueryRowContext(ctx, query, hash).Scan( &blob.ID, &blob.Hash, &createdTSUnix, &finishedTSUnix, &blob.UncompressedSize, &blob.CompressedSize, &uploadedTSUnix, ) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("querying blob: %w", err) } blob.CreatedTS = time.Unix(createdTSUnix, 0).UTC() if finishedTSUnix.Valid { ts := time.Unix(finishedTSUnix.Int64, 0).UTC() blob.FinishedTS = &ts } if uploadedTSUnix.Valid { ts := time.Unix(uploadedTSUnix.Int64, 0).UTC() blob.UploadedTS = &ts } return &blob, nil } // GetByID retrieves a blob by its ID func (r *BlobRepository) GetByID(ctx context.Context, id string) (*Blob, error) { query := ` SELECT id, blob_hash, created_ts, finished_ts, uncompressed_size, compressed_size, uploaded_ts FROM blobs WHERE id = ? ` var blob Blob var createdTSUnix int64 var finishedTSUnix, uploadedTSUnix sql.NullInt64 err := r.db.conn.QueryRowContext(ctx, query, id).Scan( &blob.ID, &blob.Hash, &createdTSUnix, &finishedTSUnix, &blob.UncompressedSize, &blob.CompressedSize, &uploadedTSUnix, ) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("querying blob: %w", err) } blob.CreatedTS = time.Unix(createdTSUnix, 0).UTC() if finishedTSUnix.Valid { ts := time.Unix(finishedTSUnix.Int64, 0).UTC() blob.FinishedTS = &ts } if uploadedTSUnix.Valid { ts := time.Unix(uploadedTSUnix.Int64, 0).UTC() blob.UploadedTS = &ts } return &blob, nil } // UpdateFinished updates a blob when it's finalized func (r *BlobRepository) UpdateFinished(ctx context.Context, tx *sql.Tx, id string, hash string, uncompressedSize, compressedSize int64) error { query := ` UPDATE blobs SET blob_hash = ?, finished_ts = ?, uncompressed_size = ?, compressed_size = ? WHERE id = ? ` now := time.Now().UTC().Unix() var err error if tx != nil { _, err = tx.ExecContext(ctx, query, hash, now, uncompressedSize, compressedSize, id) } else { _, err = r.db.ExecWithLog(ctx, query, hash, now, uncompressedSize, compressedSize, id) } if err != nil { return fmt.Errorf("updating blob: %w", err) } return nil } // UpdateUploaded marks a blob as uploaded func (r *BlobRepository) UpdateUploaded(ctx context.Context, tx *sql.Tx, id string) error { query := ` UPDATE blobs SET uploaded_ts = ? WHERE id = ? ` now := time.Now().UTC().Unix() var err error if tx != nil { _, err = tx.ExecContext(ctx, query, now, id) } else { _, err = r.db.ExecWithLog(ctx, query, now, id) } if err != nil { return fmt.Errorf("marking blob as uploaded: %w", err) } return nil } // DeleteOrphaned deletes blobs that are not referenced by any snapshot func (r *BlobRepository) DeleteOrphaned(ctx context.Context) error { query := ` DELETE FROM blobs WHERE NOT EXISTS ( SELECT 1 FROM snapshot_blobs WHERE snapshot_blobs.blob_id = blobs.id ) ` result, err := r.db.ExecWithLog(ctx, query) if err != nil { return fmt.Errorf("deleting orphaned blobs: %w", err) } rowsAffected, _ := result.RowsAffected() if rowsAffected > 0 { log.Debug("Deleted orphaned blobs", "count", rowsAffected) } return nil }