date range

This commit is contained in:
Jeffrey Paul 2026-02-11 03:01:48 -08:00
parent 3c890f0b83
commit 2d46dc70a9
5 changed files with 255 additions and 203 deletions

View File

@ -17,22 +17,56 @@ func main() {
slog.SetDefault(logger) slog.SetDefault(logger)
var dateFlag string var dateFlag string
var fromFlag string
var toFlag string
rootCmd := &cobra.Command{ rootCmd := &cobra.Command{
Use: "bsdaily", Use: "bsdaily",
Short: "Extract a single day's data from the latest daily snapshot", Short: "Extract a single day's data from the latest daily snapshot",
SilenceUsage: true, SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
var targetDate *time.Time hasDate := dateFlag != ""
if dateFlag != "" { hasFrom := fromFlag != ""
t, err := time.Parse("2006-01-02", dateFlag) hasTo := toFlag != ""
if err != nil {
return fmt.Errorf("invalid date %q (expected YYYY-MM-DD): %w", dateFlag, err) // Validate mutual exclusivity
if hasDate && (hasFrom || hasTo) {
return fmt.Errorf("--date and --from/--to are mutually exclusive")
}
if hasFrom != hasTo {
if hasFrom {
return fmt.Errorf("--from requires --to")
} }
targetDate = &t return fmt.Errorf("--to requires --from")
} }
if err := bsdaily.Run(targetDate); err != nil { var targetDates []time.Time
if hasDate {
t, err := time.Parse("2006-01-02", dateFlag)
if err != nil {
return fmt.Errorf("invalid --date %q (expected YYYY-MM-DD): %w", dateFlag, err)
}
targetDates = []time.Time{t}
} else if hasFrom {
from, err := time.Parse("2006-01-02", fromFlag)
if err != nil {
return fmt.Errorf("invalid --from %q (expected YYYY-MM-DD): %w", fromFlag, err)
}
to, err := time.Parse("2006-01-02", toFlag)
if err != nil {
return fmt.Errorf("invalid --to %q (expected YYYY-MM-DD): %w", toFlag, err)
}
if from.After(to) {
return fmt.Errorf("--from %s is after --to %s", fromFlag, toFlag)
}
for d := from; !d.After(to); d = d.AddDate(0, 0, 1) {
targetDates = append(targetDates, d)
}
}
// else: targetDates remains nil → Run() defaults to snapshot date minus one
if err := bsdaily.Run(targetDates); err != nil {
return err return err
} }
slog.Info("completed successfully") slog.Info("completed successfully")
@ -41,6 +75,8 @@ func main() {
} }
rootCmd.Flags().StringVarP(&dateFlag, "date", "d", "", "target date to extract (YYYY-MM-DD); defaults to snapshot date minus one day") rootCmd.Flags().StringVarP(&dateFlag, "date", "d", "", "target date to extract (YYYY-MM-DD); defaults to snapshot date minus one day")
rootCmd.Flags().StringVar(&fromFlag, "from", "", "start of date range to extract (YYYY-MM-DD, inclusive); use with --to")
rootCmd.Flags().StringVar(&toFlag, "to", "", "end of date range to extract (YYYY-MM-DD, inclusive); use with --from")
if err := rootCmd.Execute(); err != nil { if err := rootCmd.Execute(); err != nil {
os.Exit(1) os.Exit(1)

View File

@ -11,8 +11,7 @@ const (
SHMFilename = "firehose.db-shm" SHMFilename = "firehose.db-shm"
MinTmpFreeBytes = 500 * 1024 * 1024 * 1024 // 500 GB MinTmpFreeBytes = 500 * 1024 * 1024 * 1024 // 500 GB
MinDailiesFreeBytes = 20 * 1024 * 1024 * 1024 // 20 GB MinDailiesFreeBytes = 20 * 1024 * 1024 * 1024 // 20 GB
PostCopyTmpMinFree = 100 * 1024 * 1024 * 1024 // 100 GB
) )
var snapshotPattern = regexp.MustCompile(`^zfs-auto-snap_daily-(\d{4}-\d{2}-\d{2})-\d{4}$`) var snapshotPattern = regexp.MustCompile(`^zfs-auto-snap_daily-(\d{4}-\d{2}-\d{2})-\d{4}$`)

139
internal/bsdaily/extract.go Normal file
View File

@ -0,0 +1,139 @@
package bsdaily
import (
"database/sql"
"errors"
"fmt"
"log/slog"
"time"
_ "modernc.org/sqlite"
)
// ErrNoPosts is the sentinel wrapped by ExtractDay when the source database
// contains no posts for the requested day; callers detect it with errors.Is
// to skip empty days instead of treating them as hard failures.
var ErrNoPosts = errors.New("no posts found for target day")

// ExtractDay opens a new empty database at dstDBPath, attaches srcDBPath,
// and copies only the target day's data into it. This is much faster than
// pruning a full copy because it only reads/writes the small slice of data
// being kept.
//
// The returned error wraps ErrNoPosts when the source holds no posts for
// targetDay, so the caller can distinguish "empty day" from a real failure.
func ExtractDay(srcDBPath, dstDBPath string, targetDay time.Time) error {
	// Timestamps are compared as ISO-8601 text, so a half-open
	// [dayStart, dayEnd) lexical range selects exactly one calendar day.
	dayStart := targetDay.Format("2006-01-02") + "T00:00:00"
	dayEnd := targetDay.AddDate(0, 0, 1).Format("2006-01-02") + "T00:00:00"
	slog.Info("extracting day", "from", dayStart, "until", dayEnd)

	// journal_mode=MEMORY and synchronous=OFF are safe here: the
	// destination is a throwaway intermediate, rebuilt on any failure.
	db, err := sql.Open("sqlite", dstDBPath+"?_pragma=journal_mode(MEMORY)&_pragma=synchronous(OFF)&_pragma=cache_size(-200000)&_pragma=foreign_keys(OFF)")
	if err != nil {
		return fmt.Errorf("opening destination database: %w", err)
	}
	defer db.Close()

	// FIX: ATTACH is per-connection state, and database/sql pools
	// connections. Without capping the pool at one, a later Exec/Query may
	// land on a freshly opened connection where "src" is not attached and
	// fail with "no such table: src.posts". Pin everything to one
	// connection so the ATTACH below applies to every statement.
	db.SetMaxOpenConns(1)

	// Attach source database
	if _, err := db.Exec("ATTACH DATABASE ? AS src", srcDBPath); err != nil {
		return fmt.Errorf("attaching source database: %w", err)
	}

	// Copy table DDL from source. Collect the statements first, then
	// execute: the rows cursor must be drained before Exec can reuse the
	// single connection.
	slog.Info("copying table DDL from source")
	rows, err := db.Query("SELECT sql FROM src.sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name")
	if err != nil {
		return fmt.Errorf("reading source schema: %w", err)
	}
	defer rows.Close()
	var ddlStatements []string
	for rows.Next() {
		var ddl string
		if err := rows.Scan(&ddl); err != nil {
			return fmt.Errorf("scanning DDL: %w", err)
		}
		ddlStatements = append(ddlStatements, ddl)
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("iterating DDL rows: %w", err)
	}
	for _, ddl := range ddlStatements {
		if _, err := db.Exec(ddl); err != nil {
			return fmt.Errorf("creating table: %w\nDDL: %s", err, ddl)
		}
	}

	// Insert the target day's posts first; every other table below is
	// selected relative to the posts that landed in the destination.
	slog.Info("inserting posts for target day")
	result, err := db.Exec("INSERT INTO posts SELECT * FROM src.posts WHERE timestamp >= ? AND timestamp < ?", dayStart, dayEnd)
	if err != nil {
		return fmt.Errorf("inserting posts: %w", err)
	}
	// The empty-day abort and the final verification both depend on this
	// count, so do not silently discard its error.
	postCount, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("counting inserted posts: %w", err)
	}
	slog.Info("inserted posts", "count", postCount)
	if postCount == 0 {
		return fmt.Errorf("%w %s - aborting to avoid producing empty output",
			ErrNoPosts, targetDay.Format("2006-01-02"))
	}

	// Junction tables are filtered by the copied post ids; lookup tables
	// (hashtags, urls, users) are then filtered by the copied junction rows.
	slog.Info("inserting junction and lookup tables")
	if _, err := db.Exec("INSERT INTO posts_hashtags SELECT * FROM src.posts_hashtags WHERE post_id IN (SELECT id FROM posts)"); err != nil {
		return fmt.Errorf("inserting posts_hashtags: %w", err)
	}
	if _, err := db.Exec("INSERT INTO posts_urls SELECT * FROM src.posts_urls WHERE post_id IN (SELECT id FROM posts)"); err != nil {
		return fmt.Errorf("inserting posts_urls: %w", err)
	}
	if _, err := db.Exec("INSERT INTO hashtags SELECT * FROM src.hashtags WHERE id IN (SELECT hashtag_id FROM posts_hashtags)"); err != nil {
		return fmt.Errorf("inserting hashtags: %w", err)
	}
	if _, err := db.Exec("INSERT INTO urls SELECT * FROM src.urls WHERE id IN (SELECT url_id FROM posts_urls)"); err != nil {
		return fmt.Errorf("inserting urls: %w", err)
	}
	if _, err := db.Exec("INSERT INTO users SELECT * FROM src.users WHERE did IN (SELECT user_did FROM posts)"); err != nil {
		return fmt.Errorf("inserting users: %w", err)
	}

	// Create indexes after bulk insert for speed
	slog.Info("creating indexes")
	idxRows, err := db.Query("SELECT sql FROM src.sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%' AND sql IS NOT NULL ORDER BY name")
	if err != nil {
		return fmt.Errorf("reading source indexes: %w", err)
	}
	defer idxRows.Close()
	var idxStatements []string
	for idxRows.Next() {
		var idxSQL string
		if err := idxRows.Scan(&idxSQL); err != nil {
			return fmt.Errorf("scanning index DDL: %w", err)
		}
		idxStatements = append(idxStatements, idxSQL)
	}
	if err := idxRows.Err(); err != nil {
		return fmt.Errorf("iterating index rows: %w", err)
	}
	for _, idxSQL := range idxStatements {
		if _, err := db.Exec(idxSQL); err != nil {
			return fmt.Errorf("creating index: %w\nDDL: %s", err, idxSQL)
		}
	}

	// Detach source
	if _, err := db.Exec("DETACH DATABASE src"); err != nil {
		return fmt.Errorf("detaching source database: %w", err)
	}

	// Verify post count independently of the driver's RowsAffected.
	var verifyCount int64
	if err := db.QueryRow("SELECT COUNT(*) FROM posts").Scan(&verifyCount); err != nil {
		return fmt.Errorf("verifying post count: %w", err)
	}
	if verifyCount != postCount {
		return fmt.Errorf("post count mismatch: inserted %d but found %d", postCount, verifyCount)
	}
	slog.Info("extraction complete", "posts", verifyCount)
	return nil
}

View File

@ -1,143 +0,0 @@
package bsdaily
import (
"database/sql"
"fmt"
"log/slog"
"os"
"time"
_ "modernc.org/sqlite"
)
// PruneDatabase deletes, in place, every row outside the single calendar
// day [targetDay, targetDay+1) from the working copy at dbPath, then
// VACUUMs to reclaim the freed space. The copy is disposable, which is why
// durability pragmas are disabled.
func PruneDatabase(dbPath string, targetDay time.Time) error {
	dayStart := targetDay.Format("2006-01-02") + "T00:00:00"
	dayEnd := targetDay.AddDate(0, 0, 1).Format("2006-01-02") + "T00:00:00"
	slog.Info("pruning database", "keep_from", dayStart, "keep_until", dayEnd)

	// No foreign_keys — we handle junction tables manually to avoid per-row
	// CASCADE overhead. journal_mode=MEMORY and synchronous=OFF are safe
	// because this is a throwaway copy.
	db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(MEMORY)&_pragma=synchronous(OFF)&_pragma=cache_size(-200000)")
	if err != nil {
		return fmt.Errorf("opening database: %w", err)
	}
	defer db.Close()

	// Sanity-check readability before mutating anything.
	var total int64
	if err := db.QueryRow("SELECT COUNT(*) FROM posts").Scan(&total); err != nil {
		return fmt.Errorf("counting posts: %w", err)
	}
	slog.Info("total posts before pruning", "count", total)

	// Count what will survive; refuse to proceed if the day is empty so we
	// never emit a hollow output file.
	var kept int64
	if err := db.QueryRow("SELECT COUNT(*) FROM posts WHERE timestamp >= ? AND timestamp < ?",
		dayStart, dayEnd).Scan(&kept); err != nil {
		return fmt.Errorf("counting target posts: %w", err)
	}
	slog.Info("posts in target day", "count", kept, "date", targetDay.Format("2006-01-02"))
	if kept == 0 {
		return fmt.Errorf("no posts found for target day %s - aborting to avoid producing empty output",
			targetDay.Format("2006-01-02"))
	}

	tx, err := db.Begin()
	if err != nil {
		return fmt.Errorf("beginning transaction: %w", err)
	}
	// No-op after a successful Commit; undoes everything on early return.
	defer tx.Rollback()

	// Order matters: junction rows first (nothing cascades with FKs off),
	// then posts, then orphaned lookups.
	slog.Info("deleting junction table rows for non-target posts")
	keepIDs := "SELECT id FROM posts WHERE timestamp >= ? AND timestamp < ?"
	res, err := tx.Exec("DELETE FROM posts_hashtags WHERE post_id NOT IN ("+keepIDs+")", dayStart, dayEnd)
	if err != nil {
		return fmt.Errorf("deleting posts_hashtags: %w", err)
	}
	n, _ := res.RowsAffected()
	slog.Info("deleted posts_hashtags rows", "count", n)

	res, err = tx.Exec("DELETE FROM posts_urls WHERE post_id NOT IN ("+keepIDs+")", dayStart, dayEnd)
	if err != nil {
		return fmt.Errorf("deleting posts_urls: %w", err)
	}
	n, _ = res.RowsAffected()
	slog.Info("deleted posts_urls rows", "count", n)

	slog.Info("deleting posts outside target day")
	res, err = tx.Exec("DELETE FROM posts WHERE timestamp < ? OR timestamp >= ?", dayStart, dayEnd)
	if err != nil {
		return fmt.Errorf("deleting posts: %w", err)
	}
	n, _ = res.RowsAffected()
	slog.Info("deleted posts", "count", n)

	slog.Info("cleaning orphaned hashtags")
	res, err = tx.Exec("DELETE FROM hashtags WHERE id NOT IN (SELECT DISTINCT hashtag_id FROM posts_hashtags)")
	if err != nil {
		return fmt.Errorf("deleting orphaned hashtags: %w", err)
	}
	n, _ = res.RowsAffected()
	slog.Info("deleted orphaned hashtags", "count", n)

	slog.Info("cleaning orphaned urls")
	res, err = tx.Exec("DELETE FROM urls WHERE id NOT IN (SELECT DISTINCT url_id FROM posts_urls)")
	if err != nil {
		return fmt.Errorf("deleting orphaned urls: %w", err)
	}
	n, _ = res.RowsAffected()
	slog.Info("deleted orphaned urls", "count", n)

	// Media has no FK to posts, so it cannot be pruned selectively until
	// the post_id migration lands — drop it wholesale.
	slog.Info("clearing media table")
	res, err = tx.Exec("DELETE FROM media")
	if err != nil {
		return fmt.Errorf("deleting media: %w", err)
	}
	n, _ = res.RowsAffected()
	slog.Info("deleted media rows", "count", n)

	slog.Info("cleaning orphaned users")
	res, err = tx.Exec("DELETE FROM users WHERE did NOT IN (SELECT DISTINCT user_did FROM posts)")
	if err != nil {
		return fmt.Errorf("deleting orphaned users: %w", err)
	}
	n, _ = res.RowsAffected()
	slog.Info("deleted orphaned users", "count", n)

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("committing transaction: %w", err)
	}

	// The survivor count must match the pre-transaction target count.
	var remaining int64
	if err := db.QueryRow("SELECT COUNT(*) FROM posts").Scan(&remaining); err != nil {
		return fmt.Errorf("counting remaining posts: %w", err)
	}
	if remaining != kept {
		return fmt.Errorf("post count mismatch after prune: expected %d, got %d", kept, remaining)
	}
	slog.Info("posts remaining after prune", "count", remaining)

	// VACUUM to reclaim space
	slog.Info("running VACUUM")
	if _, err := db.Exec("VACUUM"); err != nil {
		return fmt.Errorf("vacuum: %w", err)
	}
	info, err := os.Stat(dbPath)
	if err != nil {
		return fmt.Errorf("stat after vacuum: %w", err)
	}
	slog.Info("database size after vacuum", "bytes", info.Size(), "mb", info.Size()/(1024*1024))
	return nil
}

View File

@ -1,6 +1,7 @@
package bsdaily package bsdaily
import ( import (
"errors"
"fmt" "fmt"
"log/slog" "log/slog"
"os" "os"
@ -8,27 +9,20 @@ import (
"time" "time"
) )
func Run(targetDate *time.Time) error { func Run(targetDates []time.Time) error {
snapshotDir, snapshotDate, err := FindLatestDailySnapshot() snapshotDir, snapshotDate, err := FindLatestDailySnapshot()
if err != nil { if err != nil {
return fmt.Errorf("finding latest snapshot: %w", err) return fmt.Errorf("finding latest snapshot: %w", err)
} }
slog.Info("found latest daily snapshot", "dir", snapshotDir, "snapshot_date", snapshotDate.Format("2006-01-02")) slog.Info("found latest daily snapshot", "dir", snapshotDir, "snapshot_date", snapshotDate.Format("2006-01-02"))
var targetDay time.Time if len(targetDates) == 0 {
if targetDate != nil { targetDates = []time.Time{snapshotDate.AddDate(0, 0, -1)}
targetDay = *targetDate
} else {
targetDay = snapshotDate.AddDate(0, 0, -1)
} }
slog.Info("target day for extraction", "date", targetDay.Format("2006-01-02"))
// Check if output already exists slog.Info("target days for extraction", "count", len(targetDates),
outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01")) "first", targetDates[0].Format("2006-01-02"),
outputFinal := filepath.Join(outputDir, targetDay.Format("2006-01-02")+".sql.zst") "last", targetDates[len(targetDates)-1].Format("2006-01-02"))
if _, err := os.Stat(outputFinal); err == nil {
return fmt.Errorf("output file already exists: %s", outputFinal)
}
// Check disk space // Check disk space
if err := CheckFreeSpace(TmpBase, MinTmpFreeBytes, "tmpBase"); err != nil { if err := CheckFreeSpace(TmpBase, MinTmpFreeBytes, "tmpBase"); err != nil {
@ -82,46 +76,73 @@ func Run(targetDate *time.Time) error {
} }
} }
if err := CheckFreeSpace(TmpBase, PostCopyTmpMinFree, "tmpBase (post-copy)"); err != nil { // Process each day
return err processed := 0
skipped := 0
for _, targetDay := range targetDates {
dayStr := targetDay.Format("2006-01-02")
slog.Info("processing day", "date", dayStr)
// Check if output already exists
outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01"))
outputFinal := filepath.Join(outputDir, dayStr+".sql.zst")
if _, err := os.Stat(outputFinal); err == nil {
slog.Info("output already exists, skipping", "path", outputFinal)
skipped++
continue
}
// Extract target day into a per-day database
extractedDB := filepath.Join(tmpDir, "extracted-"+dayStr+".db")
slog.Info("extracting target day", "src", dstDB, "dst", extractedDB)
if err := ExtractDay(dstDB, extractedDB, targetDay); err != nil {
if errors.Is(err, ErrNoPosts) {
slog.Warn("no posts found, skipping day", "date", dayStr)
os.Remove(extractedDB)
skipped++
continue
}
return fmt.Errorf("extracting day %s: %w", dayStr, err)
}
// Dump to SQL and compress
if err := os.MkdirAll(outputDir, 0755); err != nil {
return fmt.Errorf("creating output directory %s: %w", outputDir, err)
}
outputTmp := filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp")
slog.Info("dumping and compressing", "tmp_output", outputTmp)
if err := DumpAndCompress(extractedDB, outputTmp); err != nil {
os.Remove(outputTmp)
return fmt.Errorf("dump and compress for %s: %w", dayStr, err)
}
slog.Info("verifying compressed output")
if err := VerifyOutput(outputTmp); err != nil {
os.Remove(outputTmp)
return fmt.Errorf("verification failed for %s: %w", dayStr, err)
}
// Atomic rename to final path
slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
if err := os.Rename(outputTmp, outputFinal); err != nil {
return fmt.Errorf("atomic rename for %s: %w", dayStr, err)
}
info, err := os.Stat(outputFinal)
if err != nil {
return fmt.Errorf("stat final output: %w", err)
}
slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size())
// Remove extracted DB to reclaim space
os.Remove(extractedDB)
processed++
} }
// Prune database to target day only slog.Info("run summary", "processed", processed, "skipped", skipped, "total", len(targetDates))
slog.Info("opening database for pruning", "path", dstDB)
if err := PruneDatabase(dstDB, targetDay); err != nil {
return fmt.Errorf("pruning database: %w", err)
}
// Dump to SQL and compress
if err := os.MkdirAll(outputDir, 0755); err != nil {
return fmt.Errorf("creating output directory %s: %w", outputDir, err)
}
outputTmp := filepath.Join(outputDir, "."+targetDay.Format("2006-01-02")+".sql.zst.tmp")
slog.Info("dumping and compressing", "tmp_output", outputTmp)
if err := DumpAndCompress(dstDB, outputTmp); err != nil {
os.Remove(outputTmp)
return fmt.Errorf("dump and compress: %w", err)
}
slog.Info("verifying compressed output")
if err := VerifyOutput(outputTmp); err != nil {
os.Remove(outputTmp)
return fmt.Errorf("verification failed: %w", err)
}
// Atomic rename to final path
slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
if err := os.Rename(outputTmp, outputFinal); err != nil {
return fmt.Errorf("atomic rename: %w", err)
}
info, err := os.Stat(outputFinal)
if err != nil {
return fmt.Errorf("stat final output: %w", err)
}
slog.Info("final output written", "path", outputFinal, "size_bytes", info.Size())
return nil return nil
} }