commit e6e85b6e11bde5956d3d09cd03ad5183558f0cb8 Author: sneak Date: Mon Feb 9 00:42:06 2026 -0800 initial diff --git a/cmd/bsdaily/main.go b/cmd/bsdaily/main.go new file mode 100644 index 0000000..a05c68b --- /dev/null +++ b/cmd/bsdaily/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "log/slog" + "os" + + "github.com/sneak/bsarchivesegment/internal/bsdaily" +) + +func main() { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + slog.SetDefault(logger) + + if err := bsdaily.Run(); err != nil { + slog.Error("fatal error", "error", err) + os.Exit(1) + } + slog.Info("completed successfully") +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..829d2f7 --- /dev/null +++ b/go.mod @@ -0,0 +1,20 @@ +module github.com/sneak/bsarchivesegment + +go 1.25.5 + +require ( + golang.org/x/sys v0.41.0 + modernc.org/sqlite v1.44.3 +) + +require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect + modernc.org/libc v1.67.6 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..27915ba --- /dev/null +++ b/go.sum @@ -0,0 +1,53 @@ +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= 
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= +golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= +golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= +modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= +modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.30.1 
h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc= +modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM= +modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA= +modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.1 h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE= +modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.67.6 h1:eVOQvpModVLKOdT+LvBPjdQqfrZq+pC39BygcT+E7OI= +modernc.org/libc v1.67.6/go.mod h1:JAhxUVlolfYDErnwiqaLvUqc8nfb2r6S6slAgZOnaiE= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.44.3 h1:+39JvV/HWMcYslAwRxHb8067w+2zowvFOUrOWIy9PjY= +modernc.org/sqlite v1.44.3/go.mod h1:CzbrU2lSB1DKUusvwGz7rqEKIq+NUd8GWuBBZDs9/nA= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod 
const (
	// SnapshotBase is the directory where ZFS auto-snapshots of the
	// archive filesystem are exposed read-only.
	SnapshotBase = "/srv/berlin.sneak.fs.blueskyarchive/.zfs/snapshot"
	// TmpBase is scratch space for the working copy of the database.
	TmpBase = "/srv/tmp"
	// DailiesBase is where finished per-day compressed dumps are written.
	DailiesBase = "/srv/berlin.sneak.fs.bluesky-dailies"

	// SQLite database file names expected inside each snapshot.
	DBFilename  = "firehose.db"
	WALFilename = "firehose.db-wal"
	SHMFilename = "firehose.db-shm"

	// Free-space thresholds enforced before and after the copy/dump steps.
	MinTmpFreeBytes     = 500 * 1024 * 1024 * 1024 // 500 GB
	MinDailiesFreeBytes = 20 * 1024 * 1024 * 1024  // 20 GB
	PostCopyTmpMinFree  = 100 * 1024 * 1024 * 1024 // 100 GB
)

// snapshotPattern matches zfs-auto-snapshot daily snapshot directory names,
// e.g. "zfs-auto-snap_daily-2026-02-08-0425". Capture group 1 is the date.
var snapshotPattern = regexp.MustCompile(`^zfs-auto-snap_daily-(\d{4}-\d{2}-\d{2})-\d{4}$`)

// CopyFile copies src to dst, verifies the byte count against the source
// size, and fsyncs the destination before reporting success. The
// destination is created (or truncated) with the source's permission bits;
// previously it was always created with the default 0666-before-umask,
// which dropped any restrictive mode on the snapshot's database files.
func CopyFile(src, dst string) (err error) {
	slog.Info("copying file", "src", src, "dst", dst)

	srcFile, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("opening source %s: %w", src, err)
	}
	defer srcFile.Close()

	srcInfo, err := srcFile.Stat()
	if err != nil {
		return fmt.Errorf("stat source %s: %w", src, err)
	}

	// Preserve the source's permission bits on the destination.
	dstFile, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, srcInfo.Mode().Perm())
	if err != nil {
		return fmt.Errorf("creating destination %s: %w", dst, err)
	}
	defer func() {
		// A close error can indicate lost writes; surface it only if no
		// earlier error already explains the failure.
		if cerr := dstFile.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("closing destination %s: %w", dst, cerr)
		}
	}()

	written, err := io.Copy(dstFile, srcFile)
	if err != nil {
		return fmt.Errorf("copying data: %w", err)
	}

	// Guard against truncated copies (e.g. the source shrinking mid-read).
	if written != srcInfo.Size() {
		return fmt.Errorf("short copy: wrote %d bytes, expected %d", written, srcInfo.Size())
	}

	// Flush to stable storage before declaring the copy complete.
	if err := dstFile.Sync(); err != nil {
		return fmt.Errorf("syncing destination %s: %w", dst, err)
	}

	slog.Info("file copied", "dst", dst, "bytes", written)
	return nil
}
copied", "dst", dst, "bytes", written) + return nil +} diff --git a/internal/bsdaily/disk.go b/internal/bsdaily/disk.go new file mode 100644 index 0000000..51245db --- /dev/null +++ b/internal/bsdaily/disk.go @@ -0,0 +1,26 @@ +package bsdaily + +import ( + "fmt" + "log/slog" + + "golang.org/x/sys/unix" +) + +func CheckFreeSpace(path string, minBytes uint64, label string) error { + var stat unix.Statfs_t + if err := unix.Statfs(path, &stat); err != nil { + return fmt.Errorf("statfs %s (%s): %w", path, label, err) + } + free := uint64(stat.Bavail) * uint64(stat.Bsize) + freeGB := float64(free) / (1024 * 1024 * 1024) + minGB := float64(minBytes) / (1024 * 1024 * 1024) + slog.Info("disk space check", "label", label, "path", path, + "free_gb", fmt.Sprintf("%.1f", freeGB), + "required_gb", fmt.Sprintf("%.1f", minGB)) + if free < minBytes { + return fmt.Errorf("insufficient disk space on %s (%s): %.1f GB free, need %.1f GB", + path, label, freeGB, minGB) + } + return nil +} diff --git a/internal/bsdaily/dump.go b/internal/bsdaily/dump.go new file mode 100644 index 0000000..549070d --- /dev/null +++ b/internal/bsdaily/dump.go @@ -0,0 +1,79 @@ +package bsdaily + +import ( + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strings" +) + +func DumpAndCompress(dbPath, outputPath string) (err error) { + for _, tool := range []string{"sqlite3", "zstdmt"} { + if _, err := exec.LookPath(tool); err != nil { + return fmt.Errorf("required tool %q not found in PATH: %w", tool, err) + } + } + + if err := CheckFreeSpace(filepath.Dir(outputPath), MinDailiesFreeBytes, "dailiesBase (pre-dump)"); err != nil { + return err + } + + outFile, err := os.Create(outputPath) + if err != nil { + return fmt.Errorf("creating output file: %w", err) + } + defer func() { + if cerr := outFile.Close(); cerr != nil && err == nil { + err = fmt.Errorf("closing output: %w", cerr) + } + }() + + dumpCmd := exec.Command("sqlite3", dbPath, ".dump") + zstdCmd := exec.Command("zstdmt", "-15") + + pipe, err 
:= dumpCmd.StdoutPipe() + if err != nil { + return fmt.Errorf("creating dump stdout pipe: %w", err) + } + zstdCmd.Stdin = pipe + zstdCmd.Stdout = outFile + + var dumpStderr, zstdStderr strings.Builder + dumpCmd.Stderr = &dumpStderr + zstdCmd.Stderr = &zstdStderr + + slog.Info("starting sqlite3 dump and zstdmt compression") + + if err := zstdCmd.Start(); err != nil { + return fmt.Errorf("starting zstdmt: %w", err) + } + if err := dumpCmd.Start(); err != nil { + return fmt.Errorf("starting sqlite3 dump: %w", err) + } + + if err := dumpCmd.Wait(); err != nil { + return fmt.Errorf("sqlite3 dump failed: %w; stderr: %s", err, dumpStderr.String()) + } + if err := zstdCmd.Wait(); err != nil { + return fmt.Errorf("zstdmt failed: %w; stderr: %s", err, zstdStderr.String()) + } + + if err := outFile.Sync(); err != nil { + return fmt.Errorf("syncing output: %w", err) + } + + info, err := os.Stat(outputPath) + if err != nil { + return fmt.Errorf("stat output: %w", err) + } + slog.Info("compressed output written", "path", outputPath, + "size_bytes", info.Size(), "size_mb", info.Size()/(1024*1024)) + + if info.Size() == 0 { + return fmt.Errorf("compressed output is empty") + } + + return nil +} diff --git a/internal/bsdaily/prune.go b/internal/bsdaily/prune.go new file mode 100644 index 0000000..9225b04 --- /dev/null +++ b/internal/bsdaily/prune.go @@ -0,0 +1,113 @@ +package bsdaily + +import ( + "database/sql" + "fmt" + "log/slog" + "os" + "time" + + _ "modernc.org/sqlite" +) + +func PruneDatabase(dbPath string, targetDay time.Time) error { + dayStart := targetDay.Format("2006-01-02") + "T00:00:00" + dayEnd := targetDay.AddDate(0, 0, 1).Format("2006-01-02") + "T00:00:00" + + slog.Info("pruning database", "keep_from", dayStart, "keep_until", dayEnd) + + db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)") + if err != nil { + return fmt.Errorf("opening database: %w", err) + } + defer db.Close() + + // Verify we can query + var totalPosts int64 
// PruneDatabase deletes everything from the SQLite database at dbPath that
// falls outside targetDay (a UTC-like calendar day, compared as ISO-8601
// text against the posts.timestamp column), then cleans up rows orphaned by
// the deletion and VACUUMs to reclaim space. It refuses to proceed if the
// target day has zero posts, so a bad snapshot cannot yield an empty dump.
//
// The statement order matters: posts are deleted first (junction tables are
// removed via ON DELETE CASCADE), then hashtags/urls/users that no longer
// have referencing rows, then the media table wholesale.
//
// NOTE(review): CASCADE only fires because the DSN enables
// _pragma=foreign_keys(ON); SQLite defaults to OFF. The deletes run outside
// an explicit transaction, so an interruption mid-prune leaves a partially
// pruned working copy — acceptable here because the caller operates on a
// disposable temp copy, but worth confirming.
func PruneDatabase(dbPath string, targetDay time.Time) error {
	// Half-open window [dayStart, dayEnd) in the same lexicographic ISO
	// text format the posts.timestamp column uses.
	dayStart := targetDay.Format("2006-01-02") + "T00:00:00"
	dayEnd := targetDay.AddDate(0, 0, 1).Format("2006-01-02") + "T00:00:00"

	slog.Info("pruning database", "keep_from", dayStart, "keep_until", dayEnd)

	// WAL mode matches how the source DB was written; foreign_keys(ON) is
	// required for the CASCADE deletes below to work.
	db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)")
	if err != nil {
		return fmt.Errorf("opening database: %w", err)
	}
	defer db.Close()

	// Sanity check: verify the copy is readable/queryable at all (this also
	// forces WAL recovery of the copied -wal file).
	var totalPosts int64
	if err := db.QueryRow("SELECT COUNT(*) FROM posts").Scan(&totalPosts); err != nil {
		return fmt.Errorf("counting posts: %w", err)
	}
	slog.Info("total posts before pruning", "count", totalPosts)

	// Count posts inside the target window; used both as a guard and as the
	// expected post-prune row count.
	var keepPosts int64
	if err := db.QueryRow("SELECT COUNT(*) FROM posts WHERE timestamp >= ? AND timestamp < ?",
		dayStart, dayEnd).Scan(&keepPosts); err != nil {
		return fmt.Errorf("counting target posts: %w", err)
	}
	slog.Info("posts in target day", "count", keepPosts, "date", targetDay.Format("2006-01-02"))

	if keepPosts == 0 {
		return fmt.Errorf("no posts found for target day %s - aborting to avoid producing empty output",
			targetDay.Format("2006-01-02"))
	}

	// Delete posts outside target day (CASCADE handles junction tables
	// posts_hashtags and posts_urls).
	slog.Info("deleting posts outside target day")
	result, err := db.Exec("DELETE FROM posts WHERE timestamp < ? OR timestamp >= ?", dayStart, dayEnd)
	if err != nil {
		return fmt.Errorf("deleting posts: %w", err)
	}
	deleted, _ := result.RowsAffected()
	slog.Info("deleted posts", "count", deleted)

	// Clean up hashtags no longer referenced by any surviving post.
	slog.Info("cleaning orphaned hashtags")
	result, err = db.Exec("DELETE FROM hashtags WHERE id NOT IN (SELECT DISTINCT hashtag_id FROM posts_hashtags)")
	if err != nil {
		return fmt.Errorf("deleting orphaned hashtags: %w", err)
	}
	deleted, _ = result.RowsAffected()
	slog.Info("deleted orphaned hashtags", "count", deleted)

	// Same for urls.
	slog.Info("cleaning orphaned urls")
	result, err = db.Exec("DELETE FROM urls WHERE id NOT IN (SELECT DISTINCT url_id FROM posts_urls)")
	if err != nil {
		return fmt.Errorf("deleting orphaned urls: %w", err)
	}
	deleted, _ = result.RowsAffected()
	slog.Info("deleted orphaned urls", "count", deleted)

	// Clear media entirely (no FK relationship to posts; not useful in a
	// daily segment).
	slog.Info("clearing media table")
	result, err = db.Exec("DELETE FROM media")
	if err != nil {
		return fmt.Errorf("deleting media: %w", err)
	}
	deleted, _ = result.RowsAffected()
	slog.Info("deleted media rows", "count", deleted)

	// Remove users whose posts were all deleted.
	slog.Info("cleaning orphaned users")
	result, err = db.Exec("DELETE FROM users WHERE did NOT IN (SELECT DISTINCT user_did FROM posts)")
	if err != nil {
		return fmt.Errorf("deleting orphaned users: %w", err)
	}
	deleted, _ = result.RowsAffected()
	slog.Info("deleted orphaned users", "count", deleted)

	// Invariant check: exactly the counted in-window posts must remain.
	var remaining int64
	if err := db.QueryRow("SELECT COUNT(*) FROM posts").Scan(&remaining); err != nil {
		return fmt.Errorf("counting remaining posts: %w", err)
	}
	if remaining != keepPosts {
		return fmt.Errorf("post count mismatch after prune: expected %d, got %d", keepPosts, remaining)
	}
	slog.Info("posts remaining after prune", "count", remaining)

	// VACUUM rewrites the file to reclaim the space freed by the deletes,
	// shrinking the file before it is dumped.
	slog.Info("running VACUUM (this may take a while)")
	if _, err := db.Exec("VACUUM"); err != nil {
		return fmt.Errorf("vacuum: %w", err)
	}

	info, err := os.Stat(dbPath)
	if err != nil {
		return fmt.Errorf("stat after vacuum: %w", err)
	}
	slog.Info("database size after vacuum", "bytes", info.Size(), "mb", info.Size()/(1024*1024))

	return nil
}
// Run executes the full daily-extraction pipeline:
//
//  1. locate the newest ZFS daily snapshot of the firehose database,
//  2. copy its SQLite files to scratch space (after free-space checks),
//  3. prune the copy down to the day BEFORE the snapshot date (the last
//     fully captured day),
//  4. dump + zstd-compress it into DailiesBase/YYYY-MM/YYYY-MM-DD.sql.zst
//     via a hidden .tmp file, verify it, and atomically rename into place.
//
// It fails fast if the final output already exists, making the job
// idempotent when run repeatedly (e.g. from cron).
func Run() error {
	snapshotDir, snapshotDate, err := FindLatestDailySnapshot()
	if err != nil {
		return fmt.Errorf("finding latest snapshot: %w", err)
	}
	slog.Info("found latest daily snapshot", "dir", snapshotDir, "snapshot_date", snapshotDate.Format("2006-01-02"))

	// The snapshot taken on day N contains a complete day N-1, so extract
	// the previous calendar day.
	targetDay := snapshotDate.AddDate(0, 0, -1)
	slog.Info("target day for extraction", "date", targetDay.Format("2006-01-02"))

	// Bail out early if this day's output was already produced.
	outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01"))
	outputFinal := filepath.Join(outputDir, targetDay.Format("2006-01-02")+".sql.zst")
	if _, err := os.Stat(outputFinal); err == nil {
		return fmt.Errorf("output file already exists: %s", outputFinal)
	}

	// Free-space preflight on both scratch and output filesystems.
	if err := CheckFreeSpace(TmpBase, MinTmpFreeBytes, "tmpBase"); err != nil {
		return err
	}
	if err := CheckFreeSpace(DailiesBase, MinDailiesFreeBytes, "dailiesBase"); err != nil {
		return err
	}

	// Scratch directory for the working copy; always removed on exit, even
	// on failure, so a crashed run does not fill the scratch filesystem.
	tmpDir, err := os.MkdirTemp(TmpBase, "bsarchivesegment-*")
	if err != nil {
		return fmt.Errorf("creating temp directory in %s: %w", TmpBase, err)
	}
	slog.Info("created temp directory", "path", tmpDir)
	defer func() {
		slog.Info("cleaning up temp directory", "path", tmpDir)
		if err := os.RemoveAll(tmpDir); err != nil {
			slog.Error("failed to remove temp directory", "path", tmpDir, "error", err)
		}
	}()

	// Copy database files from the (read-only) snapshot to scratch. The
	// -wal file is required for a consistent view; -shm is optional.
	// NOTE(review): copying a stale -shm alongside the DB is generally
	// unnecessary (SQLite rebuilds it) — confirm it is harmless here.
	srcDB := filepath.Join(snapshotDir, DBFilename)
	srcWAL := filepath.Join(snapshotDir, WALFilename)
	srcSHM := filepath.Join(snapshotDir, SHMFilename)
	dstDB := filepath.Join(tmpDir, DBFilename)
	dstWAL := filepath.Join(tmpDir, WALFilename)
	dstSHM := filepath.Join(tmpDir, SHMFilename)

	// Both the main DB and WAL must exist and be non-empty before copying.
	for _, f := range []string{srcDB, srcWAL} {
		info, err := os.Stat(f)
		if err != nil {
			return fmt.Errorf("source file missing: %s: %w", f, err)
		}
		if info.Size() == 0 {
			return fmt.Errorf("source file is empty: %s", f)
		}
		slog.Info("source file", "path", f, "size_bytes", info.Size())
	}

	if err := CopyFile(srcDB, dstDB); err != nil {
		return fmt.Errorf("copying database: %w", err)
	}
	if err := CopyFile(srcWAL, dstWAL); err != nil {
		return fmt.Errorf("copying WAL: %w", err)
	}
	if _, err := os.Stat(srcSHM); err == nil {
		if err := CopyFile(srcSHM, dstSHM); err != nil {
			return fmt.Errorf("copying SHM: %w", err)
		}
	}

	// Re-check scratch space after the copy: VACUUM during pruning needs
	// additional headroom.
	if err := CheckFreeSpace(TmpBase, PostCopyTmpMinFree, "tmpBase (post-copy)"); err != nil {
		return err
	}

	// Prune the working copy down to the target day only.
	slog.Info("opening database for pruning", "path", dstDB)
	if err := PruneDatabase(dstDB, targetDay); err != nil {
		return fmt.Errorf("pruning database: %w", err)
	}

	// Dump to SQL and compress into a hidden .tmp file so partially written
	// output is never visible under the final name.
	if err := os.MkdirAll(outputDir, 0755); err != nil {
		return fmt.Errorf("creating output directory %s: %w", outputDir, err)
	}

	outputTmp := filepath.Join(outputDir, "."+targetDay.Format("2006-01-02")+".sql.zst.tmp")

	slog.Info("dumping and compressing", "tmp_output", outputTmp)
	if err := DumpAndCompress(dstDB, outputTmp); err != nil {
		// Best-effort cleanup of the partial file; the error we report is
		// the dump failure itself.
		os.Remove(outputTmp)
		return fmt.Errorf("dump and compress: %w", err)
	}

	slog.Info("verifying compressed output")
	if err := VerifyOutput(outputTmp); err != nil {
		os.Remove(outputTmp)
		return fmt.Errorf("verification failed: %w", err)
	}

	// Atomic rename publishes the verified output under its final name.
	slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
	if err := os.Rename(outputTmp, outputFinal); err != nil {
		return fmt.Errorf("atomic rename: %w", err)
	}

	info, err := os.Stat(outputFinal)
	if err != nil {
		return fmt.Errorf("stat final output: %w", err)
	}
	slog.Info("final output written", "path", outputFinal, "size_bytes", info.Size())

	return nil
}
== nil { + continue + } + d, err := time.Parse("2006-01-02", m[1]) + if err != nil { + slog.Warn("skipping snapshot with unparseable date", "name", e.Name(), "error", err) + continue + } + snapshots = append(snapshots, snapshot{name: e.Name(), date: d}) + } + + if len(snapshots) == 0 { + return "", time.Time{}, fmt.Errorf("no daily snapshots found in %s", SnapshotBase) + } + + sort.Slice(snapshots, func(i, j int) bool { + return snapshots[i].date.After(snapshots[j].date) + }) + + latest := snapshots[0] + dir = filepath.Join(SnapshotBase, latest.name) + + dbPath := filepath.Join(dir, DBFilename) + if _, err := os.Stat(dbPath); err != nil { + return "", time.Time{}, fmt.Errorf("database not found in snapshot %s: %w", dir, err) + } + + return dir, latest.date, nil +} diff --git a/internal/bsdaily/verify.go b/internal/bsdaily/verify.go new file mode 100644 index 0000000..bcac24b --- /dev/null +++ b/internal/bsdaily/verify.go @@ -0,0 +1,63 @@ +package bsdaily + +import ( + "fmt" + "log/slog" + "os/exec" + "strings" +) + +func VerifyOutput(path string) error { + slog.Info("running zstdmt integrity check") + testCmd := exec.Command("zstdmt", "--test", path) + var testStderr strings.Builder + testCmd.Stderr = &testStderr + if err := testCmd.Run(); err != nil { + return fmt.Errorf("zstdmt --test failed: %w; stderr: %s", err, testStderr.String()) + } + slog.Info("zstdmt integrity check passed") + + slog.Info("verifying SQL content") + catCmd := exec.Command("zstdcat", path) + headCmd := exec.Command("head", "-20") + + pipe, err := catCmd.StdoutPipe() + if err != nil { + return fmt.Errorf("creating zstdcat pipe: %w", err) + } + headCmd.Stdin = pipe + + var headOut strings.Builder + headCmd.Stdout = &headOut + + if err := headCmd.Start(); err != nil { + return fmt.Errorf("starting head: %w", err) + } + if err := catCmd.Start(); err != nil { + return fmt.Errorf("starting zstdcat: %w", err) + } + + // head closes pipe early, zstdcat gets SIGPIPE - expected + _ = headCmd.Wait() + 
// VerifyOutput validates the compressed dump at path in two stages: a full
// zstd integrity check, then a peek at the first 20 decompressed lines to
// confirm the content looks like SQL text.
func VerifyOutput(path string) error {
	slog.Info("running zstdmt integrity check")
	integrity := exec.Command("zstdmt", "--test", path)
	var integrityStderr strings.Builder
	integrity.Stderr = &integrityStderr
	if err := integrity.Run(); err != nil {
		return fmt.Errorf("zstdmt --test failed: %w; stderr: %s", err, integrityStderr.String())
	}
	slog.Info("zstdmt integrity check passed")

	slog.Info("verifying SQL content")
	decompress := exec.Command("zstdcat", path)
	truncate := exec.Command("head", "-20")

	stdout, err := decompress.StdoutPipe()
	if err != nil {
		return fmt.Errorf("creating zstdcat pipe: %w", err)
	}
	truncate.Stdin = stdout

	var firstLines strings.Builder
	truncate.Stdout = &firstLines

	if err := truncate.Start(); err != nil {
		return fmt.Errorf("starting head: %w", err)
	}
	if err := decompress.Start(); err != nil {
		return fmt.Errorf("starting zstdcat: %w", err)
	}

	// head exits after 20 lines and closes the pipe; zstdcat then dies
	// with SIGPIPE, which is expected here, so both exit statuses are
	// deliberately ignored.
	_ = truncate.Wait()
	_ = decompress.Wait()

	content := firstLines.String()
	if content == "" {
		return fmt.Errorf("decompressed content is empty")
	}

	// Any one of these markers is enough to accept the output as SQL.
	sawMarker := false
	for _, marker := range []string{"BEGIN TRANSACTION", "CREATE TABLE", "INSERT INTO", "PRAGMA"} {
		if strings.Contains(content, marker) {
			sawMarker = true
			break
		}
	}
	if !sawMarker {
		return fmt.Errorf("decompressed content does not look like SQL; first 200 bytes: %s",
			content[:min(200, len(content))])
	}

	slog.Info("SQL content verification passed")
	return nil
}
    -- (tail of the media table definition, which begins earlier in the dump)
    downloaded_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    error TEXT
    );
