initial
This commit is contained in:
18
internal/bsdaily/config.go
Normal file
18
internal/bsdaily/config.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package bsdaily
|
||||
|
||||
import "regexp"
|
||||
|
||||
// Filesystem locations and safety thresholds for the daily archive
// extraction pipeline.
const (
	// SnapshotBase is the directory exposing the ZFS auto-snapshots of the
	// firehose dataset (read-only, one subdirectory per snapshot).
	SnapshotBase = "/srv/berlin.sneak.fs.blueskyarchive/.zfs/snapshot"
	// TmpBase is scratch space used to stage a writable copy of the database.
	TmpBase = "/srv/tmp"
	// DailiesBase is where finished per-day compressed SQL dumps are published.
	DailiesBase = "/srv/berlin.sneak.fs.bluesky-dailies"

	// DBFilename and its WAL/SHM companions are the SQLite files expected
	// inside each snapshot directory.
	DBFilename  = "firehose.db"
	WALFilename = "firehose.db-wal"
	SHMFilename = "firehose.db-shm"

	// MinTmpFreeBytes is the free space required in TmpBase before copying
	// the database out of a snapshot.
	MinTmpFreeBytes = 500 * 1024 * 1024 * 1024 // 500 GB
	// MinDailiesFreeBytes is the free space required in DailiesBase before
	// writing a compressed dump.
	MinDailiesFreeBytes = 20 * 1024 * 1024 * 1024 // 20 GB
	// PostCopyTmpMinFree is the headroom required in TmpBase after the copy,
	// before pruning/VACUUM (which need working space).
	PostCopyTmpMinFree = 100 * 1024 * 1024 * 1024 // 100 GB
)