CREATE INDEX idx_media_url_hash ON media(url_hash);
CREATE INDEX idx_media_content_hash ON media(content_hash);
CREATE INDEX idx_media_status ON media(status);
-- Unique hashtags with usage bookkeeping.
CREATE TABLE hashtags (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    tag TEXT NOT NULL UNIQUE,
    first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    use_count INTEGER DEFAULT 1
    );
CREATE INDEX idx_hashtags_tag ON hashtags(tag);
-- Junction table posts<->hashtags; CASCADE lets deleting a post clean up
-- its hashtag links automatically (requires PRAGMA foreign_keys=ON).
CREATE TABLE posts_hashtags (
    post_id INTEGER NOT NULL,
    hashtag_id INTEGER NOT NULL,
    PRIMARY KEY (post_id, hashtag_id),
    FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE,
    FOREIGN KEY (hashtag_id) REFERENCES hashtags(id) ON DELETE CASCADE
    );
CREATE INDEX idx_posts_hashtags_post_id ON posts_hashtags(post_id);
CREATE INDEX idx_posts_hashtags_hashtag_id ON posts_hashtags(hashtag_id);
-- Unique URLs extracted from posts, with the bare domain denormalized for
-- per-domain queries.
CREATE TABLE urls (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    url TEXT NOT NULL UNIQUE,
    domain TEXT NOT NULL,
    first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    use_count INTEGER DEFAULT 1
    );
CREATE INDEX idx_urls_url ON urls(url);
CREATE INDEX idx_urls_domain ON urls(domain);
-- Junction table posts<->urls, mirroring posts_hashtags.
CREATE TABLE posts_urls (
    post_id INTEGER NOT NULL,
    url_id INTEGER NOT NULL,
    PRIMARY KEY (post_id, url_id),
    FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE,
    FOREIGN KEY (url_id) REFERENCES urls(id) ON DELETE CASCADE
    );
CREATE INDEX idx_posts_urls_post_id ON posts_urls(post_id);
CREATE INDEX idx_posts_urls_url_id ON posts_urls(url_id);