// snapshotPattern matches zfs-auto-snap daily snapshot directory names,
// capturing the YYYY-MM-DD date portion; the trailing HHMM time is ignored.
var snapshotPattern = regexp.MustCompile(`^zfs-auto-snap_daily-(\d{4}-\d{2}-\d{2})-\d{4}$`)
49
internal/bsdaily/copy.go
Normal file
49
internal/bsdaily/copy.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package bsdaily
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
)
|
||||
|
||||
func CopyFile(src, dst string) (err error) {
|
||||
slog.Info("copying file", "src", src, "dst", dst)
|
||||
|
||||
srcFile, err := os.Open(src)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening source %s: %w", src, err)
|
||||
}
|
||||
defer srcFile.Close()
|
||||
|
||||
srcInfo, err := srcFile.Stat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("stat source %s: %w", src, err)
|
||||
}
|
||||
|
||||
dstFile, err := os.Create(dst)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating destination %s: %w", dst, err)
|
||||
}
|
||||
defer func() {
|
||||
if cerr := dstFile.Close(); cerr != nil && err == nil {
|
||||
err = fmt.Errorf("closing destination %s: %w", dst, cerr)
|
||||
}
|
||||
}()
|
||||
|
||||
written, err := io.Copy(dstFile, srcFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("copying data: %w", err)
|
||||
}
|
||||
|
||||
if written != srcInfo.Size() {
|
||||
return fmt.Errorf("short copy: wrote %d bytes, expected %d", written, srcInfo.Size())
|
||||
}
|
||||
|
||||
if err := dstFile.Sync(); err != nil {
|
||||
return fmt.Errorf("syncing destination %s: %w", dst, err)
|
||||
}
|
||||
|
||||
slog.Info("file copied", "dst", dst, "bytes", written)
|
||||
return nil
|
||||
}
|
||||
26
internal/bsdaily/disk.go
Normal file
26
internal/bsdaily/disk.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package bsdaily
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func CheckFreeSpace(path string, minBytes uint64, label string) error {
|
||||
var stat unix.Statfs_t
|
||||
if err := unix.Statfs(path, &stat); err != nil {
|
||||
return fmt.Errorf("statfs %s (%s): %w", path, label, err)
|
||||
}
|
||||
free := uint64(stat.Bavail) * uint64(stat.Bsize)
|
||||
freeGB := float64(free) / (1024 * 1024 * 1024)
|
||||
minGB := float64(minBytes) / (1024 * 1024 * 1024)
|
||||
slog.Info("disk space check", "label", label, "path", path,
|
||||
"free_gb", fmt.Sprintf("%.1f", freeGB),
|
||||
"required_gb", fmt.Sprintf("%.1f", minGB))
|
||||
if free < minBytes {
|
||||
return fmt.Errorf("insufficient disk space on %s (%s): %.1f GB free, need %.1f GB",
|
||||
path, label, freeGB, minGB)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
79
internal/bsdaily/dump.go
Normal file
79
internal/bsdaily/dump.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package bsdaily
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func DumpAndCompress(dbPath, outputPath string) (err error) {
|
||||
for _, tool := range []string{"sqlite3", "zstdmt"} {
|
||||
if _, err := exec.LookPath(tool); err != nil {
|
||||
return fmt.Errorf("required tool %q not found in PATH: %w", tool, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := CheckFreeSpace(filepath.Dir(outputPath), MinDailiesFreeBytes, "dailiesBase (pre-dump)"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
outFile, err := os.Create(outputPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating output file: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if cerr := outFile.Close(); cerr != nil && err == nil {
|
||||
err = fmt.Errorf("closing output: %w", cerr)
|
||||
}
|
||||
}()
|
||||
|
||||
dumpCmd := exec.Command("sqlite3", dbPath, ".dump")
|
||||
zstdCmd := exec.Command("zstdmt", "-15")
|
||||
|
||||
pipe, err := dumpCmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating dump stdout pipe: %w", err)
|
||||
}
|
||||
zstdCmd.Stdin = pipe
|
||||
zstdCmd.Stdout = outFile
|
||||
|
||||
var dumpStderr, zstdStderr strings.Builder
|
||||
dumpCmd.Stderr = &dumpStderr
|
||||
zstdCmd.Stderr = &zstdStderr
|
||||
|
||||
slog.Info("starting sqlite3 dump and zstdmt compression")
|
||||
|
||||
if err := zstdCmd.Start(); err != nil {
|
||||
return fmt.Errorf("starting zstdmt: %w", err)
|
||||
}
|
||||
if err := dumpCmd.Start(); err != nil {
|
||||
return fmt.Errorf("starting sqlite3 dump: %w", err)
|
||||
}
|
||||
|
||||
if err := dumpCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("sqlite3 dump failed: %w; stderr: %s", err, dumpStderr.String())
|
||||
}
|
||||
if err := zstdCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("zstdmt failed: %w; stderr: %s", err, zstdStderr.String())
|
||||
}
|
||||
|
||||
if err := outFile.Sync(); err != nil {
|
||||
return fmt.Errorf("syncing output: %w", err)
|
||||
}
|
||||
|
||||
info, err := os.Stat(outputPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("stat output: %w", err)
|
||||
}
|
||||
slog.Info("compressed output written", "path", outputPath,
|
||||
"size_bytes", info.Size(), "size_mb", info.Size()/(1024*1024))
|
||||
|
||||
if info.Size() == 0 {
|
||||
return fmt.Errorf("compressed output is empty")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
113
internal/bsdaily/prune.go
Normal file
113
internal/bsdaily/prune.go
Normal file
@@ -0,0 +1,113 @@
|
||||
package bsdaily
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
// PruneDatabase deletes every row outside targetDay from the SQLite database
// at dbPath, sweeps rows orphaned by that delete (hashtags, urls, media,
// users), verifies the surviving post count, and VACUUMs to reclaim space.
// It aborts with an error if the target day contains no posts at all.
func PruneDatabase(dbPath string, targetDay time.Time) error {
	dayStart := targetDay.Format("2006-01-02") + "T00:00:00"
	dayEnd := targetDay.AddDate(0, 0, 1).Format("2006-01-02") + "T00:00:00"

	slog.Info("pruning database", "keep_from", dayStart, "keep_until", dayEnd)

	db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)")
	if err != nil {
		return fmt.Errorf("opening database: %w", err)
	}
	defer db.Close()

	// countPosts runs a single-value COUNT query.
	countPosts := func(query string, args ...any) (int64, error) {
		var n int64
		err := db.QueryRow(query, args...).Scan(&n)
		return n, err
	}

	// Verify we can query
	totalPosts, err := countPosts("SELECT COUNT(*) FROM posts")
	if err != nil {
		return fmt.Errorf("counting posts: %w", err)
	}
	slog.Info("total posts before pruning", "count", totalPosts)

	// Count posts in target range
	keepPosts, err := countPosts("SELECT COUNT(*) FROM posts WHERE timestamp >= ? AND timestamp < ?",
		dayStart, dayEnd)
	if err != nil {
		return fmt.Errorf("counting target posts: %w", err)
	}
	slog.Info("posts in target day", "count", keepPosts, "date", targetDay.Format("2006-01-02"))

	if keepPosts == 0 {
		return fmt.Errorf("no posts found for target day %s - aborting to avoid producing empty output",
			targetDay.Format("2006-01-02"))
	}

	// deleteRows executes one DELETE statement, logging before and after,
	// and wraps any failure with errCtx.
	deleteRows := func(before, after, errCtx, query string, args ...any) error {
		slog.Info(before)
		res, err := db.Exec(query, args...)
		if err != nil {
			return fmt.Errorf("%s: %w", errCtx, err)
		}
		// RowsAffected never fails for this driver's DELETE; count is log-only.
		n, _ := res.RowsAffected()
		slog.Info(after, "count", n)
		return nil
	}

	// Delete posts outside target day (CASCADE handles junction tables),
	// then sweep the rows orphaned by that delete.
	if err := deleteRows("deleting posts outside target day", "deleted posts", "deleting posts",
		"DELETE FROM posts WHERE timestamp < ? OR timestamp >= ?", dayStart, dayEnd); err != nil {
		return err
	}
	if err := deleteRows("cleaning orphaned hashtags", "deleted orphaned hashtags", "deleting orphaned hashtags",
		"DELETE FROM hashtags WHERE id NOT IN (SELECT DISTINCT hashtag_id FROM posts_hashtags)"); err != nil {
		return err
	}
	if err := deleteRows("cleaning orphaned urls", "deleted orphaned urls", "deleting orphaned urls",
		"DELETE FROM urls WHERE id NOT IN (SELECT DISTINCT url_id FROM posts_urls)"); err != nil {
		return err
	}
	// Media has no FK to posts and is not useful in a daily segment.
	if err := deleteRows("clearing media table", "deleted media rows", "deleting media",
		"DELETE FROM media"); err != nil {
		return err
	}
	if err := deleteRows("cleaning orphaned users", "deleted orphaned users", "deleting orphaned users",
		"DELETE FROM users WHERE did NOT IN (SELECT DISTINCT user_did FROM posts)"); err != nil {
		return err
	}

	// Verify remaining count matches what we intended to keep.
	remaining, err := countPosts("SELECT COUNT(*) FROM posts")
	if err != nil {
		return fmt.Errorf("counting remaining posts: %w", err)
	}
	if remaining != keepPosts {
		return fmt.Errorf("post count mismatch after prune: expected %d, got %d", keepPosts, remaining)
	}
	slog.Info("posts remaining after prune", "count", remaining)

	// VACUUM to reclaim space
	slog.Info("running VACUUM (this may take a while)")
	if _, err := db.Exec("VACUUM"); err != nil {
		return fmt.Errorf("vacuum: %w", err)
	}

	info, err := os.Stat(dbPath)
	if err != nil {
		return fmt.Errorf("stat after vacuum: %w", err)
	}
	slog.Info("database size after vacuum", "bytes", info.Size(), "mb", info.Size()/(1024*1024))

	return nil
}
|
||||
121
internal/bsdaily/run.go
Normal file
121
internal/bsdaily/run.go
Normal file
@@ -0,0 +1,121 @@
|
||||
package bsdaily
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
func Run() error {
|
||||
snapshotDir, snapshotDate, err := FindLatestDailySnapshot()
|
||||
if err != nil {
|
||||
return fmt.Errorf("finding latest snapshot: %w", err)
|
||||
}
|
||||
slog.Info("found latest daily snapshot", "dir", snapshotDir, "snapshot_date", snapshotDate.Format("2006-01-02"))
|
||||
|
||||
targetDay := snapshotDate.AddDate(0, 0, -1)
|
||||
slog.Info("target day for extraction", "date", targetDay.Format("2006-01-02"))
|
||||
|
||||
// Check if output already exists
|
||||
outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01"))
|
||||
outputFinal := filepath.Join(outputDir, targetDay.Format("2006-01-02")+".sql.zst")
|
||||
if _, err := os.Stat(outputFinal); err == nil {
|
||||
return fmt.Errorf("output file already exists: %s", outputFinal)
|
||||
}
|
||||
|
||||
// Check disk space
|
||||
if err := CheckFreeSpace(TmpBase, MinTmpFreeBytes, "tmpBase"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := CheckFreeSpace(DailiesBase, MinDailiesFreeBytes, "dailiesBase"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create temp directory
|
||||
tmpDir, err := os.MkdirTemp(TmpBase, "bsarchivesegment-*")
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating temp directory in %s: %w", TmpBase, err)
|
||||
}
|
||||
slog.Info("created temp directory", "path", tmpDir)
|
||||
defer func() {
|
||||
slog.Info("cleaning up temp directory", "path", tmpDir)
|
||||
if err := os.RemoveAll(tmpDir); err != nil {
|
||||
slog.Error("failed to remove temp directory", "path", tmpDir, "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Copy database files from snapshot to temp
|
||||
srcDB := filepath.Join(snapshotDir, DBFilename)
|
||||
srcWAL := filepath.Join(snapshotDir, WALFilename)
|
||||
srcSHM := filepath.Join(snapshotDir, SHMFilename)
|
||||
dstDB := filepath.Join(tmpDir, DBFilename)
|
||||
dstWAL := filepath.Join(tmpDir, WALFilename)
|
||||
dstSHM := filepath.Join(tmpDir, SHMFilename)
|
||||
|
||||
for _, f := range []string{srcDB, srcWAL} {
|
||||
info, err := os.Stat(f)
|
||||
if err != nil {
|
||||
return fmt.Errorf("source file missing: %s: %w", f, err)
|
||||
}
|
||||
if info.Size() == 0 {
|
||||
return fmt.Errorf("source file is empty: %s", f)
|
||||
}
|
||||
slog.Info("source file", "path", f, "size_bytes", info.Size())
|
||||
}
|
||||
|
||||
if err := CopyFile(srcDB, dstDB); err != nil {
|
||||
return fmt.Errorf("copying database: %w", err)
|
||||
}
|
||||
if err := CopyFile(srcWAL, dstWAL); err != nil {
|
||||
return fmt.Errorf("copying WAL: %w", err)
|
||||
}
|
||||
if _, err := os.Stat(srcSHM); err == nil {
|
||||
if err := CopyFile(srcSHM, dstSHM); err != nil {
|
||||
return fmt.Errorf("copying SHM: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := CheckFreeSpace(TmpBase, PostCopyTmpMinFree, "tmpBase (post-copy)"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Prune database to target day only
|
||||
slog.Info("opening database for pruning", "path", dstDB)
|
||||
if err := PruneDatabase(dstDB, targetDay); err != nil {
|
||||
return fmt.Errorf("pruning database: %w", err)
|
||||
}
|
||||
|
||||
// Dump to SQL and compress
|
||||
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
||||
return fmt.Errorf("creating output directory %s: %w", outputDir, err)
|
||||
}
|
||||
|
||||
outputTmp := filepath.Join(outputDir, "."+targetDay.Format("2006-01-02")+".sql.zst.tmp")
|
||||
|
||||
slog.Info("dumping and compressing", "tmp_output", outputTmp)
|
||||
if err := DumpAndCompress(dstDB, outputTmp); err != nil {
|
||||
os.Remove(outputTmp)
|
||||
return fmt.Errorf("dump and compress: %w", err)
|
||||
}
|
||||
|
||||
slog.Info("verifying compressed output")
|
||||
if err := VerifyOutput(outputTmp); err != nil {
|
||||
os.Remove(outputTmp)
|
||||
return fmt.Errorf("verification failed: %w", err)
|
||||
}
|
||||
|
||||
// Atomic rename to final path
|
||||
slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
|
||||
if err := os.Rename(outputTmp, outputFinal); err != nil {
|
||||
return fmt.Errorf("atomic rename: %w", err)
|
||||
}
|
||||
|
||||
info, err := os.Stat(outputFinal)
|
||||
if err != nil {
|
||||
return fmt.Errorf("stat final output: %w", err)
|
||||
}
|
||||
slog.Info("final output written", "path", outputFinal, "size_bytes", info.Size())
|
||||
|
||||
return nil
|
||||
}
|
||||
57
internal/bsdaily/snapshot.go
Normal file
57
internal/bsdaily/snapshot.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package bsdaily
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
func FindLatestDailySnapshot() (dir string, snapshotDate time.Time, err error) {
|
||||
entries, err := os.ReadDir(SnapshotBase)
|
||||
if err != nil {
|
||||
return "", time.Time{}, fmt.Errorf("reading snapshot directory %s: %w", SnapshotBase, err)
|
||||
}
|
||||
|
||||
type snapshot struct {
|
||||
name string
|
||||
date time.Time
|
||||
}
|
||||
|
||||
var snapshots []snapshot
|
||||
for _, e := range entries {
|
||||
if !e.IsDir() {
|
||||
continue
|
||||
}
|
||||
m := snapshotPattern.FindStringSubmatch(e.Name())
|
||||
if m == nil {
|
||||
continue
|
||||
}
|
||||
d, err := time.Parse("2006-01-02", m[1])
|
||||
if err != nil {
|
||||
slog.Warn("skipping snapshot with unparseable date", "name", e.Name(), "error", err)
|
||||
continue
|
||||
}
|
||||
snapshots = append(snapshots, snapshot{name: e.Name(), date: d})
|
||||
}
|
||||
|
||||
if len(snapshots) == 0 {
|
||||
return "", time.Time{}, fmt.Errorf("no daily snapshots found in %s", SnapshotBase)
|
||||
}
|
||||
|
||||
sort.Slice(snapshots, func(i, j int) bool {
|
||||
return snapshots[i].date.After(snapshots[j].date)
|
||||
})
|
||||
|
||||
latest := snapshots[0]
|
||||
dir = filepath.Join(SnapshotBase, latest.name)
|
||||
|
||||
dbPath := filepath.Join(dir, DBFilename)
|
||||
if _, err := os.Stat(dbPath); err != nil {
|
||||
return "", time.Time{}, fmt.Errorf("database not found in snapshot %s: %w", dir, err)
|
||||
}
|
||||
|
||||
return dir, latest.date, nil
|
||||
}
|
||||
63
internal/bsdaily/verify.go
Normal file
63
internal/bsdaily/verify.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package bsdaily
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os/exec"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// VerifyOutput checks that the compressed dump at path is a valid zstd
// stream (zstdmt --test) and that the first lines of its decompressed
// content look like SQL text.
func VerifyOutput(path string) error {
	slog.Info("running zstdmt integrity check")
	var testErrBuf strings.Builder
	test := exec.Command("zstdmt", "--test", path)
	test.Stderr = &testErrBuf
	if err := test.Run(); err != nil {
		return fmt.Errorf("zstdmt --test failed: %w; stderr: %s", err, testErrBuf.String())
	}
	slog.Info("zstdmt integrity check passed")

	// Decompress just the head of the file via zstdcat | head -20.
	slog.Info("verifying SQL content")
	cat := exec.Command("zstdcat", path)
	headCmd := exec.Command("head", "-20")

	catOut, err := cat.StdoutPipe()
	if err != nil {
		return fmt.Errorf("creating zstdcat pipe: %w", err)
	}
	headCmd.Stdin = catOut

	var firstLines strings.Builder
	headCmd.Stdout = &firstLines

	if err := headCmd.Start(); err != nil {
		return fmt.Errorf("starting head: %w", err)
	}
	if err := cat.Start(); err != nil {
		return fmt.Errorf("starting zstdcat: %w", err)
	}

	// head closes pipe early, zstdcat gets SIGPIPE - expected
	_ = headCmd.Wait()
	_ = cat.Wait()

	content := firstLines.String()
	if len(content) == 0 {
		return fmt.Errorf("decompressed content is empty")
	}

	// Accept the output if any recognizable SQL-dump marker appears.
	looksLikeSQL := false
	for _, marker := range []string{"BEGIN TRANSACTION", "CREATE TABLE", "INSERT INTO", "PRAGMA"} {
		if strings.Contains(content, marker) {
			looksLikeSQL = true
			break
		}
	}
	if !looksLikeSQL {
		return fmt.Errorf("decompressed content does not look like SQL; first 200 bytes: %s",
			content[:min(200, len(content))])
	}

	slog.Info("SQL content verification passed")
	return nil
}
|
||||
Reference in New Issue
Block a user